2
votes

I have trouble to access S3 from a Flink job.

If I submit my assembled jar for my job, I get an access denied error:

Caused by: 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: ...; S3 Extended Request ID: ...), S3 Extended Request ID: ...

This is my setup: EMR cluster with is created with 'advanced configuration', Flink 1.4.0. and Hadoop 2.8.3. as applications. 1x master, 2x nodes

Instances have EMR_EC2_DefaultRole, which have policy AmazonElasticMapReduceforEC2Role, which has S3 full access. Indeed I can issue this commands successfully on either the master and slave nodes:

aws s3api list-buckets

hdfs dfs -ls s3://bucketA

I connect to the master and start a cluster: /usr/lib/flink/bin/yarn-session.sh -n 2 -d

The Flink job reads from a bucket as a source:

object TestS3 {
  def main(args: Array[String]): Unit = {

  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  val input: DataSet[String] = env.readTextFile("s3://bucketA/source/file")
  input.writeAsText("s3://bucketB/delete/me/later")

  env.execute()
  }
}

This is my simple build.sbt:

name := "TestS3"
scalaVersion := "2.11.11"
version := "0.1"
val flinkVersion = "1.4.0"
libraryDependencies ++= Seq(
    "org.apache.flink" % "flink-core" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
)

The bucket has no policy to deny read access. It has the policy to deny deletes, but this should not affect the Flink job. The EMR_EC2_Default_Role grants full access to S3.

As always, any hint of what I'm doing wrong is very much appreciated. Or maybe my expectation this should work is wrong?!

This is the full stacktrace:

java.io.IOException: Error opening the Input Split s3://bucketA/source/file [0,-1]: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: xyz_request_id; S3 Extended Request ID: xyz_ext_request_id), S3 Extended Request ID: xyz_ext_request_id
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: xyz_request_id; S3 Extended Request ID: xyz_ext_request_id), S3 Extended Request ID: xyz_ext_request_id
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:434)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:461)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1097)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: xyz_request_id; S3 Extended Request ID: xyz_ext_request_id), S3 Extended Request ID: xyz_ext_request_id
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    ... 7 more
1

1 Answers

0
votes

We figured it out, the source bucket is encrypted with a KMS key. Therefore it was not sufficient for the EMR_EC2_DefaultRole to have full access to S3, but it needed access to the KMS key additionally. We extended the EMR_EC2_DefaultRole respectively and the Flink job can access the file now.

Maybe this post helps somebody to save some time (and not to forget KMS encryption).