I'm trying to apply a PTransform to a PCollectionTuple, but can't figure out why the compiler is complaining.

I want to do this so that a single PTransform wraps the multiple steps required to join some CSV lines (each PCollection in the PCollectionTuple contains the CSV lines to join). My problem is not the join itself but how to apply a PTransform to a PCollectionTuple.

This is my code:

static class JoinCsvLines extends DoFn<PCollectionTuple, String[]> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        PCollectionTuple element = context.element();
        // TODO: Implement the output
    }
}

And I call the PTransform like this:

TupleTag<String[]> tag1 = new TupleTag<>();
TupleTag<String[]> tag2 = new TupleTag<>();
PCollectionTuple toJoin = PCollectionTuple.of(tag1, csvLines1).and(tag2, csvLines2);

// Can't compile this line
PCollection<String[]> joinedLines = toJoin.apply("JoinLines", ParDo.of(new JoinCsvLines()));

IntelliJ IDEA shows the following when I hover over the line that does not compile:

Required type: PTransform<? super PCollectionTuple, OutputT>
Provided: SingleOutput<PCollectionTuple, String[]>
reason: no instance(s) of type variable(s) InputT exist so that PCollectionTuple conforms to PCollection<? extends InputT>

How can I apply the PTransform to the PCollectionTuple?

1 Answer

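DoFn<PCollectionTuple, String[]> means you want to apply a "DoFn" to each element (record) of a PCollection<PCollectionTuple>, so PCollectionTuple shouldn't be its input type. Instead, use the element type of your "csvLines1" and "csvLines2" (String[] here). That is also why the line doesn't compile: ParDo.of(...) returns a transform whose input must be a PCollection, and a PCollectionTuple is not a PCollection. A transform that takes the whole tuple has to be a PTransform whose input type is PCollectionTuple, which extracts each PCollection inside its expand method.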
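A minimal sketch of a DoFn typed on the element type instead of PCollectionTuple, assuming each element of csvLines1 is one already-parsed CSV line (String[]). TrimFieldsFn and trimAll are hypothetical names, the trimming only stands in for whatever per-line step you need, and the sample data is made up. Running main assumes the Beam Java SDK and the Direct runner are on the classpath.

```java
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class PerElementDoFnExample {

  // Hypothetical per-line step: the input type is the element type (String[]),
  // not PCollectionTuple, so ParDo.of(...) can be applied to a PCollection<String[]>.
  static class TrimFieldsFn extends DoFn<String[], String[]> {
    @ProcessElement
    public void processElement(@Element String[] fields, OutputReceiver<String[]> out) {
      // Build a new array instead of mutating the input element.
      String[] trimmed = new String[fields.length];
      for (int i = 0; i < fields.length; i++) {
        trimmed[i] = fields[i].trim();
      }
      out.output(trimmed);
    }
  }

  // Applies the DoFn to one PCollection (e.g. csvLines1), not to the tuple.
  static PCollection<String[]> trimAll(PCollection<String[]> csvLines, String stepName) {
    return csvLines
        .apply(stepName, ParDo.of(new TrimFieldsFn()))
        // String[] has no dedicated Beam coder; Java serialization is a simple choice here.
        .setCoder(SerializableCoder.of(String[].class));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Made-up stand-in for the question's csvLines1.
    PCollection<String[]> csvLines1 =
        p.apply(
            "CreateLines1",
            Create.of(Arrays.asList(new String[] {" 1", "a "}, new String[] {"2 ", " b"}))
                .withCoder(SerializableCoder.of(String[].class)));
    PCollection<String[]> cleaned1 = trimAll(csvLines1, "TrimFields1");
    p.run().waitUntilFinish();
  }
}
```

The same DoFn can be applied to csvLines2 with a different step name; nothing in it needs to know about the tuple.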

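If your intention is to merge two PCollections of the same type into a single one, take a look at the Flatten transform (note that it only merges the elements; it does not join them): https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java#L41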
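Putting both points together, here is a sketch of a composite transform that can be applied directly to the question's PCollectionTuple, because its input type is PCollectionTuple rather than PCollection. Passing the two TupleTags through the constructor is an assumption of this sketch, and the Flatten step is only a placeholder that merges (unions) the inputs; the actual join steps would replace it. The sample data is made up, and running main assumes the Beam Java SDK plus the Direct runner on the classpath.

```java
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

public class JoinCsvLinesExample {

  // Composite PTransform (not a DoFn): it satisfies
  // PCollectionTuple.apply(PTransform<? super PCollectionTuple, OutputT>).
  static class JoinCsvLines extends PTransform<PCollectionTuple, PCollection<String[]>> {
    private final TupleTag<String[]> tag1;
    private final TupleTag<String[]> tag2;

    JoinCsvLines(TupleTag<String[]> tag1, TupleTag<String[]> tag2) {
      this.tag1 = tag1;
      this.tag2 = tag2;
    }

    @Override
    public PCollection<String[]> expand(PCollectionTuple input) {
      // Pull the individual PCollections back out of the tuple by tag.
      PCollection<String[]> lines1 = input.get(tag1);
      PCollection<String[]> lines2 = input.get(tag2);
      // Placeholder: Flatten only unions the two inputs; real join steps go here.
      return PCollectionList.of(lines1)
          .and(lines2)
          .apply("MergeLines", Flatten.<String[]>pCollections());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    SerializableCoder<String[]> coder = SerializableCoder.of(String[].class);
    PCollection<String[]> csvLines1 =
        p.apply("Lines1", Create.of(Arrays.asList(new String[] {"1", "a"}, new String[] {"2", "b"})).withCoder(coder));
    PCollection<String[]> csvLines2 =
        p.apply("Lines2", Create.of(Arrays.asList(new String[] {"1", "x"}, new String[] {"2", "y"})).withCoder(coder));

    TupleTag<String[]> tag1 = new TupleTag<>();
    TupleTag<String[]> tag2 = new TupleTag<>();
    PCollectionTuple toJoin = PCollectionTuple.of(tag1, csvLines1).and(tag2, csvLines2);

    // Compiles, because JoinCsvLines takes a PCollectionTuple as input.
    PCollection<String[]> joinedLines = toJoin.apply("JoinLines", new JoinCsvLines(tag1, tag2));
    p.run().waitUntilFinish();
  }
}
```

The design point is that only expand ever sees the tuple; any DoFns used inside it still work one element at a time on the individual PCollections.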