We have a Flink streaming job that reads data from Kinesis and sinks it to S3 in Parquet format.
Sink:
val parquetEventsWriterSink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new Path(s"s3a://some-bucket-name/"), ParquetAvroWriters.forGenericRecord(AvroSchema[ParquetEvent]))
  .withBucketCheckInterval(1000)
  .withBucketAssigner(new DateTimeBucketAssigner("yyyyMMdd"))
  .build()
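To connect this sink configuration to the object key that shows up in the error later on (20200702/part-0-29), here is a small plain-Scala model of how the file names are formed. It is a sketch based on our reading of Flink 1.9, not Flink's own code: we assume DateTimeBucketAssigner formats the processing-time timestamp with the given pattern (in the JVM default zone when no zone is passed), and that Flink 1.9 names part files "part-<subtaskIndex>-<partCounter>" inside the bucket. The object and method names are hypothetical.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical model (not Flink code) of the object keys produced by the sink:
// bucket id from DateTimeBucketAssigner("yyyyMMdd"), then a part file named
// "part-<subtaskIndex>-<partCounter>" (assumed Flink 1.9 naming).
object PartFilePathModel {

  // Bucket id for a timestamp, formatted with the same pattern as the sink.
  def bucketId(epochMillis: Long, zone: ZoneId): String =
    DateTimeFormatter.ofPattern("yyyyMMdd")
      .withZone(zone)
      .format(Instant.ofEpochMilli(epochMillis))

  // Key relative to the sink's base path, e.g. "20200702/part-0-29".
  def partFileKey(epochMillis: Long, zone: ZoneId, subtaskIndex: Int, partCounter: Long): String =
    s"${bucketId(epochMillis, zone)}/part-$subtaskIndex-$partCounter"
}
```

Under these assumptions, 20200702/part-0-29 is the 30th part file (counter 29) written by subtask 0 into the 20200702 bucket; the checkpoint timestamp 1593682237275 from the log falls on 2020-07-02 in UTC.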
When I want to update the Flink app, I do the following (stop the Flink app with a savepoint, then rerun it from that savepoint):
/usr/bin/flink stop ${FLINK_APP_ID} -p s3a://bucket-to-save/savepoint -d -yid ${YARN_APP_ID}
/usr/bin/flink run -m yarn-cluster -s s3a://bucket-to-save/savepoint/savepoint-588ff0-7a7febf4f80a --allowNonRestoredState /path/to/flink.jar
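Because the savepoint path is copied by hand from the stop step into the run step, a small helper that extracts it from the output of flink stop can avoid restoring from the wrong directory. This is a hedged sketch: it assumes the CLI prints a line containing "Savepoint completed. Path: <path>" once the savepoint is written, which you should check against the output of your Flink version. SavepointPathParser is a hypothetical name.

```scala
// Hypothetical helper: find the savepoint path in the text printed by
// `flink stop -p ...`. ASSUMPTION: the CLI prints a line containing
// "Savepoint completed. Path: <path>"; verify this for your Flink version.
object SavepointPathParser {
  private val Marker = "Savepoint completed. Path: "

  // Returns the first path found after the marker, or None if absent.
  def savepointPath(cliOutput: String): Option[String] =
    cliOutput
      .split("\\r?\\n")
      .map(_.trim)
      .collectFirst {
        case line if line.contains(Marker) =>
          line.substring(line.indexOf(Marker) + Marker.length).trim
      }
}
```

The returned value would then be passed to flink run -s instead of a hand-typed path.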
Log when shutting down the Flink app:
...
2020-07-02 09:30:37,448 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32 @ 1593682237275 for job af23c08acc92229281cd28a12f8c42da.
2020-07-02 09:30:41,129 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32 for job af23c08acc92229281cd28a12f8c42da (145557 bytes in 2408 ms).
2020-07-02 09:30:41,294 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Timestamps/Watermarks -> (....) (1/1) (67dc5d50d1713166b4e06d59c044806d) switched from RUNNING to FINISHED.
2020-07-02 09:30:41,303 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(TumblingEventTimeWindows(30000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) (1/1) (7820bd149b8a2bf2c18a7dbef24dea2a) switched from RUNNING to FINISHED.
2020-07-02 09:30:41,303 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (1/1) (7edae003a3d86bd0b0b6ccd6978d7225) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,331 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (e5ab39b6b0841aa5f90dfddf7035a014) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,331 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(TumblingEventTimeWindows(30000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (54c9e5818615472a94bd4164748af8ab) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(TumblingEventTimeWindows(120000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (45b06521f0dc628909f36e526281120e) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,430 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(TumblingEventTimeWindows(4000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (4290ac65e1cfa537c6fc2eda6f82030f) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,431 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(TumblingEventTimeWindows(30000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (2031b6da4375863db5b5496d359e4afe) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,466 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(SlidingEventTimeWindows(120000, 2000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (fb335e9cc26f3973e335fb5e52abc62c) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,468 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job JOB_NAME (af23c08acc92229281cd28a12f8c42da) switched from state RUNNING to FINISHED.
2020-07-02 09:30:42,468 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job af23c08acc92229281cd28a12f8c42da.
2020-07-02 09:30:42,468 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
2020-07-02 09:30:42,468 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 32 at 's3a://bucket-savepoint/savepoint-af23c0-91247f37c4e0' not discarded.
2020-07-02 09:30:42,479 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job af23c08acc92229281cd28a12f8c42da reached globally terminal state FINISHED.
2020-07-02 09:30:42,496 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job JOB_NAME(af23c08acc92229281cd28a12f8c42da).
2020-07-02 09:30:42,499 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2020-07-02 09:30:42,500 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection b1b71407ad4d004faf36b9ca5ff59897: JobManager is shutting down..
2020-07-02 09:30:42,500 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2020-07-02 09:30:42,500 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager [email protected]://[email protected]:39455/user/jobmanager_0 for job af23c08acc92229281cd28a12f8c42da from the resource manager.
2020-07-02 09:30:42,502 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManagerRunner already shutdown.
Log when resuming the job from the savepoint:
...
2020-07-02 09:32:59,878 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job c9b11c5904dd3182c68a967b1af8623c from savepoint s3a://bucket-savepoint/sp-s/savepoint-af23c0-91247f37c4e0 (allowing non restored state)
2020-07-02 09:33:00,225 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job c9b11c5904dd3182c68a967b1af8623c to 33.
2020-07-02 09:33:00,225 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job c9b11c5904dd3182c68a967b1af8623c from latest valid checkpoint: Checkpoint 32 @ 0 for c9b11c5904dd3182c68a967b1af8623c.
2020-07-02 09:33:00,232 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
2020-07-02 09:33:00,234 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://[email protected]:45829/user/jobmanager_0.
2020-07-02 09:33:00,236 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) under job master id 00000000000000000000000000000000.
2020-07-02 09:33:00,237 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) switched from state CREATED to RUNNING.
2020-07-02 09:33:00,245 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Timestamps/Watermarks -> (Map -> files to s3 -> Sink: Unnamed, Filter, Filter, Filter, Filter, Filter) (1/1) (d9b4be9984c2ab3f468d7f286f8106cf) switched from CREATED to SCHEDULED.
...
2020-07-02 09:33:15,286 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Window(TumblingEventTimeWindows(4000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (79f5b69107d200d78bb8a32dfa32e126) switched from RUNNING to FAILED.
java.io.IOException: Inconsistent result for object 20200702/part-0-29: conflicting lengths. Recovered committer for upload 4jAOkQuDaQiMt53wlO3z8bzO6KtwkaDXXZIA6sTUE1f5pmvZs8EDPlRvu6VYI4y34sK5Zwr4p8fa8EglWvVvC_8z5sn_Dk6L6b5YJnWNIdThRHQ4qfMUK3dj1Eoi7cYweq0J42PcRrK7VOJLJmo8hg-- **indicates 12810 bytes, present object is 5825 bytes**
at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:98)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:156)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:399)
at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 20200702/part-0-29: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: _hidden_; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=:NoSuchUpload
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:222)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:267)
at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.commitMultiPartUpload(HadoopS3AccessHelper.java:84)
at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:85)
... 17 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID:; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+bEPoiI1idEj3UQQzSYOz3V6uesSrV3fXtcLkkYGuexCL/UJVY/1xODua8n8=
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3065)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:229)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 24 more
2020-07-02 09:33:15,287 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) switched from state RUNNING to FAILING.
java.io.IOException: Inconsistent result for object 20200702/part-0-29: conflicting lengths. Recovered committer for upload 4jAOkQuDaQiMt53wlO3z8bzO6KtwkaDXXZIA6sTUE1f5pmvZs8EDPlRvu6VYI4y34sK5Zwr4p8fa8EglWvVvC_8z5sn_Dk6L6b5YJnWNIdThRHQ4qfMUK3dj1Eoi7cYweq0J42PcRrK7VOJLJmo8hg-- indicates 12810 bytes, present object is 5825 bytes
at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:98)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:156)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:399)
at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 20200702/part-0-29: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 26483B8A3458BB00; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=:NoSuchUpload
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:222)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:267)
at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.commitMultiPartUpload(HadoopS3AccessHelper.java:84)
at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:85)
... 17 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 26483B8A3458BB00; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+bEPoiI1idEj3UQQzSYOz3V6uesSrV3fXtcLkkYGuexCL/UJVY/1xODua8n8=
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 24 more
2020-07-02 09:33:15,316 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Timestamps/Watermarks -> (Map -> packets to s3 -> Sink: Unnamed, Filter, Filter, Filter, Filter, Filter) (1/1) (d9b4be9984c2ab3f468d7f286f8106cf) switched from RUNNING to CANCELING.
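The exception is easier to interpret with the recovery path in mind. As far as we can tell from Flink's S3Committer.commitAfterRecovery, on restore the sink tries to complete every pending multipart upload recorded in the savepoint. If S3 answers NoSuchUpload, it assumes the upload was already completed earlier and compares the length of the object now in S3 with the length stored in the savepoint; a mismatch raises "conflicting lengths". The sketch below models only that decision in plain Scala. S3Ops and RecoveryCommitModel are hypothetical stand-ins (not Flink or AWS types), and the messages are paraphrased.

```scala
import java.io.{FileNotFoundException, IOException}

// Hypothetical stand-in for the S3 client; not a Flink or AWS type.
trait S3Ops {
  // Completes a multipart upload; throws IOException (e.g. NoSuchUpload) on failure.
  def completeMultipartUpload(key: String, uploadId: String): Unit
  // Returns the object's length in bytes; throws FileNotFoundException if absent.
  def objectLength(key: String): Long
}

// Simplified model of the recovery-commit check (paraphrased, not Flink code).
object RecoveryCommitModel {
  def commitAfterRecovery(s3: S3Ops, key: String, uploadId: String, expectedBytes: Long): Unit = {
    if (expectedBytes > 0L) {
      try s3.completeMultipartUpload(key, uploadId)
      catch {
        case commitFailure: IOException =>
          // Upload is gone: assume it was committed before, so the object
          // in S3 must have exactly the length recorded in the savepoint.
          val presentBytes =
            try s3.objectLength(key)
            catch {
              case _: FileNotFoundException =>
                throw new IOException(
                  s"Recovering commit failed for object $key: object does not exist and upload $uploadId is not valid",
                  commitFailure)
            }
          if (presentBytes != expectedBytes)
            throw new IOException(
              s"Inconsistent result for object $key: conflicting lengths. " +
                s"Recovered committer for upload $uploadId indicates $expectedBytes bytes, present object is $presentBytes bytes",
              commitFailure)
      }
    }
  }
}
```

With the numbers from the log (12810 bytes recorded in the savepoint, 5825 bytes present in S3), the model takes the mismatch branch and throws, which is the failure shown above. In other words, the object at that key was not the one the savepoint expected.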
Flink 1.9.1 (EMR emr-5.29.0)
We use "org.apache.flink" %% "flink-parquet" % "1.10.0" and "org.apache.parquet" % "parquet-avro" % "1.10.0".
Consistency enabled
I'm not entirely sure whether that's causing the problem, but there were some object inconsistencies in the error log. – Snigdhajyoti
Use s3 instead of s3a. It's mentioned in the AWS docs for the filesystem: "The s3a protocol is not supported. We suggest you use s3 in place of s3a." – Snigdhajyoti