Using Microsoft Azure, I have a cluster with 2 master nodes and 50 worker nodes. All nodes have 8 cores and 64 GB of RAM.
I am running a spark-submit job using pyspark. The most relevant part of my Python code is where I create a Python list, arg_list, containing 72 tuples. (Each tuple has about 6 elements, and no two tuples are the same.) I then create an RDD with 72 partitions like so:
sc = spark.sparkContext
# one partition per tuple, so each of the 72 tasks gets exactly one tuple to process
rdd = sc.parallelize(arg_list, len(arg_list))
My goal is to process all 72 partitions at the same time. Processing a partition involves the following:
- Read two files from Blob (HDFS) storage based on the values of the tuple elements received.
- Run the Python package gensim to perform an NLP task using the content of the two files read (a simplified sketch of this per-task function follows the list).
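For context, here is a simplified sketch of that per-task function. The tuple element positions, the file handling, and the particular gensim calls are placeholders standing in for my real code (which reads from Blob/HDFS rather than the local filesystem):

from gensim import corpora
from gensim.models import TfidfModel

def process_pair(args):
    # args is one of the 72 tuples; two of its elements identify the input files (placeholder positions)
    path_a, path_b = args[0], args[1]
    docs = []
    for path in (path_a, path_b):
        # simplified: the real code reads these files from Blob (HDFS) storage
        with open(path) as f:
            docs.append(f.read().split())
    # illustrative gensim usage only; the actual NLP task is more involved
    dictionary = corpora.Dictionary(docs)
    corpus = [dictionary.doc2bow(doc) for doc in docs]
    model = TfidfModel(corpus)  # the real job does further work with the model
    return args, len(dictionary)

results = rdd.map(process_pair).collect()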
To process all 72 partitions in parallel, I submit my spark-submit job as follows:
spark-submit --master yarn --num-executors 100 --executor-cores 3 --executor-memory 19G /path/to/my/file.py
The thinking is as follows: run 2 executors on each worker node. Each executor gets 3 cores, so each node has 8 - 2*3 = 2 cores left over for the NodeManager and any other overhead. With 2 executors per node and 50 nodes, I get 100 executors. (I admit that the cluster is a bit larger than needed in this case.)
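Spelled out, the sizing arithmetic I used is just this (restating the numbers above):

cores_per_node = 8
executor_cores = 3
executors_per_node = 2
# 8 - 2*3 = 2 cores left on each node for the NodeManager and other overhead
spare_cores = cores_per_node - executors_per_node * executor_cores
worker_nodes = 50
# 2 executors/node * 50 nodes = 100 executors, hence --num-executors 100
num_executors = executors_per_node * worker_nodes
# 2 * 19 GB = 38 GB per node for executor heaps, out of 64 GB of RAM per node
executor_memory_gb = 19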
When I run this code, I can monitor the activity of the worker nodes using Ambari. I had expected 72/2 = 36 worker nodes to be busy (as evidenced by average load) and 50 - 36 = 14 nodes to be idle. Instead, I see only 12 busy nodes, each of which appears to be running 6 processes.
It is probably no coincidence that 6 * 12 = 72, the number of tasks. It's as if Spark/YARN decided to ignore my parameters and cram the processing of my partitions onto as few nodes as possible.
I also note that none of the 72 tasks seems to finish in a reasonable time: a typical task takes about 3 hours when run serially, yet my Spark job has been running for 6 hours with no output at all.
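One way I could cross-check what Ambari shows would be to have each task report the node it runs on, along the lines of this quick diagnostic (not part of my actual job; socket.gethostname() simply returns the executor's hostname):

import socket

def report_host(index, iterator):
    # Diagnostic only: record which executor host handled this partition and how many tuples it saw
    args = list(iterator)
    yield (socket.gethostname(), index, len(args))

host_report = rdd.mapPartitionsWithIndex(report_host).collect()
for host, idx, n in sorted(host_report):
    print(host, idx, n)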
Questions:
- Why am I using only 12 out of 50 worker nodes?
- Why is my code running so slowly?
I've read guidelines on spark-submit/YARN parameters and thought what I wrote made sense. Is there some additional parameter setting that I'm missing?