2
votes

I am running a Spark job with some very big stages (e.g. >20k tasks), and am running it with 1k to 2k executors.

In some cases, a stage will appear to run unstably: many available executors become idle over time, despite still being in the middle of a stage with many unfinished tasks. From the user perspective, it appears that tasks are finishing, but executors that have finished a given task do not get a new task assigned to them. As a result, the stage takes longer than it should, and a lot of executor CPU-hours are being wasted on idling. This seems to mostly (only?) happen during input stages, where data is being read from HDFS.

Example Spark stderr log during an unstable period -- notice that the number of running tasks decreases over time until it almost reaches zero, then suddenly jumps back up to >1k running tasks:

[Stage 0:==============================>                 (17979 + 1070) / 28504]
[Stage 0:==============================>                 (18042 + 1019) / 28504]
[Stage 0:===============================>                 (18140 + 921) / 28504]
[Stage 0:===============================>                 (18222 + 842) / 28504]
[Stage 0:===============================>                 (18263 + 803) / 28504]
[Stage 0:===============================>                 (18282 + 786) / 28504]
[Stage 0:===============================>                 (18320 + 751) / 28504]
[Stage 0:===============================>                 (18566 + 508) / 28504]
[Stage 0:================================>                (18791 + 284) / 28504]
[Stage 0:================================>                (18897 + 176) / 28504]
[Stage 0:================================>                (18940 + 134) / 28504]
[Stage 0:================================>                (18972 + 107) / 28504]
[Stage 0:=================================>                (19035 + 47) / 28504]
[Stage 0:=================================>                (19067 + 17) / 28504]
[Stage 0:================================>               (19075 + 1070) / 28504]
[Stage 0:================================>               (19107 + 1039) / 28504]
[Stage 0:================================>                (19165 + 982) / 28504]
[Stage 0:=================================>               (19212 + 937) / 28504]
[Stage 0:=================================>               (19251 + 899) / 28504]
[Stage 0:=================================>               (19355 + 831) / 28504]
[Stage 0:=================================>               (19481 + 708) / 28504]

This is what the stderr looks like when a stage is running stably -- the number of running tasks remains roughly constant, because new tasks are assigned to executors as they finish their previous tasks:

[Stage 1:===================>                            (11599 + 2043) / 28504]
[Stage 1:===================>                            (11620 + 2042) / 28504]
[Stage 1:===================>                            (11656 + 2044) / 28504]
[Stage 1:===================>                            (11692 + 2045) / 28504]
[Stage 1:===================>                            (11714 + 2045) / 28504]
[Stage 1:===================>                            (11741 + 2047) / 28504]
[Stage 1:===================>                            (11771 + 2047) / 28504]
[Stage 1:===================>                            (11818 + 2047) / 28504]

Under what circumstances would this happen, and how can I avoid this behavior?

NB: I am using dynamic allocation, but I'm pretty sure this is unrelated to this problem -- e.g., during an unstable period, in the Spark Application Master UI I can see that the expected number of executors are "Active", but are not running "Active Tasks."

1

1 Answers

1
votes

I've seen behavior like this from spark when the amount of time taken per task is very low. For some reason, the scheduler seems to assume that the job will complete faster without the extra distribution overhead, since each task is completing so quickly.

A couple of things to try:

  • Try .coalesce() to reduce the number of partitions, so that each partition takes longer to run (granted, this could cause a shuffle step and may increase overall job time, you'll have to expiriment)
  • Tweak the spark.locality.wait* settings here. If each task takes less than the default wait times of 3s, then perhaps the scheduler is just trying to keep the existing slots full and never has a chance to allocate more slots.

I've yet to track down exactly what causes this issue, so these are only speculations and hunches based on my own observations in my own (much smaller) cluster.