What is the purpose of org.apache.beam.sdk.transforms.Reshuffle? In the documentation the purpose is defined as:

A PTransform that returns a PCollection equivalent to its input but operationally provides some of the side effects of a GroupByKey, in particular preventing fusion of the surrounding transforms, checkpointing and deduplication by id.

What is the benefit of preventing fusion of the surrounding transforms? I thought fusion was an optimization that avoids unnecessary steps. An actual use case would be helpful.

1 Answer

There are a couple of cases where you may want to reshuffle your data. The following is not an exhaustive list, but it should give you an idea of why you might reshuffle:

When one of your ParDo transforms has a very high fanout

High fanout means the ParDo emits many more elements than it receives, so there is more potential parallelism after the ParDo than before it. If you don't break fusion here, your pipeline will not be able to split that data across multiple machines to process it.

Consider the extreme case of a DoFn that generates a million output elements for every input element, and suppose this ParDo receives 10 elements in its input. If you don't break fusion between this high-fanout ParDo and its downstream transforms, the fused steps can run on at most 10 machines, even though there are millions of elements to process.

  • A good way to diagnose this is to compare the number of elements in the input PCollection with the number of elements in the output PCollection. If the latter is significantly larger than the former, then you may want to consider adding a reshuffle.
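
To put numbers on the fanout example, here is a minimal sketch in plain Java. It is a toy model, not Beam code: the class `FanoutSketch`, its method names, and the figure of 100 available workers are all made up for illustration. It only encodes the reasoning above, namely that with fusion the unit of parallelism stays the input element, while after a redistribution every output element can be scheduled on its own.

```java
/** Toy model (not Beam code) of parallelism around a high-fanout step. */
class FanoutSketch {

    /**
     * With the fanout step fused to its downstream steps, all outputs of one
     * input element stay on the worker that produced them, so the number of
     * independently schedulable units equals the number of input elements.
     */
    static long parallelUnitsFused(long inputElements, long fanoutPerElement) {
        return inputElements;
    }

    /**
     * After a redistribution (the role a reshuffle plays), each output
     * element can be handed to any worker.
     */
    static long parallelUnitsAfterReshuffle(long inputElements, long fanoutPerElement) {
        return Math.multiplyExact(inputElements, fanoutPerElement);
    }

    /** Workers that can actually be kept busy, capped by the pool size. */
    static long usableWorkers(long parallelUnits, long availableWorkers) {
        return Math.min(parallelUnits, availableWorkers);
    }

    public static void main(String[] args) {
        long inputs = 10;          // elements entering the ParDo (from the example)
        long fanout = 1_000_000;   // outputs per input element (from the example)
        long workers = 100;        // hypothetical pool size, an assumption

        long fused = parallelUnitsFused(inputs, fanout);
        long reshuffled = parallelUnitsAfterReshuffle(inputs, fanout);

        System.out.println("fused:      " + usableWorkers(fused, workers)
                + " of " + workers + " workers busy");
        System.out.println("reshuffled: " + usableWorkers(reshuffled, workers)
                + " of " + workers + " workers busy");
    }
}
```

With these numbers the fused version keeps 10 of the 100 workers busy and the redistributed version keeps all 100 busy. In a real Beam Java pipeline the equivalent break would be applying a reshuffle right after the high-fanout ParDo, for example with `Reshuffle.viaRandomKey()`; check the javadoc for your SDK version before relying on it.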

When your data is not well balanced across machines

Imagine that your pipeline consumes 9 files of 10MB and one file of 10GB. If each file is read by a single machine, you will have one machine with a lot more data than the others.

If you don't reshuffle this data, most of your machines will be idle while your pipeline runs. Reshuffling it allows you to rebalance the data to be processed more evenly across machines.

  • A good way to diagnose this is by looking at how many workers are executing work in your pipeline. If the pipeline is slow, and there is only one worker processing data, then you can benefit from a reshuffle.
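
The imbalance in the file example can be sketched the same way. Again this is plain Java rather than Beam code, and the class `SkewSketch`, the 10 MB chunk size, the round-robin assignment, and the pool of 10 workers are assumptions made for illustration (a real reshuffle redistributes by key, not round-robin, and 10 GB is rounded to 10,000 MB). The point is only to compare the busiest worker before and after the data is spread out.

```java
import java.util.Arrays;

/** Toy model (not Beam code) of per-worker load with and without rebalancing. */
class SkewSketch {

    /** One file per worker: each worker's load is simply its file's size. */
    static long[] onePerWorker(long[] fileSizesMb) {
        return fileSizesMb.clone();
    }

    /**
     * Cut every file into chunks of at most chunkMb and deal the chunks out
     * round-robin, as a stand-in for an even redistribution.
     */
    static long[] redistribute(long[] fileSizesMb, int workers, long chunkMb) {
        long[] load = new long[workers];
        int next = 0;
        for (long size : fileSizesMb) {
            for (long remaining = size; remaining > 0; remaining -= chunkMb) {
                load[next] += Math.min(chunkMb, remaining);
                next = (next + 1) % workers;
            }
        }
        return load;
    }

    /** Load of the busiest worker, which bounds how long the stage takes. */
    static long max(long[] loads) {
        return Arrays.stream(loads).max().orElse(0L);
    }

    public static void main(String[] args) {
        // Nine 10 MB files and one 10 GB file, sizes in MB.
        long[] files = {10, 10, 10, 10, 10, 10, 10, 10, 10, 10_000};

        long[] skewed = onePerWorker(files);
        long[] balanced = redistribute(files, 10, 10);

        System.out.println("busiest worker without rebalancing: " + max(skewed) + " MB");
        System.out.println("busiest worker with rebalancing:    " + max(balanced) + " MB");
    }
}
```

In this model the busiest worker drops from 10,000 MB to 1,010 MB, roughly a tenth of the total, which is why the other nine machines stop sitting idle.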