12 votes

I have a 2-node Spark cluster with 4 cores per node.

        MASTER
(Worker-on-master)              (Worker-on-node1)

Spark config:

  • slaves: master, node1
  • SPARK_WORKER_INSTANCES=1

I am trying to understand Spark's parallelize behaviour. The SparkPi example has this code:

import scala.math.random  // the example uses scala.math.random below

val slices = 8  // my test value for slices
val n = 100000 * slices
// spark here is the SparkContext created earlier in the SparkPi example
val count = spark.parallelize(1 to n, slices).map { i =>
  // pick a random point in the unit square and test whether it falls inside the unit circle
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)

As per the documentation:

Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster.

I set slices to 8, which means the working set will be divided among 8 tasks on the cluster; in turn, each worker node gets 4 tasks (one per core).
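
For reference, here is a quick sanity check of the partition count that can be run in spark-shell (my own sketch, assuming sc is the SparkContext; I have not verified how the partitions are placed across the two nodes):

val rdd = sc.parallelize(1 to 100000 * 8, 8)
println(rdd.partitions.length)  // prints 8: one partition, and therefore one task, per slice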

Questions:

  1. Where can I see task-level details? Inside the executors I don't see a task breakdown that would let me see the effect of slices in the UI.

  2. How can I programmatically find the working set size for the map function above? I assume it is n/slices (100000 above).

  3. Are the multiple tasks run by an executor executed sequentially, or in parallel in multiple threads?

  4. What is the reasoning behind 2-4 slices per CPU?

  5. I assume that ideally we should tune SPARK_WORKER_INSTANCES to correspond to the number of cores in each node (in a homogeneous cluster), so that each core gets its own executor and task (1:1:1).


3 Answers

7 votes

I will try to answer your questions as best I can:

1.- Where can I see task level details?

When a job is submitted, Spark stores information about the task breakdown on each worker node, not just on the master. This data is stored, I believe (I have only tested with Spark on EC2), in the work folder under the Spark directory.

2.- How to programmatically find the working set size for the map function?

Although I am not sure whether Spark records the in-memory size of each slice, the logs mentioned in the answer to the first question provide information about the number of lines each RDD partition contains.
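
If digging through the worker logs is inconvenient, a rough sketch like the following (my own, not taken from the post) reports the number of elements per partition directly on the driver, assuming sc is the SparkContext:

// Count how many elements ended up in each partition of the parallelized range
val rdd = sc.parallelize(1 to 800000, 8)
rdd.mapPartitionsWithIndex { (index, iter) =>
  Iterator((index, iter.size))  // iter.size consumes the iterator, which is fine here
}.collect().foreach { case (index, size) =>
  println("partition " + index + " has " + size + " elements")  // expect roughly n/slices = 100000 each
}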

3.- Are the multiple tasks run by an executor executed sequentially or in parallel in multiple threads?

I believe different tasks inside a node run sequentially. This is shown in the logs indicated above, which record the start and end time of every task.

4.- Reasoning behind 2-4 slices per CPU

Some nodes finish their tasks faster than others. Having more slices than available cores distributes the tasks in a balanced way, avoiding long processing times caused by slower nodes.
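
To put rough numbers on that (hypothetical figures, purely for illustration): suppose one of 8 cores runs at half speed. With exactly 8 equal tasks handed out 1:1, the slow core takes 20 s while the other seven finish in 10 s and then sit idle, so the stage takes 20 s. With 24 smaller tasks (about 3.3 s each on a fast core), cores pull new tasks from the queue as they finish, the slow core simply ends up processing fewer of them, and the stage finishes close to the ~11 s that the total amount of work actually requires.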

3 votes

Taking a stab at #4:

It's worth noting that "slices" and "partitions" are the same thing; there is a bug filed and an effort under way to clean up the docs: https://issues.apache.org/jira/browse/SPARK-1701

Here's a link that expands the reasoning in #4: http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism

Specifically look at the line:

In general, we recommend 2-3 tasks per CPU core in your cluster.

An important consideration is avoiding shuffles, and setting the number of slices is part of that. It's a more complicated subject than I can explain fully here -- the basic idea is to split your data into enough partitions/slices up front so that Spark does not have to re-shuffle the data to get more partitions later.
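
A minimal sketch of that idea (my own example; the path and numbers are made up, and sc is assumed to be the SparkContext):

// Choosing the partition count up front: ask for at least 32 partitions as the data is read
val upFront = sc.textFile("hdfs:///some/input", 32)

// Fixing it after the fact: repartition() has to shuffle the whole dataset across the cluster
val reshuffled = sc.textFile("hdfs:///some/input").repartition(32)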

0 votes

1) Where can I see task-level details? Inside the executors I don't see a task breakdown that would let me see the effect of slices in the UI.

I do not understand your question, as the effect of partitioning (or slices, if you prefer) is definitely visible in the UI.

2) How can I programmatically find the working set size for the map function above? I assume it is n/slices (100000 above).

Please give more details on what size you are interested in. If you mean the amount of memory consumed by each worker ... a partition read from HDFS defaults to the HDFS block size (64 MB) ... From the official Spark documentation:

Spark prints the serialized size of each task on the master, so you can look at that to decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing.

3) Are the multiple tasks run by an executor executed sequentially or in parallel in multiple threads?

A good source for this is this question:

Spark executor & tasks concurrency

4) Reasoning behind 2-4 slices per CPU. I assume ideally we should tune SPARK_WORKER_INSTANCES to correspond to the number of cores in each node (in a homogeneous cluster) so that each core gets its own executor and task (1:1:1).

The major goal is to not have idle workers: once a worker finishes one task, it will always have something else to work on while waiting for other nodes to complete longer tasks. With a 1:1:1 mapping, the workers could sit idle instead.
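
For completeness, the two layouts being compared look roughly like this in spark-env.sh (illustrative values for a 4-core node, not a recommendation):

# One worker per node using all 4 cores (the usual default)
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_CORES=4

# The 1:1:1 layout from the question: one single-core worker per core
# SPARK_WORKER_INSTANCES=4
# SPARK_WORKER_CORES=1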