My question is about the order of tasks within a Stage in Spark.

Context:

I have a Spark DataFrame divided into 3000 partitions, partitioned on a specific key. I use mapPartitionsWithIndex to get the id of each partition and the number of elements it contains. For example:

df.rdd
  .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size))) // one (id, count) row per partition
  .toDF("id", "numElements") // requires: import spark.implicits._

When Spark runs its computation on my DataFrame, I can see in the Spark UI (and I ran some tests to make sure this is the case) that the task index corresponds to the partition id, exactly the same id obtained with mapPartitionsWithIndex above. So, on a given executor, tasks are executed in order of increasing partition id.
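For reference, here is a minimal way to check that claim from inside a task (a sketch, assuming the same df as above): TaskContext reports the partition id that the running task is processing, which can be compared against the index passed to mapPartitionsWithIndex.

import org.apache.spark.TaskContext
import spark.implicits._

// Each output row carries the mapPartitionsWithIndex index, the partition id
// reported by the running task, and the element count; the first two should match.
df.rdd
  .mapPartitionsWithIndex { (i, rows) =>
    Iterator((i, TaskContext.get().partitionId(), rows.size))
  }
  .toDF("index", "taskPartitionId", "numElements")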

I see a clear correlation between the number of rows in a partition and the execution time of its task. Due to the skewed nature of my dataset, which can't be changed, several partitions have a much higher number of elements (>8000) than the average (~3000). An average partition takes 10-20 min to process, while the larger ones can go above 3 hours. Some of my largest partitions have high ids, so the corresponding tasks are executed almost at the end of the stage. As a consequence, one Spark Stage hangs for 3 hours on its last 5 tasks.
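As an aside, the counting snippet above can be extended to list the heaviest partitions first (a sketch, assuming a SparkSession in scope as spark):

import org.apache.spark.sql.functions.desc
import spark.implicits._

// Rank partitions by element count, largest first, to see which ids are heavy.
df.rdd
  .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
  .toDF("id", "numElements")
  .orderBy(desc("numElements"))
  .show(10) // the 10 heaviest partitions and their ids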

Question:

Is there a way to reorder the partition ids so that the tasks from the largest partitions are executed first? Or, equivalently, is there a way to change the order in which tasks are executed?

Note:

  • I don't need to move partitions to other nodes or executors, just change their order of execution.
  • I can't change the partitioning key.
  • I can change the number of partitions, but the problem will remain.

My setup: Spark 2.2 on Mesos, launched with spark-submit. The job runs on 60 CPUs in total: 12 executors with 5 CPUs each.

Did you find a way to prove me wrong? You can salt the key if there is no issue with dependencies of data across different partitions (see the sketch after these comments). – thebluephantom
@bluephantom Not yet, but I was hoping someone could provide a more insightful answer than "No, there is not. If so, it would be in the docs by now." – astro_asz
But sometimes that is the simple answer. I made a comparison with Informatica, which works differently. – thebluephantom
Do you know the quote from Carl Sagan: "Absence of evidence is not evidence of absence"? Just because it is not in the docs doesn't mean it is not possible. – astro_asz
Sadly, people who quote Carl Sagan on this don't seem to know the particular circumstances in which he said it or what, exactly, he was referring to. And they apparently don't even realize that Sagan was actually quoting a "maxim" often stated by fellow cosmologist Martin Rees. – thebluephantom
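For completeness, here is a minimal sketch of the salting idea mentioned in the comments. It assumes the partitioning column is named key (a hypothetical name) and that downstream logic tolerates a key's rows being split across partitions:

import org.apache.spark.sql.functions._

// Append a random salt in [0, numSalts) to each row, then repartition on
// (key, salt): each hot key is spread over up to numSalts partitions instead
// of landing in a single oversized one.
val numSalts = 8
val salted = df
  .withColumn("salt", (rand() * numSalts).cast("int"))
  .repartition(col("key"), col("salt"))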

1 Answer

No, there is not. If so, it would be in the docs by now.

You cannot control the ordering (or prioritization) of Tasks, since the Spark Task Scheduler does not have an interface to define such an order or priority.

Spark works differently from, say, Informatica. A Stage, and thus all of its tasks, must complete entirely before the next Stage can commence for a given Action.

That said, 3+ hours for a partition of ~8000 elements seems like a long time.