
I'm using Google Cloud Dataflow and have a ParDo function that requires access to all the elements in a PCollection. To accomplish this, I wanted to convert a PCollection<T> into a PCollection<Iterable<T>> containing a single Iterable of all the elements. I was wondering if there's a cleaner/simpler/faster solution to what I have come up with.

The first approach was to create a dummy key, perform a GroupByKey, and get the values afterwards.

PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey())); 
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create());

The second approach followed the recommendation here: How do I make View's asList() sortable in Google Dataflow SDK? but without the sorting. I created a view with View.asList(), created a dummy PCollection with a single element, and then applied a ParDo to the dummy PCollection that takes the view as a side input and simply outputs it.

PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList()); 
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
        ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
            @Override
            public void processElement(ProcessContext c) {
                c.output(c.sideInput(myDataView)); 
            }
        }));

It seems like there would be a pre-defined combine function for this task, but I can't find one. Thanks for the help!


If you know you need the whole thing, then both of your approaches are reasonable. Both have been used in the Dataflow SDK itself, and later in its successor, the Apache Beam SDK.

  1. Side input and then output the whole thing: This is how DataflowAssert works, in fact. In Beam, where different backend runners may implement side inputs differently, you should prefer View.asIterable() since it has fewer assumptions and may allow more streaming of a very large side input.
  2. Group by a single key and then drop the key: This is how PAssert, Beam's successor to DataflowAssert, works. It accomplishes the same thing and requires a little more care for empty collections, but more Beam runners have good GroupByKey support than side input support (especially runners that are new and still under development).

So View.asIterable() is basically intended to be just what you are asking for. There have also been some requests for a GroupGlobally transform that does the second version; that could happen at some point.
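The "little more care for empty collections" can be made concrete with a small plain-Java model. This is not Beam code and has no Beam dependency; the class `GatherModel` and its methods are hypothetical names used only for illustration. It mimics just how many outputs each approach produces: grouping by a dummy key emits one group per key that actually occurs, so an empty input yields no group at all, while a ParDo over a one-element dummy collection (like `Create.of(1)` above) always emits exactly one, possibly empty, list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Plain-Java model of the two "gather everything into one Iterable" approaches.
 * It only models output counts; it does not use or imitate the Beam API itself.
 */
public class GatherModel {

    /** Models: key every element with dummy key 1, group by key, then drop the key. */
    public static <T> List<List<T>> groupBySingleKey(List<T> input) {
        // Like GroupByKey, grouping yields one entry per key that actually occurs,
        // so an empty input produces an empty map (no groups at all).
        Map<Integer, List<T>> grouped = input.stream()
                .collect(Collectors.groupingBy(e -> 1));
        // Like Values.create(): keep only the grouped values, discarding the key.
        return new ArrayList<>(grouped.values());
    }

    /** Models: a one-element dummy collection whose ParDo outputs the whole side input. */
    public static <T> List<List<T>> sideInputOnDummy(List<T> input) {
        List<List<T>> output = new ArrayList<>();
        // The dummy collection always has exactly one element, so the loop body
        // runs once and one output is produced even when the side input is empty.
        List<Integer> dummy = List.of(1);
        for (Integer ignored : dummy) {
            output.add(new ArrayList<>(input));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        List<String> empty = List.of();
        System.out.println(groupBySingleKey(data));  // [[a, b, c]]
        System.out.println(groupBySingleKey(empty)); // []
        System.out.println(sideInputOnDummy(data));  // [[a, b, c]]
        System.out.println(sideInputOnDummy(empty)); // [[]]
    }
}
```

In other words, with the GroupByKey approach, downstream code that expects exactly one Iterable has to handle the case where none arrives, whereas the side-input approach always produces one.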