0
votes

I recently migrated our Flink job cluster from 1.9.0 to 1.11.1, running without HA. I am now facing the following error, which causes the JobManager to fail every 5 minutes and leaves the Flink jobs stuck in a restart loop on AWS ECS.

This worked in Flink 1.9.0 but no longer works after the upgrade to 1.11.1. Since I don't have JobManager HA, I pass a fixed --job-id for each Flink job instead of the default ID 00000000000. I am new to Flink.

org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 5.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
    at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 's3://data/flink/checkpoints/<unique_job_id>/chk-5/_metadata' already exists
1
Could you clarify if <unique_job_id> is actually two folders? It's strange that there is no attempt ID. - Arvid Heise
@ArvidHeise No, the unique job ID is a Flink-supported UUID without dashes. - unk1102
Note: there was way too much code formatting here. The names of pieces of software should be treated as proper nouns, and thus they are capitalised. That's it: no bolding, code formatting, etc. - halfer

1 Answer

1
votes

The problem seems to be that you are reusing the same job ID across multiple runs, which leads to collisions. The checkpoint path in your error contains the job ID, so a restarted run with the same ID presumably tries to write chk-5/_metadata where a previous run already wrote it. If you are not using HA, you should always generate a unique job ID for each job run or job submission. The easiest way to do that is to generate a random ID. You only need a fixed job ID if you want to recover a job run from the state stored in the HA stores.
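
As a rough illustration of generating a random ID per submission, here is a minimal Python sketch. It assumes the job ID is a 32-character hex string (a UUID without dashes, as described in the comments above) and that it is passed with the --job-id flag mentioned in the question. The helper names and the base arguments are hypothetical, so adapt them to however your ECS task launches the job.

```python
import uuid
from typing import List


def new_flink_job_id() -> str:
    """Return a random 32-character lowercase hex string (a UUID4 without dashes)."""
    return uuid.uuid4().hex


def build_job_args(base_args: List[str]) -> List[str]:
    """Append a freshly generated --job-id to the launch arguments.

    base_args is a hypothetical placeholder for whatever arguments the
    container entrypoint already receives.
    """
    return [*base_args, "--job-id", new_flink_job_id()]


if __name__ == "__main__":
    # Each call yields a different ID, so every run gets its own checkpoint folder.
    print(" ".join(build_job_args(["--job-classname", "com.example.MyJob"])))
```

The ID should be generated at launch time (for example in the container entrypoint), not baked into the task definition, otherwise every restart would reuse it again.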