I am running a Spark job with some very large stages (e.g. >20k tasks) on 1k to 2k executors.
In some cases, a stage appears to run unstably: many executors become idle over time even though the stage still has many unfinished tasks. From the user's perspective, tasks are finishing, but executors that have finished a task are not assigned a new one. As a result, the stage takes longer than it should, and a lot of executor CPU-hours are wasted on idling. This seems to happen mostly (only?) during input stages, where data is being read from HDFS.
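For context, the stage in question is essentially a plain scan of files on HDFS followed by a shuffle. A minimal sketch of that kind of job is below -- this is not the actual code; the path and key extraction are placeholders:

import org.apache.spark.sql.SparkSession

// Minimal sketch only -- the real job is larger; the path and key extraction are placeholders.
val spark = SparkSession.builder()
  .appName("hdfs-input-stage-sketch")
  .getOrCreate()
val sc = spark.sparkContext

// Input stage (like the unstable Stage 0 in the logs below):
// roughly one task per HDFS block/split of the input files.
val raw = sc.textFile("hdfs:///path/to/input/*")

// Shuffle stage (like the stable Stage 1 further down): reduce by a placeholder key.
val counts = raw
  .map(line => (line.take(10), 1L))
  .reduceByKey(_ + _)

counts.count()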
Example Spark stderr log during an unstable period -- notice that the number of running tasks decreases over time until it almost reaches zero, then suddenly jumps back up to >1k running tasks:
[Stage 0:==============================> (17979 + 1070) / 28504]
[Stage 0:==============================> (18042 + 1019) / 28504]
[Stage 0:===============================> (18140 + 921) / 28504]
[Stage 0:===============================> (18222 + 842) / 28504]
[Stage 0:===============================> (18263 + 803) / 28504]
[Stage 0:===============================> (18282 + 786) / 28504]
[Stage 0:===============================> (18320 + 751) / 28504]
[Stage 0:===============================> (18566 + 508) / 28504]
[Stage 0:================================> (18791 + 284) / 28504]
[Stage 0:================================> (18897 + 176) / 28504]
[Stage 0:================================> (18940 + 134) / 28504]
[Stage 0:================================> (18972 + 107) / 28504]
[Stage 0:=================================> (19035 + 47) / 28504]
[Stage 0:=================================> (19067 + 17) / 28504]
[Stage 0:================================> (19075 + 1070) / 28504]
[Stage 0:================================> (19107 + 1039) / 28504]
[Stage 0:================================> (19165 + 982) / 28504]
[Stage 0:=================================> (19212 + 937) / 28504]
[Stage 0:=================================> (19251 + 899) / 28504]
[Stage 0:=================================> (19355 + 831) / 28504]
[Stage 0:=================================> (19481 + 708) / 28504]
For comparison, this is what the stderr looks like when a stage is running stably -- the number of running tasks stays roughly constant, because executors are assigned new tasks as soon as they finish their previous ones:
[Stage 1:===================> (11599 + 2043) / 28504]
[Stage 1:===================> (11620 + 2042) / 28504]
[Stage 1:===================> (11656 + 2044) / 28504]
[Stage 1:===================> (11692 + 2045) / 28504]
[Stage 1:===================> (11714 + 2045) / 28504]
[Stage 1:===================> (11741 + 2047) / 28504]
[Stage 1:===================> (11771 + 2047) / 28504]
[Stage 1:===================> (11818 + 2047) / 28504]
Under what circumstances would this happen, and how can I avoid this behavior?
NB: I am using dynamic allocation, but I'm fairly sure it is unrelated to this problem -- e.g., during an unstable period, the Spark Application Master UI shows the expected number of executors as "Active", yet they are not running any "Active Tasks".
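For reference, dynamic allocation is enabled roughly as in the sketch below; the executor counts and timeout are illustrative values, not the exact production configuration:

import org.apache.spark.sql.SparkSession

// Illustrative settings only -- not the exact production values.
// On YARN, dynamic allocation also requires the external shuffle service on each node.
val spark = SparkSession.builder()
  .appName("hdfs-input-stage-sketch")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1000")        // placeholder value
  .config("spark.dynamicAllocation.maxExecutors", "2000")        // placeholder value
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")  // Spark's default
  .getOrCreate()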