11
votes

Has anyone succeeded in using Apache Flink 0.9 to process data stored on AWS S3? I found that Flink uses its own S3FileSystem instead of the one from Hadoop... and it doesn't seem to work. When I use the path s3://bucket.s3.amazonaws.com/folder, it fails with the following exception:

java.io.IOException: Cannot establish connection to Amazon S3: com.amazonaws.services.s3.model.AmazonS3Exception: The request signature we calculated does not match the signature you provided. Check your key and signing method. (Service: Amazon S3; Status Code: 403;

It looks like Flink's own S3 file system support is broken. It seems that another committer of Apache Flink also saw and confirmed the issue: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/… – Robert Metzger
Yes, it is. Unfortunately, the end of the story is not clear from the thread... so I hope it was fixed. – Konstantin Kudryavtsev
I will continue the discussion on the Flink mailing list. – Robert Metzger

2 Answers

9
votes

Update May 2016: The Flink documentation now has a page on how to use Flink with AWS


The question has been asked on the Flink user mailing list as well, and I've answered it over there: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-S3-data-with-Apache-Flink-td3046.html

tl;dr:

Flink program

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3FileSystem {
   public static void main(String[] args) throws Exception {
      // Local execution environment; S3 access goes through Hadoop's S3N file system,
      // configured via the core-site.xml entries shown below
      ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
      // Read the file through the s3n:// scheme (Hadoop NativeS3FileSystem)
      DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
      myLines.print();
   }
}

Add the following to core-site.xml and make it available to Flink:

<property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>putKeyHere</value>
</property>

<property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>putSecretHere</value>
</property>
<property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
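
One way to make that core-site.xml visible to Flink (a sketch, assuming a standalone Flink setup of that era; the path is an example and will differ on your machine) is to point fs.hdfs.hadoopconf in conf/flink-conf.yaml at the directory containing it:

# conf/flink-conf.yaml -- example path, point it at the directory holding your core-site.xml
fs.hdfs.hadoopconf: /path/to/hadoop/conf

Flink then loads the Hadoop configuration from that directory and picks up the s3n credentials and implementation class defined above.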
-1
votes

You can retrieve the artifacts from the S3 bucket that is specified in the output section of the CloudFormation template. Once the Flink runtime is up and running, the taxi stream processor program can be submitted to it to start the real-time analysis of the trip events in the Amazon Kinesis stream.

$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»

Both of the above commands use Amazon S3 as the source; you have to specify the artifact name accordingly.

Note: you can follow the link below to build a pipeline using EMR and S3 buckets.

https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/