1
votes

I can deploy flow to standalone installation of Apache Flink (with one JobManager and several TaskManagers) without problem:

bin/flink run -m example-app-1.stag.local:6123 -d -p 4 my-flow-fat-jar.jar <flow parameters>

but when I run the same command and deploy to Standalone HA cluster this command raise error:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
    at org.apache.flink.client.program.Client.runDetached(Client.java:406)
    at org.apache.flink.client.program.Client.runDetached(Client.java:366)
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
    at org.apache.flink.client.program.Client.runDetached(Client.java:278)
    at org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:844)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:330)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:221)
    at org.apache.flink.client.program.Client.runDetached(Client.java:403)
    ... 7 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at scala.concurrent.Await.result(package.scala)
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:218)
    ... 8 more

Active Job Manager write the following errors to log:

2016-04-14 13:54:44,160 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:62784] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-04-14 13:54:46,299 WARN  org.apache.flink.runtime.jobmanager.JobManager                - Discard message LeaderSessionMessage(null,TriggerSavepoint(5de582462f334caee4733c60c6d69fd7)) because the expected leader session ID Some(72630119-fd0a-40e7-8372-45c93781e99f) did not equal the received leader session ID None.

So, I don't understand what can cause such error?

Let me known if required additional information.

P.S.

Deploy from Flink Dashboard works fine for Standalone HA cluster. Such problem appear when I deploy through Flink CLI only.

Update

I clear Zookeeper, clear directories used by Flink on disk and re-deploy Flink Standalone HA cluster. Then I try to run flow use bin/flink run command. As you can see JobManager write only one line about problem (see flink--jobmanager-0-example-app-1.stag.local.log).

All JobManagers and TaskManagers use the same flink-conf.yaml:

jobmanager.heap.mb: 1024
jobmanager.web.port: 8081

taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager

blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage

parallelism.default: 4

state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-flink/checkpoints

restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s

recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example/flink
recovery.zookeeper.storageDir: s3a://example-flink/recovery
recovery.jobmanager.port: 6123

fs.hdfs.hadoopconf: /flink/conf

So, seems like Standalone HA cluster configured correctly.

Update 2

FYI: I want to install Standalone HA cluster as described here. Not YARN HA cluster.

Update 3

Here is log created by bin/flink CLI: flink-username-client-hostname.local.log.

1
Could you paste the complete jobmanager log?Till Rohrmann
Have you set the proper HA settings in the flink-conf.yaml which is used by the cli (thus the one in FLINK_HOME/conf/flink-conf.yaml)?Till Rohrmann
@TillRohrmann I have added logs and flink-conf.yaml. Do you see any problems?Maxim
You're right, this not only applies to the yarn cluster but also to the standalone cluster. So whenever you use HA you have to set these values.Till Rohrmann

1 Answers

2
votes

When starting a Flink cluster in HA mode, the JobManager address and its leader id are written to the specified ZooKeeper cluster. In order to communicate with the JobManager you have not only to know the address but also its leader address. Therefore, you have to specify the following parameters in your 'flink-conf.yaml` which is read by the CLI.

recovery.mode: zookeeper
recovery.zookeeper.quorum: address of your cluster
recovery.zookeeper.path.root: ZK path you've started your cluster with

With this information the client knows where it can find the ZooKeeper cluster and where to find the JobManager address and its leader id.