0
votes

we have a flink streaming job which read data from kafka and sink it to S3. We were using flink's internal streaming file sink API to achieve this. However, after some days, the job failed and cannot recover from failure. The message says it cannot find tmp files from s3. We want to know what could be the possible root cause because we really do not want to lose any data.

Thanks.

The entire output looks like this

java.io.FileNotFoundException: No such file or directory: s3://bucket_name/_part-0-282_tmp_b9777494-d73b-4141-a4cf-b8912019160e
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:99)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverRecoverableUpload(S3RecoverableMultipartUploadFactory.java:75)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:95)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:50)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
1
Hello @Hu Guang, could you please paste the entire error message and the code snippet you use to define your sink?TobiSH
@TobiSH I have pasted the entire message above. ThanksHu Guang

1 Answers

1
votes

Thanks for reporting this!

Could you specify which Flink version are you using? The reason I am asking is because your problem may be related to this https://issues.apache.org/jira/browse/FLINK-13940 ticket.

In addition, the StreamingFileSink uses the Multi-Part Upload feature of S3. This means that files are uploaded gradually in small parts to S3 and when it is time to "commit" them, all the pieces are conceptually concatenated into a single object. S3 allows you to specify a timeout for pending (i.e. non-committed) Multi-Part Uploads (MPU) for your bucker, and when this expires, the pending MPU is aborted and the data is deleted. So if you have set this parameter aggressively, then you may bump into this issue.

Finally, from your previous post I guess you are trying to restart from a failure and not from a savepoint. Is this correct? If you are trying to restart from an old savepoint, then you may have the problem that the sink has already committed that MPU and now the sink cannot find it.

I hope this helps.