I have a Flink application with high parallelism (400) running on AWS EMR. It reads from Kafka and writes to S3 using BucketingSink, with the RocksDB state backend for checkpointing. The destination is defined with the "s3a://" prefix. The job is a streaming application that runs continuously. At any given time, the workers combined may be writing to as many as 400 files (one per parallel subtask). After a few days, one of the workers fails with the following exception:
org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
This seems to occur at random when the BucketingSink creates a new part file. Oddly, when it occurs it affects only one of the parallel Flink workers, not all of them. The job then transitions into the FAILING state, but it does not restart and recover from the last successful checkpoint. What causes this, and how should it be resolved? Additionally, how can the job be configured to restart and recover from the last successful checkpoint instead of remaining in the FAILING state?