
I have a Flink app with high parallelism (400) running in AWS EMR. It sources from Kafka and sinks to S3 using BucketingSink (with the RocksDB backend for checkpointing). The destination is defined using the "s3a://" prefix. The Flink job is a streaming app that runs continuously. At any given time, all workers combined may be generating/writing up to 400 files (one per parallel subtask). After a few days, one of the workers fails with the exception:

org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)

This seems to occur randomly when a new part file is created by the BucketingSink. The odd thing is that when it happens, it affects only one of the parallel Flink workers, not all of them. Also, when this occurs, the Flink job transitions into a FAILING state, but it does not restart and resume/recover from the last successful checkpoint. What is the cause of this and how should it be resolved? Additionally, how can the job be configured to restart/recover from the last successful checkpoint instead of remaining in the FAILING state?
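For reference, here is a minimal sketch of the kind of job described above. The Kafka topic, broker address, bucket name, checkpoint interval, and batch size are illustrative placeholders rather than values from the actual job; only the parallelism of 400, the RocksDB backend, the s3a:// destination, and the date/hour bucket layout are taken from the description and the stack trace.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    import java.util.Properties;

    public class KafkaToS3Job {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(400);                                       // parallelism from the question
            env.enableCheckpointing(60_000L);                              // checkpoint interval is a placeholder
            env.setStateBackend(new RocksDBStateBackend("s3a://bucket/checkpoints")); // RocksDB backend, placeholder path

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "broker:9092");    // placeholder broker
            kafkaProps.setProperty("group.id", "flink-s3-writer");         // placeholder group id

            // BucketingSink writing to an s3a:// destination, bucketed by date/hour
            // like the 2018-09-01/05 path in the stack trace.
            BucketingSink<String> sink = new BucketingSink<>("s3a://bucket");
            sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd/HH"));
            sink.setWriter(new StringWriter<>());
            sink.setBatchSize(128L * 1024 * 1024);                         // roll part files at ~128 MB (placeholder)

            env.addSource(new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), kafkaProps))
               .addSink(sink);

            env.execute("kafka-to-s3");
        }
    }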


1 Answer


I think this is known behavior with the bucketing sink and S3, and the suggested solution is to use the shiny new StreamingFileSink in Flink 1.7.0.

Basically, the bucketing sink expects writes and renames to happen immediately like they would in a real file system, but that isn't a good assumption for object stores like S3, so the bucketing sink ends up with race conditions that cause intermittent problems. Here's a JIRA ticket that sort of describes the problem, and the related tickets flesh it out a bit more. JIRA FLINK-9752