0
votes

I'm playing with flink on yarn for testing purposes I have the following setup:

3 machines on aws (32 cores and 64 GB of memory)

I installed Hadoop 2 with hdfs and yarn services manually (without using EMR).

Machine #1 runs HDFS - (NameNode & SeconderyNameNode) and YARN - (resourcemanager) , defined in masters file

Machine #2 runs HDFS - (datanode) and YARN - (nodemanager) , definded in slaves file

Machine #3 runs HDFS - (datanode) and YARN - (nodemanager) , defined in slaves file

I want to submit Apache flink job that reads about 20GB of logs from hdfs process them and than store the result in cassandra

The problem is that i think i'm doing wrong because the job takes quite a lot of time about an hour, and i think it's not very optimized.

i running flink with the following command:

./flink-1.3.0/bin/flink run -yn 2 -ys 30 -yjm 7000 -ytm 8000 -m yarn-cluster /home/ubuntu/reports_script-1.0-SNAPSHOT.jar

and i'm seeing on flink logs that there are 60 task slots in use, but when i'm looking at yarn page i'm seeing very low usage of vcores and memory

Hadoop yarn page

what am i doing wrong?

1
Is it possible that Cassandra is a misconfigured/the bottleneck? Did you try to run your job from HDFS to HDFS?twalthr
the cassandra is certainly not the bottleneck, it is the fastest part of the processing. the thing that really bothers me that on the picture i attached to the question , i can see that it opened 3 containers (application master and 2 task managers for flink) and they use only 3 Vcores... i can't figure out how to utilize all vcores and memory of the cluster.user1358729
If you want to configure the number of vcores per container, then use the configuration option: yarn.containers.vcores. More Yarn specific configuration options can be found hereTill Rohrmann

1 Answers

1
votes

A few things to look out for:

  • The default value for the number of vcores per TaskManager container is one. To increase that, use the yarn.containers.vcores parameter. Unless you use a container executor that enforces that the container uses only vcore many CPU cores, it may not make a difference at all to the job (and only looks weird in the YARN UI).

  • Giving 7GB memory to a TaskManager means it will actually get a JVM heap of around 5.2 GB, because some "cutoff" is taken for the JVM. Having 5.3GB for 30 slots means about 170 MBs of memory per slot. That works, buts is actually not a lot.

  • Check the Flink web UI to make sure your job does run with the proper parallelism. You can also check where (which operation) the time goes.