
In my Apache Beam and Dataflow pipelines, I do some transforms that need a global combine operation, like min, max, or custom global combine functions. The number of items to be processed in the PCollection is on the order of 2-4 billion.

I read that most combine operations are built on top of GroupByKey, which causes a shuffle, and I believe this is what makes my current pipeline slow; the UI shows the highest wall time in the global combine operation. I looked into the code: GroupByKey adds a static void key to all elements and then does a group-by. Does this mean we are shuffling all the data (especially when we have just one key)? Are there ways to do this efficiently?

Another question for my own understanding: the Beam/Dataflow documentation says all elements of a key are handled by a single worker/thread. Take the example of finding the max in a PCollection of integers. This global operation is completely parallelizable: my combiner/accumulator can work on partial subsets of the data to find local maxima and then merge the partial results (merging two maxima to get a max), in a tree-like structure where the results of leaves are merged to produce a parent node, so each node can be evaluated in a distributed manner. So what operation exactly enforces that one key has to be handled by one worker/thread? It seems like any global operation with a combiner that is commutative and associative can be easily parallelized. What part of the global combine needs to go through a single worker/thread?
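To make the tree-merge idea concrete, here is a plain-Python sketch of the semantics (the function names mirror Beam's CombineFn methods, but this is not the Beam API, just an illustration of why a commutative and associative combiner parallelizes):

```python
from functools import reduce

def create_accumulator():
    return None  # no elements seen yet

def add_input(acc, value):
    return value if acc is None else max(acc, value)

def merge_accumulators(accs):
    # Because max is commutative and associative, partial results
    # can be merged in any order, e.g. pairwise in a tree.
    return reduce(add_input, (a for a in accs if a is not None), None)

def combine_partition(values):
    return reduce(add_input, values, create_accumulator())

# Simulate 4 workers each combining a shard, then merging the partials.
shards = [[3, 9, 1], [12, 5], [7, 7, 2], [4]]
partials = [combine_partition(s) for s in shards]   # [9, 12, 7, 4]
result = merge_accumulators(partials)               # 12
```

Nothing in the math forces a single worker; whether the runner actually merges partials in parallel is a scheduling decision, which is what the question is probing.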

I have observed that behaviour, and I am struggling with it. When you create a Combine.globally(), only the methods createAccumulator, addInput and extractOutput get called. That means there was a shuffle and mergeAccumulators was not needed. I am trying to figure out the reason by looking at the code, but the Apache Beam code is awful. To me it makes sense to do a partial combine per bundle, then partial merges, and finally collect everything. But that is not really what is happening, so the doc is lying. - Rafael

1 Answer


The combiner will be lifted ahead of the shuffle (meaning some combining is done before elements are passed to the shuffle). There's a little bit of information here: https://cloud.google.com/blog/big-data/2016/02/writing-dataflow-pipelines-with-scalability-in-mind (search for "combiner").

Dataflow will assign a distinct key to each element, so you will not end up with all elements under the same key (and therefore no parallelism). If everything were assigned to one key, only one worker would be able to process it and the pipeline would be very slow.
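A rough plain-Python sketch of what this buys you (this is not Dataflow's internals, and `lifted_global_max` and `num_keys` are made-up names for illustration): elements are spread across a few keys, partial combining happens per key before the shuffle, and only the small set of partial accumulators crosses the network to be merged:

```python
import random
from functools import reduce

def lifted_global_max(elements, num_keys=16):
    # Pre-shuffle: partial combine per key, as each worker sees elements.
    partials = {}
    for x in elements:
        k = random.randrange(num_keys)  # spreading keys preserves parallelism
        partials[k] = x if k not in partials else max(partials[k], x)
    # Post-shuffle: only len(partials) accumulators are shuffled,
    # then merged with the associative max operation.
    return reduce(max, partials.values())

print(lifted_global_max(range(1_000_000)))  # 999999
```

With the combiner lifted this way, the shuffle moves at most `num_keys` accumulators instead of billions of elements, which is why the combine step need not be a bottleneck.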