We are running the following stage DAG and experiencing long shuffle read times for relatively small shuffle data sizes (about 19 MB per task):
One interesting aspect is that the waiting tasks within each executor/server have nearly identical shuffle read times. Here is an example of what this means: on the server below, one group of tasks waits about 7.7 minutes and another group waits about 26 seconds.
Here is another example from the same stage run. The figure shows 3 executors/servers, each with uniform groups of tasks that have equal shuffle read times. The blue group represents tasks that were killed due to speculative execution:
Not all executors behave like this. Some finish all of their tasks within seconds, fairly uniformly, and the amount of remote shuffle data read by these tasks is the same as for the tasks that wait a long time on other servers. Also, this type of stage runs 2 times within our application, and the servers/executors that produce these groups of tasks with large shuffle read times are different in each stage run.
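(For reference, the per-task shuffle read figures above can also be collected programmatically with a SparkListener instead of reading them off the UI. Below is a minimal sketch of such a listener; FetchWaitLogger is just an illustrative name and the log format is our own.)

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: logs shuffle fetch wait time and remote bytes read
// for each finished task, keyed by host, so the per-executor groupings above
// can be reproduced outside the UI.
class FetchWaitLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    // metrics can be null for failed tasks
    if (metrics != null) {
      val shuffle = metrics.shuffleReadMetrics
      println(
        s"stage=${taskEnd.stageId} host=${taskEnd.taskInfo.host} " +
        s"task=${taskEnd.taskInfo.taskId} " +
        s"fetchWaitMs=${shuffle.fetchWaitTime} " +
        s"remoteBytes=${shuffle.remoteBytesRead}"
      )
    }
  }
}

// Registered once on the driver:
// spark.sparkContext.addSparkListener(new FetchWaitLogger)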
Here is an example of the task stats table for one of the servers / hosts:
It looks like the code responsible for this DAG is the following:
// comparison is the symmetric difference of data and output (rows present in exactly one of them)
val comparison = data.union(output).except(data.intersect(output)).cache()
// First action materializes the cached Dataset
comparison.filter(_.abc != "M").count()

output.write.parquet("output.parquet")
comparison.write.parquet("comparison.parquet")
output.union(comparison).write.parquet("output_comparison.parquet")
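For completeness, here is a self-contained sketch of the same pipeline that can be run in isolation. The real schema of data and output is not shown above, so the Record case class and the sample rows are assumptions; everything else (data, output, comparison, the abc field, and the parquet paths) comes from the snippet.

import org.apache.spark.sql.SparkSession

// Assumed record type; the real schema is not shown in the question.
case class Record(abc: String, value: Long)

object ComparisonPipelineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("comparison-pipeline-sketch").getOrCreate()
    import spark.implicits._

    // Placeholder inputs; in the real job these come from upstream stages.
    val data   = Seq(Record("M", 1L), Record("A", 2L)).toDS()
    val output = Seq(Record("A", 2L), Record("B", 3L)).toDS()

    // Symmetric difference of data and output. union/except/intersect each add
    // shuffle dependencies, so this single line contributes several exchanges to the DAG.
    val comparison = data.union(output).except(data.intersect(output)).cache()

    // Inspect the physical plan to see which exchanges (shuffles) this produces.
    comparison.explain(true)

    // First action materializes the cache.
    comparison.filter(_.abc != "M").count()

    output.write.parquet("output.parquet")
    comparison.write.parquet("comparison.parquet")
    output.union(comparison).write.parquet("output_comparison.parquet")

    spark.stop()
  }
}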
We would greatly appreciate your thoughts on this.