My question is about the order in which tasks are executed within a Stage in Spark.
Context:
I have a Spark dataframe divided into 3000 partitions. Partitioning is done on a specific key. I use mapPartitionsWithIndex to get the id of each partition and the number of elements it contains. For example:
df.rdd
  .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
  .toDF("id", "numElements")
When Spark runs its computation on my dataframe, I see in the Spark UI (and I also ran some tests to make sure this is the case) that the task index corresponds to the partition id, exactly the same id obtained with mapPartitionsWithIndex above. So, on a given executor, tasks are executed in order of increasing partition id.
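For reference, the test I ran looks roughly like this (a sketch: TaskContext.get().partitionId() reports the partition id a task is computing, which I compared against the task index shown in the UI):

import org.apache.spark.TaskContext

// Sanity check: record the partition id seen inside each running task
// and compare it with the task index shown in the Spark UI.
val check = df.rdd.mapPartitionsWithIndex { (i, rows) =>
  // TaskContext.get() is non-null inside a running task
  Iterator((i, TaskContext.get().partitionId(), rows.size))
}.toDF("index", "taskPartitionId", "numElements")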
I see a clear correlation between the number of rows in a partition and the execution time of its task. Due to the skewed nature of my dataset, which can't be changed, several partitions have a much higher number of elements (>8000) than the average (~3000). An average partition takes 10-20 min to execute; the larger ones can go above 3 hours. Some of my largest partitions have a high id, so the corresponding tasks are executed almost at the end of the stage. As a consequence, one of my Spark stages hangs for 3 hours on its last 5 tasks.
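To see which partitions are the heavy ones ahead of time, I just sort the output of the snippet above (assuming a SparkSession named spark for the implicits):

import spark.implicits._
import org.apache.spark.sql.functions.desc

val partitionSizes = df.rdd
  .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
  .toDF("id", "numElements")

// The heaviest partitions, i.e. the tasks that will dominate the stage
partitionSizes.orderBy(desc("numElements")).show(10)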
Question:
Is there a way to reorder the ids of the partitions so that the tasks from the largest partitions are executed first? Or, equivalently, is there a way to change the order in which tasks are executed? (A sketch of the kind of reordering I have in mind follows the notes below.)
Note:
- I don't need to move partitions to other nodes or executors; I just want to change the order of execution.
- I can't change the partitioning key.
- I can change the number of partitions, but the problem will remain.
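To illustrate what I mean by reordering, here is a hypothetical, untested sketch (the class names are my own): a wrapper RDD that presents the parent's partitions under new indices, so that the heaviest partitions get the lowest indices and, presumably, are scheduled first.

import scala.reflect.ClassTag
import org.apache.spark.{NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Presents a parent partition under a new index.
class ReorderedPartition(override val index: Int, val parent: Partition)
  extends Partition

// order(newIndex) = parent partition id; putting the heaviest partition
// ids first gives their tasks the lowest indices.
class ReorderedRDD[T: ClassTag](prev: RDD[T], order: Array[Int])
  extends RDD[T](prev.context, Seq(new NarrowDependency[T](prev) {
    override def getParents(partitionId: Int): Seq[Int] = Seq(order(partitionId))
  })) {

  override protected def getPartitions: Array[Partition] =
    order.indices
      .map(i => new ReorderedPartition(i, prev.partitions(order(i))))
      .toArray

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split.asInstanceOf[ReorderedPartition].parent, context)
}

// Usage idea: order = partition ids sorted by descending size, e.g.
//   val order = partitionSizes.orderBy(desc("numElements"))
//     .select("id").as[Int].collect()
//   val reordered = new ReorderedRDD(df.rdd, order)

I don't know whether the scheduler is guaranteed to launch tasks in index order under every scheduling mode, so treat this purely as an illustration of the question.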
My setup: Spark 2.2 on Mesos, launched with spark-submit. The job runs on 60 CPUs: 12 executors with 5 CPUs each.
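For completeness, the job is submitted roughly like this (a sketch; the master URL and jar are placeholders):

spark-submit \
  --master mesos://<master-url> \
  --total-executor-cores 60 \
  --conf spark.executor.cores=5 \
  my-app.jar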