
Using Microsoft Azure, I have a cluster with 2 master nodes and 50 worker nodes. All nodes have 8 cores and 64 GB of RAM.

I am running a spark-submit job using PySpark. The most relevant part of my Python code is where I create a Python list arg_list of 72 tuples. (Each tuple has about 6 elements, and no two tuples are the same.) I then create an RDD with 72 partitions, one per tuple, like so,

sc = spark.sparkContext
rdd = sc.parallelize(arg_list, len(arg_list))
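
As a quick sanity check (a sketch, not part of the original job) that each tuple really lands in its own partition:

# Verify the layout: 72 partitions, each holding exactly one tuple.
print(rdd.getNumPartitions())          # expect 72
print(rdd.glom().map(len).collect())   # expect [1, 1, ..., 1]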

My goal is to process all 72 partitions at the same time. Processing a partition involves the following (a rough sketch of this per-tuple work appears after the list):

  1. Read two files from Blob (HDFS) storage based on the values of the tuple elements received.
  2. Run a Python package called gensim to perform an NLP task using the content of the two files read.
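
For concreteness, here is a minimal sketch of what one of those tasks might look like. The tuple layout, the local open() calls (real code would go through an HDFS/Blob client), and the particular gensim routine are all assumptions, not details from the question:

from gensim import corpora, models, similarities
from gensim.utils import simple_preprocess

def process_tuple(t):
    path_a, path_b = t[0], t[1]   # assumed: first two elements locate the two files
    with open(path_a) as f:
        doc_a = simple_preprocess(f.read())
    with open(path_b) as f:
        doc_b = simple_preprocess(f.read())
    # Build a tiny TF-IDF similarity index over the two documents.
    dictionary = corpora.Dictionary([doc_a, doc_b])
    bows = [dictionary.doc2bow(d) for d in (doc_a, doc_b)]
    tfidf = models.TfidfModel(bows)
    index = similarities.MatrixSimilarity(tfidf[bows], num_features=len(dictionary))
    return float(index[tfidf[bows[0]]][1])   # similarity of file A to file B

results = rdd.map(process_tuple).collect()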

To do this, I submit my spark-submit job as follows,

spark-submit --master yarn --num-executors 100 --executor-cores 3 --executor-memory 19G /path/to/my/file.py

The thinking is as follows: run 2 executors on each worker node. Each executor gets 3 cores, so each node has 8 - 2*3 = 2 cores remaining for the node manager and any other overhead tasks. Since I can fit 2 executors per node and I have 50 nodes, I get 100 executors. (I admit that the cluster is a bit larger than needed in this case.)
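
One way to check what YARN actually granted is to query the driver at runtime. This is a debugging sketch; _jsc is a private PySpark handle, not a stable API:

sc = spark.sparkContext
# Total cores granted across all executors (Spark floors this at 2).
print(sc.defaultParallelism)
# Approximate executor count: block-manager endpoints minus the driver.
print(sc._jsc.sc().getExecutorMemoryStatus().size() - 1)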

When I run this code, I can monitor the activity of the worker nodes using Ambari. I had expected 72/2 = 36 worker nodes to be busy (as evidenced by average load) and 50-36=14 nodes to be idle. Instead, I see that only 12 nodes are busy and appear to be running 6 processes each.

It is probably no coincidence that 6*12 = 72, the number of tasks. It's as if Spark/YARN decided to ignore my parameters and cram the processing of my partitions onto as few nodes as possible.

I also note that each of the 72 tasks seems to be taking a long time to complete. I say this because a typical task takes 3 hours when run serially, yet my Spark job has been running for 6 hours with no output at all.

Questions:

  1. Why am I using only 12 out of 50 worker nodes?
  2. Why is my code running so slow?

I've read guidelines on parameters for spark-submit/YARN and thought what I wrote made sense. Is there some additional parameter setting that I'm missing?


1 Answer


Spark schedules one task per partition, so the number of partitions processed concurrently depends on the total number of cores available to the job you're running.

Let's say your Spark job has 100 executors, each with 3 cores. This means you can process 100 x 3 = 300 partitions concurrently, assuming spark.task.cpus is set to 1 (the default).
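
Plugging in the numbers from the spark-submit command above (plain Python, just to make the arithmetic explicit):

num_executors = 100   # --num-executors
executor_cores = 3    # --executor-cores
task_cpus = 1         # spark.task.cpus default

# Maximum number of tasks (partitions) that can run at once.
slots = num_executors * executor_cores // task_cpus
print(slots)          # 300 -> far more slots than the job's 72 partitions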

spark.task.cpus is the number of cores to allocate to each task, and --executor-cores specifies the number of cores per executor.

A worker node running 2 executors can therefore process 2 x 3 = 6 partitions at a time. Since the job has only 72 tasks, YARN needs just 72 / 6 = 12 such nodes to host them all at once (6 x 12 = 72), which matches the 12 busy nodes, each running 6 processes, that you observed.

The two configuration properties in Spark for tuning the number of partitions at runtime are spark.default.parallelism (used for RDD operations) and spark.sql.shuffle.partitions (used for DataFrame shuffles).

Increase the default parallelism by passing:

--conf spark.default.parallelism=36
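
For example, appended to the original command (a sketch; every flag except the new --conf is taken from the question):

spark-submit --master yarn --num-executors 100 --executor-cores 3 --executor-memory 19G --conf spark.default.parallelism=36 /path/to/my/file.py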

Alternatively, set spark.task.cpus=2 and --executor-cores 4 in the spark-submit command. Each executor can then run only 4/2 = 2 tasks concurrently, which forces the 72 tasks to spread out; with one such executor per node, each node processes only 2 partitions, and 36 nodes are used to process the data in parallel.
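
A sketch of such a command. The single 4-core executor per node (--num-executors 50) is an assumption needed to make the arithmetic above work out; the memory setting is carried over from the original command:

spark-submit --master yarn --num-executors 50 --executor-cores 4 --executor-memory 19G --conf spark.task.cpus=2 /path/to/my/file.py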