We have a Flink streaming job that reads data from Kinesis and sinks it to S3 in Parquet format.

Sink:

    val parquetEventsWriterSink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(new Path(s"s3a://some-bucket-name/"), ParquetAvroWriters.forGenericRecord(AvroSchema[ParquetEvent]))
      .withBucketCheckInterval(1000)
      .withBucketAssigner(new DateTimeBucketAssigner("yyyyMMdd"))
      .build()

When I want to update the Flink app, I do the following (stop the app with a savepoint, then rerun it from that savepoint):

    /usr/bin/flink stop ${FLINK_APP_ID} -p s3a://bucket-to-save/savepoint -d -yid ${YARN_APP_ID}
    /usr/bin/flink run -m yarn-cluster -s s3a://bucket-to-save/savepoint/savepoint-588ff0-7a7febf4f80a --allowNonRestoredState /path/to/flink.jar

Log from shutting down the Flink app:

...
2020-07-02 09:30:37,448 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32 @ 1593682237275 for job af23c08acc92229281cd28a12f8c42da.
2020-07-02 09:30:41,129 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 32 for job af23c08acc92229281cd28a12f8c42da (145557 bytes in 2408 ms).
2020-07-02 09:30:41,294 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Timestamps/Watermarks -> (....) (1/1) (67dc5d50d1713166b4e06d59c044806d) switched from RUNNING to FINISHED.
2020-07-02 09:30:41,303 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(TumblingEventTimeWindows(30000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) (1/1) (7820bd149b8a2bf2c18a7dbef24dea2a) switched from RUNNING to FINISHED.
2020-07-02 09:30:41,303 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedProcess (1/1) (7edae003a3d86bd0b0b6ccd6978d7225) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,331 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedProcess -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (e5ab39b6b0841aa5f90dfddf7035a014) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,331 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(TumblingEventTimeWindows(30000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (54c9e5818615472a94bd4164748af8ab) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,420 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(TumblingEventTimeWindows(120000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (45b06521f0dc628909f36e526281120e) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,430 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(TumblingEventTimeWindows(4000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (4290ac65e1cfa537c6fc2eda6f82030f) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,431 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(TumblingEventTimeWindows(30000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (2031b6da4375863db5b5496d359e4afe) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,466 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(SlidingEventTimeWindows(120000, 2000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (fb335e9cc26f3973e335fb5e52abc62c) switched from RUNNING to FINISHED.
2020-07-02 09:30:42,468 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job JOB_NAME (af23c08acc92229281cd28a12f8c42da) switched from state RUNNING to FINISHED.
2020-07-02 09:30:42,468 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job af23c08acc92229281cd28a12f8c42da.
2020-07-02 09:30:42,468 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - Shutting down
2020-07-02 09:30:42,468 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint       - Checkpoint with ID 32 at 's3a://bucket-savepoint/savepoint-af23c0-91247f37c4e0' not discarded.
2020-07-02 09:30:42,479 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Job af23c08acc92229281cd28a12f8c42da reached globally terminal state FINISHED.
2020-07-02 09:30:42,496 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the JobMaster for job JOB_NAME(af23c08acc92229281cd28a12f8c42da).
2020-07-02 09:30:42,499 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Suspending SlotPool.
2020-07-02 09:30:42,500 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Close ResourceManager connection b1b71407ad4d004faf36b9ca5ff59897: JobManager is shutting down..
2020-07-02 09:30:42,500 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Stopping SlotPool.
2020-07-02 09:30:42,500 INFO  org.apache.flink.yarn.YarnResourceManager                     - Disconnect job manager [email protected]://[email protected]:39455/user/jobmanager_0 for job af23c08acc92229281cd28a12f8c42da from the resource manager.
2020-07-02 09:30:42,502 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManagerRunner already shutdown.

Log from resuming the job from the savepoint:

...
2020-07-02 09:32:59,878 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job c9b11c5904dd3182c68a967b1af8623c from savepoint s3a://bucket-savepoint/sp-s/savepoint-af23c0-91247f37c4e0 (allowing non restored state)
2020-07-02 09:33:00,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job c9b11c5904dd3182c68a967b1af8623c to 33.
2020-07-02 09:33:00,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job c9b11c5904dd3182c68a967b1af8623c from latest valid checkpoint: Checkpoint 32 @ 0 for c9b11c5904dd3182c68a967b1af8623c.
2020-07-02 09:33:00,232 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state to restore
2020-07-02 09:33:00,234 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager runner for job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://[email protected]:45829/user/jobmanager_0.
2020-07-02 09:33:00,236 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Starting execution of job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) under job master id 00000000000000000000000000000000.
2020-07-02 09:33:00,237 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) switched from state CREATED to RUNNING.
2020-07-02 09:33:00,245 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Timestamps/Watermarks -> (Map -> files to s3 -> Sink: Unnamed, Filter, Filter, Filter, Filter, Filter) (1/1) (d9b4be9984c2ab3f468d7f286f8106cf) switched from CREATED to SCHEDULED.
...
2020-07-02 09:33:15,286 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(TumblingEventTimeWindows(4000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> (Map -> Sink: Unnamed, Map -> Map -> Sink: Unnamed) (1/1) (79f5b69107d200d78bb8a32dfa32e126) switched from RUNNING to FAILED.
java.io.IOException: Inconsistent result for object 20200702/part-0-29: conflicting lengths. Recovered committer for upload 4jAOkQuDaQiMt53wlO3z8bzO6KtwkaDXXZIA6sTUE1f5pmvZs8EDPlRvu6VYI4y34sK5Zwr4p8fa8EglWvVvC_8z5sn_Dk6L6b5YJnWNIdThRHQ4qfMUK3dj1Eoi7cYweq0J42PcRrK7VOJLJmo8hg-- **indicates 12810 bytes, present object is 5825 bytes**
    at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:98)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:156)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:399)
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 20200702/part-0-29: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: _hidden_; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=:NoSuchUpload
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:222)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:267)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.commitMultiPartUpload(HadoopS3AccessHelper.java:84)
    at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:85)
    ... 17 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID:; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+bEPoiI1idEj3UQQzSYOz3V6uesSrV3fXtcLkkYGuexCL/UJVY/1xODua8n8=
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3065)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:229)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 24 more
2020-07-02 09:33:15,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job JOB_NAME (c9b11c5904dd3182c68a967b1af8623c) switched from state RUNNING to FAILING.
java.io.IOException: Inconsistent result for object 20200702/part-0-29: conflicting lengths. Recovered committer for upload 4jAOkQuDaQiMt53wlO3z8bzO6KtwkaDXXZIA6sTUE1f5pmvZs8EDPlRvu6VYI4y34sK5Zwr4p8fa8EglWvVvC_8z5sn_Dk6L6b5YJnWNIdThRHQ4qfMUK3dj1Eoi7cYweq0J42PcRrK7VOJLJmo8hg-- indicates 12810 bytes, present object is 5825 bytes
    at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:98)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:156)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:399)
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 20200702/part-0-29: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 26483B8A3458BB00; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=:NoSuchUpload
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:222)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:267)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.commitMultiPartUpload(HadoopS3AccessHelper.java:84)
    at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:85)
    ... 17 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 26483B8A3458BB00; S3 Extended Request ID: nBiv1K/zDVx/FuA+/UJVY/1xODua8n8=), S3 Extended Request ID: nBiv1K/zDVx/FuA+bEPoiI1idEj3UQQzSYOz3V6uesSrV3fXtcLkkYGuexCL/UJVY/1xODua8n8=
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at 
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 24 more
2020-07-02 09:33:15,316 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Timestamps/Watermarks -> (Map -> packets to s3 -> Sink: Unnamed, Filter, Filter, Filter, Filter, Filter) (1/1) (d9b4be9984c2ab3f468d7f286f8106cf) switched from RUNNING to CANCELING.

Flink 1.9.1 (EMR emr-5.29.0)

We use the following dependencies: "org.apache.flink" %% "flink-parquet" % "1.10.0" and "org.apache.parquet" % "parquet-avro" % "1.10.0".

Is consistency view enabled on your cluster? - Snigdhajyoti
I would assign operator IDs to the UDFs to make sure they resume from the correct state. ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/… - Felipe
@Snigdhajyoti How can I check it? - Sergey Postument
Consistency view uses a DynamoDB table. You can check it in the EMR console, where there is a flag named "Consistency enable". I'm not entirely sure whether that is causing the problem, but there was some object inconsistency in the error log. - Snigdhajyoti
Also, could you use s3 instead of s3a? The AWS docs say this about the filesystem: "The s3a protocol is not supported. We suggest you use s3 in place of s3a." - Snigdhajyoti

1 Answer

If you restart with the same Flink version and the same (unmodified) job, does it work, or does it throw the same exception? Also, could it be that you have configured a pruning policy for your pending multipart uploads that cancels them before they are committed?
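
One way to check for such a pruning policy is to look at the bucket's lifecycle configuration for a rule with an `AbortIncompleteMultipartUpload` action. The following is only a rough sketch, assuming the AWS CLI is installed and configured with read access to the bucket. The function names `has_abort_mpu_rule` and `check_bucket` are made up for this example, and `some-bucket-name` is the placeholder from the question.

```shell
#!/usr/bin/env bash
# Sketch: detect a lifecycle rule that aborts incomplete multipart uploads.
# Assumption: the AWS CLI is installed and prints JSON (its default format).

# Hypothetical helper: reads a lifecycle configuration (JSON) on stdin and
# succeeds if any rule contains an AbortIncompleteMultipartUpload action.
# Note: this is a plain text match, so a disabled rule also counts as a hit.
has_abort_mpu_rule() {
  grep -q '"AbortIncompleteMultipartUpload"'
}

# Hypothetical helper: prints a short verdict for the given bucket.
# If the bucket has no lifecycle configuration, the CLI call fails, nothing
# reaches the pipe, and the "no such rule" branch is taken.
check_bucket() {
  local bucket="$1"
  if aws s3api get-bucket-lifecycle-configuration --bucket "$bucket" 2>/dev/null \
      | has_abort_mpu_rule; then
    echo "$bucket: a lifecycle rule aborts incomplete multipart uploads"
  else
    echo "$bucket: no such rule found (or no lifecycle configuration)"
  fi
}

# Usage (not run automatically):
#   check_bucket some-bucket-name
#   aws s3api list-multipart-uploads --bucket some-bucket-name
```

If a rule is reported, compare its `DaysAfterInitiation` value with how long a savepoint may sit before the job is resumed. The second command in the usage comment lists the uploads that are still pending, so you can see whether the upload ID from the exception is among them.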