I'm testing the elasticity features in Flink 1.3.0. I have a job with checkpointing enabled and a fixed-delay restart strategy. When I kill one of the TaskManager JVMs, after a while the job correctly restarts on the remaining node. However, when I add a new node, the job is not restarted automatically to make use of it.

I tried bin/flink stop <jobId>, but it always fails with java.lang.IllegalStateException: Job with ID <jobId> is not stoppable.

How can I restart the job to make use of the additional node?

1 Answer


Flink 1.3 does not support dynamic rescaling, so it won't automatically restart a job to take advantage of newly added resources. To rescale in this situation, take a savepoint, then restart the job from that savepoint with a higher parallelism. You can cancel a job and take a savepoint in one step like this:

flink cancel -s [targetDirectory] <jobID>

and then restart it, specifying the new parallelism with -p, via

flink run -s <savepointPath> -p <parallelism> ...

See the CLI docs and savepoint docs for more details on savepoints, but you can think of a savepoint as a user-triggered checkpoint.
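Put together, a rescale might look like the sketch below. The job ID, savepoint directory, savepoint path, and jar name are all placeholders; the exact savepoint path is printed by the cancel command:

```shell
# Find the job ID of the running job.
flink list

# Take a savepoint and cancel the job in one step.
# (hdfs:///flink/savepoints is a placeholder target directory)
flink cancel -s hdfs:///flink/savepoints <jobID>

# Resume from the savepoint path printed above, with a higher
# parallelism (-p) so the new TaskManager's slots are used.
flink run -s hdfs:///flink/savepoints/savepoint-XXXX -p 4 myJob.jar
```

Note that the new parallelism must not exceed the job's max parallelism, which is fixed when the job is first started.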

Apache Flink® at MediaMath: Rescaling Stateful Applications in Production is a recent blog post from data Artisans with a lot of detail about how rescaling works internally.