I have a PCollection of BigQuery TableRow elements that are tagged depending on whether one column of the TableRow was successfully parsed or not.

final TupleTag<TableRow> OK = new TupleTag<TableRow>(){};
final TupleTag<TableRow> NOTOK = new TupleTag<TableRow>(){};

My ParDo function tags each TableRow based on whether the column parsed, and returns a PCollectionTuple called myPCollection.

I would like to do the following:

  1. Get all the elements in the PCollection (tagged both as OK and NOTOK), and output them to BigQuery.
  2. Get only the elements tagged as NOTOK and send them to Pub/Sub.

I know I can do #2 by calling

myPCollection.get(NOTOK)

I cannot find a way to do #1. I saw there is a method called myPCollection.getAll(), but instead of a PCollection it returns a Map<TupleTag<?>, PCollection<?>>.

Any ideas on how to get the entire set of elements regardless of how they are tagged?

1 Answer

You can use the Flatten transform (Beam guide) to merge different PCollections into a single one:

// OK and NOTOK are TupleTag<TableRow>, so get() returns PCollection<TableRow>.
PCollection<TableRow> okResults = myPCollection.get(OK);
PCollection<TableRow> notOkResults = myPCollection.get(NOTOK);

// Merge both tagged outputs into a single PCollection.
PCollectionList<TableRow> pcl = PCollectionList.of(okResults).and(notOkResults);
PCollection<TableRow> allResults = pcl.apply(Flatten.pCollections());

In this case allResults will contain both the OK and the NOTOK elements. I made an example (full code here) with two input lines that are classified into good or bad side outputs:

Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: bad line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$3 processElement
INFO: Ok element: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$4 processElement
INFO: Not Ok element: bad line

Tested with the 2.17.0 SDK and the DirectRunner.
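
For reference, here is a minimal sketch of the same pattern as one small pipeline. It is not the linked example: the class name FlattenTaggedOutputs, the parses() rule, and the PrintFn helper are hypothetical names made up for illustration. It also assumes plain String elements instead of TableRow (to avoid the BigQuery dependency) and that the Beam Java SDK and the direct runner are on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class FlattenTaggedOutputs {
  // Anonymous subclasses keep the element type available for coder inference.
  static final TupleTag<String> OK = new TupleTag<String>() {};
  static final TupleTag<String> NOTOK = new TupleTag<String>() {};

  // Hypothetical stand-in for the real column-parsing check.
  static boolean parses(String line) {
    return line.startsWith("good");
  }

  // Hypothetical helper that prints each element with a label.
  static class PrintFn extends DoFn<String, Void> {
    private final String label;

    PrintFn(String label) {
      this.label = label;
    }

    @ProcessElement
    public void processElement(@Element String line) {
      System.out.println(label + line);
    }
  }

  // Merges the OK and NOTOK outputs of the tuple into one PCollection.
  static PCollection<String> flattenAll(PCollectionTuple tagged) {
    return PCollectionList.of(tagged.get(OK))
        .and(tagged.get(NOTOK))
        .apply("FlattenAll", Flatten.pCollections());
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Route each line to the OK or NOTOK output depending on parses().
    PCollectionTuple tagged =
        p.apply(Create.of("good line", "bad line"))
            .apply(
                "TagLines",
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void processElement(
                              @Element String line, MultiOutputReceiver out) {
                            out.get(parses(line) ? OK : NOTOK).output(line);
                          }
                        })
                    .withOutputTags(OK, TupleTagList.of(NOTOK)));

    // Goal #1: every element, regardless of tag (this would feed BigQuery).
    flattenAll(tagged).apply("PrintAll", ParDo.of(new PrintFn("All elements: ")));

    // Goal #2: only the NOTOK elements (this would feed Pub/Sub).
    tagged.get(NOTOK).apply("PrintNotOk", ParDo.of(new PrintFn("Not Ok element: ")));

    p.run().waitUntilFinish();
  }
}
```

The same tagged.get(NOTOK) collection can be consumed by more than one transform, so flattening it into the combined output does not prevent sending it to a second sink as well.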