2
votes

I am using Apache Flink 1.4 on a cluster of 3 machines, out of which one is the JobManager and the other 2 host TaskManagers.

I start flink in cluster mode and submit a flink job. I have configured 24 task slots in the flink config, and for the job I use 6 task slots. In the code, I have enabled checkpointing and also set the restart strategy to fixedDelayRestart.

When I submit the job, I see 3 tasks are assigned to Worker machine 1, and 3 are assigned to Worker machine 2. Now, when I kill the TaskManager on WorkerMachine 2, I see that the entire job fails.

  1. Is this the expected behaviour, or does it have automatic failover as in Spark.

  2. Do we need to use YARN/Mesos or Zookeeper to achieve automatic failover?

  3. We tried the Restart Strategy, but when it restarts we get an exception saying that no task slots are available and then the job fails. We think that 24 slots is enough to take over. What could we be doing wrong here?

NOTE -

This is a Flink Streaming job. I get a java.net.ConnectException each time the JobManager tries to connect to the TaskManager that I have killed. It retries 3 times(the number I have set) and then the job fails.

I expect the JobManager to move the workload to the remaining machine on which TaskManager is running. Or does it expect both TaskManagers to be up by the time it restarts?

1
Are you running Flink Streaming job? If it is and being set the restart strategy through env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0)), the streaming job will be restarted within attempt number as long as the cluster has enough slots.BrightFlow
Updated the questionJames Isaac

1 Answers

0
votes

In case you lose one or more task managers on which your job was running, the following actions will be taken, depending on the configured restart strategy:

  1. Flink stops the job on all other task managers running it.
  2. Flink will try to acquire the number of missing slots for the desired parallelism from the remaining task managers in the cluster, if available. If there are not enough task slots available, Flink will ask the cluster manager (YARN, Mesos, Native Kubernetes) to start new task manager(s) - not available for standalone clusters.
  3. Flink restarts the whole job based on its latest checkpoint or savepoint (whichever is newer). Please note that Flink's fault-tolerance model is based on checkpoints: you configure how often checkpoints should be started, and each operator then writes its Flink-managed state into a checkpoint file onto some distributed storage - basically any distributed file system of your choice. Please refer to Flink's checkpointing documentation for configuration details.

from https://ververica.zendesk.com/hc/en-us/articles/360002262919-What-happens-if-a-task-manager-is-lost-