3 votes

We are running the following stage DAG and experiencing long shuffle read times for relatively small shuffle data sizes (about 19 MB per task).

[screenshot: stage DAG from the Spark UI]

One interesting aspect is that waiting tasks within each executor/server have equivalent shuffle read times. Here is an example of what this means: on the following server, one group of tasks waits about 7.7 minutes and another group waits about 26 seconds.

[screenshot: task list for one server, showing one group of tasks with ~7.7 min shuffle read time and another with ~26 s]

Here is another example from the same stage run. The figure shows 3 executors/servers, each with a uniform group of tasks having equal shuffle read time. The blue group represents tasks killed due to speculative execution:

[screenshot: event timeline for 3 executors, each with a uniform group of shuffle read times; blue marks speculatively killed tasks]

Not all executors behave like this. Some finish all their tasks within seconds, fairly uniformly, and the amount of remotely read data for those tasks is the same as for the ones that wait a long time on other servers. Also, this type of stage runs twice within our application, and the servers/executors that produce these groups of tasks with large shuffle read time are different in each run.

Here is an example of the task stats table for one of the servers/hosts:

[screenshot: task stats table for one host]

It looks like the code responsible for this DAG is the following:

// comparison is defined (and cached) first, then reused by the writes below
val comparison = data.union(output).except(data.intersect(output)).cache()
comparison.filter(_.abc != "M").count()
output.write.parquet("output.parquet")
comparison.write.parquet("comparison.parquet")
output.union(comparison).write.parquet("output_comparison.parquet")
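
For reference, the physical plan of the suspected step can be printed to see whether except/intersect are the source of the SortMergeJoin in the DAG. This is just a sketch; it assumes data and output are Datasets with a common schema, as in the code above:

// Prints the plan; the "== Physical Plan ==" section shows whether except/intersect
// are planned as SortMergeJoin plus Exchange (shuffle) operators.
data.union(output).except(data.intersect(output)).explain(true)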

We would highly appreciate your thoughts on this.

Strange. Code and data samples would be appreciated. I see every step of that DAG has a cache call; are you caching everything? – Garren S
Hello. Thank you for your question. I posted the code in the description above. We are caching only when we think it is needed. – Dimon
The except and intersect calls are on my radar as concerns. Your DAG references a SortMergeJoin; do you already know which line(s) are causing the trouble? – Garren S
We think that the SortMergeJoin comes from except or intersect in the above code. Another piece of information: we are using MesosExternalShuffleService. – Dimon

2 Answers

1 vote

Apparently the problem was JVM garbage collection (GC). The tasks had to wait until GC was done on the remote executors. The equivalent shuffle read times resulted from the fact that several tasks were waiting on a single remote host that was performing GC. We followed the advice posted here and the problem decreased by an order of magnitude. There is still a small correlation between GC time on remote hosts and local shuffle read time. In the future we plan to try the shuffle service.
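
For anyone who lands here with the same problem, below is a minimal sketch of the kind of executor GC settings described in the Spark tuning guide. The exact options are illustrative assumptions, not the configuration we ended up with, and they can equally be passed via spark-submit --conf or spark-defaults.conf:

import org.apache.spark.sql.SparkSession

// Assumption: switch executors to G1 and enable GC logging so that GC pauses
// on remote hosts can be correlated with the long shuffle read times in the UI.
val spark = SparkSession.builder()
  .config("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseG1GC")
  .getOrCreate()

The GC log lines end up in each executor's stdout log, which makes checking the correlation with shuffle read time straightforward.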

0 votes

Since Google brought me here with the same problem, but I needed another solution...

Another possible reason for a small shuffle taking a long time to read is that the data is split over many partitions. For example (apologies, this is PySpark, as it is all I have used):

(my_df_with_many_partitions   # say it has 1000 partitions
    .filter(very_specific_filter)   # only very few rows pass
    .groupby('blah')
    .count())

The shuffle write from the filter above will be very small, so the stage after it has very little to read. But to read it, you still have to check a lot of mostly empty partitions. One way to address this would be:

(my_df_with_many_partitions
    .filter(very_specific_filter)
    .repartition(1)   # collapse the filtered data into a single partition
    .groupby('blah')
    .count())