
I am running an EMR cluster with 3 m5.xlarge nodes (1 master, 2 core) and Flink 1.8 installed (emr-5.24.1).

On the master node I start a Flink session within the YARN cluster using the following command:

flink-yarn-session -s 4 -jm 12288m -tm 12288m

That is the maximum memory and number of slots per TaskManager that YARN lets me set up based on the selected instance type.
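(To double-check those per-node limits, a quick sketch, assuming the usual EMR config location /etc/hadoop/conf/yarn-site.xml:)

# Per-node memory and vcores that YARN will hand out (standard YARN config keys)
grep -A1 -E 'yarn.nodemanager.resource.(memory-mb|cpu-vcores)' /etc/hadoop/conf/yarn-site.xml

# List the registered NodeManagers and how many containers each is running
yarn node -list -all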

During startup the following line is logged:

org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=12288, taskManagerMemoryMB=12288, numberTaskManagers=1, slotsPerTaskManager=4}

This shows that there is only one TaskManager. Also, looking at the YARN NodeManager, I see that there is only one container running on one of the core nodes. The YARN ResourceManager shows that the application is using only 50% of the cluster.

With the current setup I would assume that I can run a Flink job with parallelism set to 8 (2 TaskManagers * 4 slots each), but if the submitted job has its parallelism set to more than 4, it fails after a while because it cannot get the desired resources.

If the job parallelism is set to 4 (or less), the job runs as it should. CPU and memory utilisation in Ganglia shows that only one node is utilised, while the other stays flat.

Why does the application run on only one node, and how can I utilise the other node as well? Do I need to configure something in YARN so that it starts Flink on the other node too?

In previous versions of Flink there was a startup option -n which was used to specify the number of TaskManagers. That option is now obsolete.
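(For context, the old pre-FLIP-6 invocation looked roughly like this; the memory values here are only placeholders:)

# Old-style session start: -n fixed the number of TaskManager containers up front
flink-yarn-session -n 2 -s 4 -jm 1024m -tm 12288m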


1 Answer


When you start a session cluster, you should see only one container, which is used for the Flink JobManager. This is probably what you see in the YARN ResourceManager. Additional containers will be allocated for TaskManagers automatically, once you submit a job.
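For example (the jar path and parallelism here are placeholders), it is the job submission against the running session that triggers the TaskManager container requests:

# Submit a job to the YARN session; Flink asks YARN for TaskManager containers on demand
flink run -p 8 /path/to/your-job.jar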

How many cores do you see available in the Resource Manager UI?

Don't forget that the JobManager also uses cores out of the available 8.

You need to do a little math here. For example, if you had set the number of slots to 2 per TM, with less memory per TM, and then submitted a job with a parallelism of 6, it should have worked with 3 TMs.
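A minimal sketch of that suggestion (the memory values are assumptions, sized so that more than one TM fits on a 12 GB node):

# Smaller TaskManagers: 2 slots each and less memory, so several fit per node
flink-yarn-session -s 2 -jm 2048m -tm 4096m

# A job with parallelism 6 then needs 6 / 2 = 3 TaskManagers
flink run -p 6 /path/to/your-job.jar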