1 vote

I am encountering an error when I try to connect to an S3 bucket from RStudio using sparklyr. The S3 bucket is in the eu-central-1 (Frankfurt) region. Spark version 2.1.0, Hadoop 2.7. I am getting a 403 response code with a signature mismatch error. When I try an s3a:// path instead, I get a 400 response code. Suggestions for alternative ways to connect to S3 buckets via Spark in RStudio are also appreciated. The connection to S3 works fine without Spark.

Here is the code:

# install.packages("devtools")
# devtools::install_github("rstudio/sparklyr")
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")
spark_disconnect(sc)
config <- spark_config()
library(sparklyr)
library(dplyr)
# config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
# config$spark.executor.memory <- "4g"
sc <- spark_connect(master = "local", config = config)


ctx <- sparklyr::spark_context(sc)
jsc <- invoke_static(
  sc,
  "org.apache.spark.api.java.JavaSparkContext",
  "fromSparkContext",
  ctx
)


hconf <- jsc %>% invoke("hadoopConfiguration")
hconf %>% invoke("set", "fs.s3.access.key", "xx")
hconf %>% invoke("set", "fs.s3.secret.key", "xx")
# hconf %>% invoke("set", "com.amazonaws.services.s3.enableV4", "true")
test <- spark_read_csv(sc, "test", "s3://********.csv")
Error: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Service Error Message. -- ResponseCode: 403, ResponseStatus: Forbidden, XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>AKIAJ3ZD2ZEISKNQMSGQ</AWSAccessKeyId><StringToSign>AWS4-HMAC-SHA25620171129T123633Z20171129/eu-central-1/s3/aws4_request555016eca303c98732f51adcaaa83eac7368fb75f59eaa9f59116684b9030ee0</StringToSign><SignatureProvided>bf703f56827aa0f04aab3fa6a1e2aa277117344cdbfc3f4f3e51895ce62af826</SignatureProvided><StringToSignBytes>41 57 53 34 2d 48 4d 41 43 2d 53 48 41 32 35 36 0a 32 30 31 37 31 31 32 39 54 31 32 33 36 33 33 5a 0a 32 30 31 37 31 31 32 39 2f 65 75 2d 63 65 6e 74 72 61 6c 2d 31 2f 73 33 2f 61 77 73 34 5f 72 65 71 75 65 73 74 0a 35 35 35 30 31 36 65 63 61 33 30 33 63 39 38 37 33 32 66 35 31... <truncated>
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:175)
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy23.retrieveINode(Unknown Source)
    at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:381)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:352)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke$.invoke(invoke.scala:102)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
    at sparklyr.StreamHandler$.read(stream.scala:62)
    at sparklyr.BackendHandler.channelRead0(handler.scala:52)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.jets3t.service.S3ServiceException: Service Error Message. -- ResponseCode: 403, ResponseStatus: Forbidden, XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided. Check your key and signing method.</Message><AWSAccessKeyId>AKIAJ3ZD2ZEISKNQMSGQ</AWSAccessKeyId><StringToSign>AWS4-HMAC-SHA25620171129T123633Z20171129/eu-central-1/s3/aws4_request555016eca303

3 Answers

1 vote

It could be due to a Signature Version incompatibility between regions. Some regions do not support Signature Version 2 and require Signature Version 4, as described here: http://docs.aws.amazon.com/general/latest/gr/signature-version-2.html#signature-2-regions-services

When it comes to S3 bucket/object access in those regions, you must specify the region (eu-central-1) in your request.
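
For example, building on the invoke() calls from the question, a minimal sketch of pointing the s3a connector at the Frankfurt endpoint (the fs.s3a.* property names and the endpoint value are standard Hadoop settings; the credentials and bucket path are placeholders):

# Reuses the sc and jsc objects from the question; note the s3a property names
hconf <- jsc %>% invoke("hadoopConfiguration")
hconf %>% invoke("set", "fs.s3a.access.key", "xx")
hconf %>% invoke("set", "fs.s3a.secret.key", "xx")
# Frankfurt (eu-central-1) region endpoint
hconf %>% invoke("set", "fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
test <- spark_read_csv(sc, "test", "s3a://your-bucket/your-file.csv")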

0 votes

You need to set your AWS credentials via:

Sys.setenv(AWS_ACCESS_KEY_ID="[Your access key]")
Sys.setenv(AWS_SECRET_ACCESS_KEY="[Your secret access key]")

See ?spark_read_csv for details.

Setting config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" is also essential.
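
Put together, a minimal sketch under those assumptions (the bucket path is a placeholder, and an s3a:// path backed by the hadoop-aws package is assumed rather than the s3:// connector from the question):

library(sparklyr)

Sys.setenv(AWS_ACCESS_KEY_ID = "[Your access key]")
Sys.setenv(AWS_SECRET_ACCESS_KEY = "[Your secret access key]")

config <- spark_config()
# Pull in the Hadoop AWS client so the s3a:// scheme is available
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"

sc <- spark_connect(master = "local", config = config)
test <- spark_read_csv(sc, "test", "s3a://your-bucket/your-file.csv")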

0 votes

If you are using Amazon's own EMR, then you need to look at their docs.

If you are using Apache's own artifacts, then you need to switch from the s3 connector to the s3a filesystem connector and then:

  1. Set the JVM system property com.amazonaws.services.s3.enableV4 to true.
  2. Set fs.s3a.endpoint to the endpoint of the specific store you want to talk to (see the AWS endpoint list).

If things don't work, you get a fairly generic "400 Bad Request" message, which isn't that useful for working out what is wrong. Start by getting things working against a bucket in S3 US East (s3a://landsat-pds is a good one to try listing), then move on to the bucket in the V4-only region.
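
A rough sparklyr sketch of those two steps (the Frankfurt endpoint, the JVM options and the bucket path are assumptions, not something tested here):

library(sparklyr)

config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
# Step 1: enable V4 signing in the driver and executor JVMs
config$spark.driver.extraJavaOptions <- "-Dcom.amazonaws.services.s3.enableV4=true"
config$spark.executor.extraJavaOptions <- "-Dcom.amazonaws.services.s3.enableV4=true"

sc <- spark_connect(master = "local", config = config)

# Step 2: point the s3a connector at the region endpoint (Frankfurt here)
hconf <- sc %>% spark_context() %>% invoke("hadoopConfiguration")
hconf %>% invoke("set", "fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

test <- spark_read_csv(sc, "test", "s3a://your-bucket/your-file.csv")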