0 votes

I've set up a Hadoop 2.7.5 HA cluster and run Flink 1.4.0 applications on the default YARN queue. I decided to categorize the applications and run them on dedicated NodeManagers, so I labeled three nodes (4 cores, 2 GB RAM each) with the label stream for queue streamQ, and three nodes (1 core, 1 GB RAM each) with the label online for queue onlineQ. All of the settings show up as expected in the YARN web UI and the nodes are recognized (a sketch of the label setup is given after the config below). Here is the capacity-scheduler.xml:

<property>
<name>yarn.scheduler.capacity.maximum-applications</name>
<value>10000</value>
</property>

<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<value>0.1</value>
</property>

<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
</property>

<property>
<name>yarn.scheduler.capacity.node-locality-delay</name>
<value>40</value>
</property>

<property>
<name>yarn.scheduler.capacity.queue-mappings</name>
<value></value>
</property>

<property>
<name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
<value>false</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>streamQ,onlineQ</value>
</property>

<!-- streamQ settings -->

<property>
<name>yarn.scheduler.capacity.root.streamQ.capacity</name>
<value>0</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.accessible-node-labels</name>
<value>stream</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.accessible-node-labels.stream.capacity</name>
<value>100</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.accessible-node-labels.stream.maximum-capacity</name>
<value>100</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.default-node-label-expression</name>
<value>stream</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.user-limit-factor</name>
<value>1</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.maximum-capacity</name>
<value>100</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.state</name>
<value>RUNNING</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.acl_submit_applications</name>
<value>*</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.streamQ.acl_administer_queue</name>
<value>*</value>
</property>

<!-- onlineQ settings -->

<property>
<name>yarn.scheduler.capacity.root.onlineQ.capacity</name>
<value>0</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.accessible-node-labels</name>
<value>online</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.accessible-node-labels.online.capacity</name>
<value>100</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.accessible-node-labels.online.maximum-capacity</name>
<value>100</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.default-node-label-expression</name>
<value>online</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.user-limit-factor</name>
<value>1</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.maximum-capacity</name>
<value>100</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.state</name>
<value>RUNNING</value>
</property>

<property>
 <name>yarn.scheduler.capacity.root.onlineQ.acl_submit_applications</name>
<value>*</value>
</property>

<property>
<name>yarn.scheduler.capacity.root.onlineQ.acl_administer_queue</name>
<value>*</value>
</property>
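
For reference, the labels were enabled and attached to the nodes roughly like this (a sketch only: the fs-store path and the node1..node6 hostnames are placeholders, not the actual values):

<!-- yarn-site.xml on the ResourceManager -->
<property>
  <name>yarn.node-labels.enabled</name>
  <value>true</value>
</property>

<property>
  <name>yarn.node-labels.fs-store.root-dir</name>
  <value>hdfs://ha-cluster/yarn/node-labels</value>
</property>

# register the labels with the ResourceManager
yarn rmadmin -addToClusterNodeLabels "stream,online"

# attach the labels to the NodeManager hosts (placeholder hostnames)
yarn rmadmin -replaceLabelsOnNode "node1=stream node2=stream node3=stream"
yarn rmadmin -replaceLabelsOnNode "node4=online node5=online node6=online"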

I run the following command to start a Flink session from an edge node whose Hadoop configuration is the same as the cluster's:

yarn-session.sh -n 2 -jm 768 -tm 768 -nm flink -z flink_zoo -s 3 -qu streamQ

It successfully uploads the Flink libraries to HDFS, and the application shows up in the YARN web UI, but when it tries to acquire resources it reports:

2018-01-28 10:02:04,087 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

Here are the full logs:

2018-01-28 10:00:09,648 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2018-01-28 10:00:09,649 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-28 10:00:09,650 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 768
2018-01-28 10:00:09,650 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 768
2018-01-28 10:00:09,650 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-01-28 10:00:09,650 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2018-01-28 10:00:09,650 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-01-28 10:00:09,650 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: web.port, 8081
2018-01-28 10:00:10,003 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-01-28 10:00:10,069 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to manager (auth:SIMPLE)
2018-01-28 10:00:10,377 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Cluster specification: ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=768, numberTaskManagers=2, slotsPerTaskManager=3}
2018-01-28 10:00:10,747 WARN  org.apache.flink.yarn.YarnClusterDescriptor                   - The configuration directory ('/opt/flink/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2018-01-28 10:00:10,751 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/conf/log4j.properties to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/log4j.properties
2018-01-28 10:00:11,123 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/lib/log4j-1.2.17.jar to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/lib/log4j-1.2.17.jar
2018-01-28 10:00:11,384 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/lib/flink-dist_2.11-1.4.0.jar to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/lib/flink-dist_2.11-1.4.0.jar
2018-01-28 10:00:30,986 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/lib/flink-shaded-hadoop2-uber-1.4.0.jar to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/lib/flink-shaded-hadoop2-uber-1.4.0.jar
2018-01-28 10:00:40,852 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/lib/flink-python_2.11-1.4.0.jar to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/lib/flink-python_2.11-1.4.0.jar
2018-01-28 10:00:41,017 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/lib/slf4j-log4j12-1.7.7.jar to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/lib/slf4j-log4j12-1.7.7.jar
2018-01-28 10:00:41,250 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/conf/logback.xml to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/logback.xml
2018-01-28 10:00:41,386 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/opt/flink/lib/flink-dist_2.11-1.4.0.jar to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/flink-dist_2.11-1.4.0.jar
2018-01-28 10:01:02,966 INFO  org.apache.flink.yarn.Utils                                   - Copying from /tmp/application_1517118829753_0002-flink-conf.yaml285707454205346702.tmp to hdfs://ha-cluster/user/manager/.flink/application_1517118829753_0002/application_1517118829753_0002-flink-conf.yaml285707454205346702.tmp
2018-01-28 10:01:03,601 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Submitting application master application_1517118829753_0002
2018-01-28 10:01:03,782 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1517118829753_0002
2018-01-28 10:01:03,783 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Waiting for the cluster to be allocated
2018-01-28 10:01:03,796 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Deploying cluster, current state ACCEPTED

What is the problem?


2 Answers

0 votes

Editing capacity-scheduler.xml as follows solved the problem:

<!-- configuration of queue-root -->


<property> 
  <name>yarn.scheduler.capacity.root.queues</name> 
  <value>streamQ,onlineQ</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.accessible-node-labels</name> 
  <value>*</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.accessible-node-labels.stream.capacity</name> 
  <value>100</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.accessible-node-labels.online.capacity</name> 
  <value>100</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.default-node-label-expression</name> 
  <value>*</value> 
</property>


 <!-- configuration of queue-streamQ -->


<property> 
  <name>yarn.scheduler.capacity.root.streamQ.capacity</name> 
  <value>50</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.streamQ.maximum-capacity</name> 
  <value>100</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.streamQ.accessible-node-labels</name> 
  <value>stream</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.streamQ.accessible-node-labels.stream.capacity</name> 
  <value>100</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.streamQ.accessible-node-labels.online.capacity</name> 
  <value>0</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.streamQ.default-node-label-expression</name> 
  <value>stream</value> 
</property>


<!-- configuration of queue-onlineQ -->


<property> 
  <name>yarn.scheduler.capacity.root.onlineQ.capacity</name> 
  <value>50</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.onlineQ.maximum-capacity</name> 
  <value>100</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.onlineQ.accessible-node-labels</name> 
  <value>online</value> 
</property>

<property> 
  <name>yarn.scheduler.capacity.root.onlineQ.accessible-node-labels.online.capacity</name> 
  <value>100</value>
</property>

<property> 
  <name>yarn.scheduler.capacity.root.onlineQ.accessible-node-labels.stream.capacity</name> 
  <value>0</value>
</property>

<property> 
  <name>yarn.scheduler.capacity.root.onlineQ.default-node-label-expression</name> 
  <value>online</value> 
</property>

</configuration>
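
Note that the scheduler only picks up the new queue and label capacities after a reload, e.g.:

yarn rmadmin -refreshQueues

(or a full ResourceManager restart). The effective per-label capacities can then be verified on the Scheduler page of the ResourceManager web UI.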
-1 votes

Please check your Flink application logs to see whether there is an issue connecting to the YARN ResourceManager. I also encountered this issue when running Flink on YARN with HA; I am not sure whether I was the only one.
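
For example, once log aggregation is enabled, the container logs (including the JobManager/ApplicationMaster log, which records the ResourceManager connection attempts) can be fetched with the application id shown in the client output above:

yarn logs -applicationId application_1517118829753_0002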