2
votes

We have deployed a new instance of Flink with version 1.4. While trying to restore savepoints from our old 1.2.1 deployment, we get the same error with all the jobs we tried to restore:

org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1360)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserializeSubtaskState(SavepointV1Serializer.java:171)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:96)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:54)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:278)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:70)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1141)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1350)
    ... 10 more

The message in the root cause:

Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!

seems to be wrong, though, since the deployment the savepoints come from is running 1.2.1, not 1.1 or earlier.

The upgrade documentation has not been updated for 1.4 yet (https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html), but it seems that parallelism has been an issue in past upgrades. I tried restoring with the same parallelism as the job the savepoint was taken from, and I still get the same error.

Any tips on what could be causing this and how to fix it?

Thank you!

2 Answers

2
votes

With version 1.4.0, Flink no longer supports restoring from state taken with the Checkpointed interface. In order to do a stateful upgrade, you have to do the following:

  1. Take a savepoint of your job running on Flink 1.2.1
  2. Replace Checkpointed with CheckpointedFunction in all stateful functions
  3. Implement the CheckpointedRestoring interface in those functions to restore from the Checkpointed savepoint
  4. Run the modified job on Flink 1.2.1 and take a second savepoint
  5. Remove the CheckpointedRestoring interface from all stateful functions
  6. Run the modified job with the second savepoint on Flink 1.4.0
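
To make steps 2 to 6 more concrete, here is a minimal sketch of what the intermediate version of a stateful function could look like. It is an illustration only: the two interfaces are simplified stand-ins declared locally so that the example compiles without Flink on the classpath (the real ones live in org.apache.flink.streaming.api.checkpoint, and the real CheckpointedFunction methods take FunctionSnapshotContext and FunctionInitializationContext arguments instead of a plain list). CountingFunction, processElement, and the Long counter state are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's CheckpointedRestoring<T> (assumption: declared
// locally for illustration; the real interface ships with Flink 1.2/1.3).
interface CheckpointedRestoring<T> {
    void restoreState(T state) throws Exception;
}

// Simplified stand-in for Flink's CheckpointedFunction. The real methods receive
// context objects; a plain list plays the role of operator state here.
interface CheckpointedFunction {
    void snapshotState(List<Long> operatorState) throws Exception;
    void initializeState(List<Long> restoredOperatorState) throws Exception;
}

// Intermediate job version (steps 2 and 3): it writes state in the new style
// and can still read state written through the old 'Checkpointed' interface.
class CountingFunction implements CheckpointedFunction, CheckpointedRestoring<Long> {
    private long count;

    // Legacy path: only used when restoring the first (old-style) savepoint.
    // After step 5 this method and the CheckpointedRestoring interface are removed.
    @Override
    public void restoreState(Long legacyState) {
        count = legacyState;
    }

    // New path: rebuild the counter from whatever state parts were restored.
    @Override
    public void initializeState(List<Long> restoredOperatorState) {
        for (Long part : restoredOperatorState) {
            count += part;
        }
    }

    // New path: every savepoint taken from now on uses the new style.
    @Override
    public void snapshotState(List<Long> operatorState) {
        operatorState.clear();
        operatorState.add(count);
    }

    // Hypothetical per-record logic.
    void processElement() {
        count++;
    }

    long getCount() {
        return count;
    }
}

class MigrationSketch {
    public static void main(String[] args) {
        // Steps 3 and 4: restore from the legacy savepoint, keep processing,
        // then take a second savepoint in the new style.
        CountingFunction intermediate = new CountingFunction();
        intermediate.restoreState(41L);
        intermediate.processElement();
        List<Long> secondSavepoint = new ArrayList<>();
        intermediate.snapshotState(secondSavepoint);

        // Step 6: the upgraded job only needs the new-style restore path.
        CountingFunction upgraded = new CountingFunction();
        upgraded.initializeState(secondSavepoint);
        System.out.println(upgraded.getCount()); // prints 42
    }
}
```

The point of the intermediate version is that it reads the old format once and writes only the new one, so the second savepoint no longer contains any legacy state.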

Let me know if there are still other problems while migrating your job.

0
votes

So, I finally figured out the issue.

We originally started these jobs on Flink 1.1 and later migrated them, together with their savepoints, to 1.2.1.

It seems that Flink 1.2.1 does not upgrade the savepoint format, so those savepoints were still in the old format, which Flink 1.4 does not support.

The solution was to run our job with the savepoint on Flink 1.3 and take a new savepoint there, which is written in the new format. That one is finally compatible with Flink 1.4 :)