
I am implementing a Pub/Sub to BigQuery pipeline. It looks similar to How to create read transform using ParDo and DoFn in Apache Beam, but here I already have a PCollection created.

I am following what is described in the Apache Beam documentation to implement a ParDo operation that prepares a table row, using the following pipeline:

static class convertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        // Retrieve data from message
        String rawData = message.getData();
        Instant timestamp = new Instant(new Date());
        // Prepare TableRow
        TableRow row = new TableRow().set("message", rawData).set("ts_reception", timestamp);
        c.output(row);
    }
}

// Read input from Pub/Sub
pipeline.apply("Read from Pub/Sub",PubsubIO.readMessagesWithAttributes().fromTopic(topicPath))
        .apply("Prepare raw data for insertion", ParDo.of(new convertToTableRowFn()))
        .apply("Insert in Big Query", BigQueryIO.writeTableRows().to(BQTable));

I found the DoFn function in a gist.

I keep getting the following error:

The method apply(String, PTransform<? super PCollection<PubsubMessage>,OutputT>) in the type PCollection<PubsubMessage> is not applicable for the arguments (String, ParDo.SingleOutput<PubsubMessage,TableRow>)

I always understood that ParDo/DoFn operations are element-wise PTransform operations; am I wrong? I never got this type of error in Python, so I'm a bit confused about why this is happening.

1 Answer


You're right, ParDos are element-wise transforms and your approach looks correct.

What you're seeing is a compilation error. It happens when the argument type of the apply() method, as inferred by the Java compiler, doesn't match the type of the actual argument, in this case your convertToTableRowFn.

From the error you're seeing, it looks like Java infers that the second parameter of apply() is of type PTransform<? super PCollection<PubsubMessage>, OutputT>, while you're passing a subclass of ParDo.SingleOutput<PubsubMessage, TableRow> instead (your convertToTableRowFn). Looking at the definition of SingleOutput, your convertToTableRowFn is basically a PTransform<PCollection<? extends PubsubMessage>, PCollection<TableRow>>, and Java fails to use it in apply(), where it expects a PTransform<? super PCollection<PubsubMessage>, OutputT>.
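
For context, the two declarations in play look roughly like this (paraphrased from the Beam Java SDK; the exact generics may vary slightly between SDK versions):

// In PCollection<T>: the compiler has to infer OutputT from the transform you pass in.
public <OutputT extends POutput> OutputT apply(
        String name, PTransform<? super PCollection<T>, OutputT> t) { ... }

// ParDo.of(new convertToTableRowFn()) returns a ParDo.SingleOutput, declared roughly as:
public static class SingleOutput<InputT, OutputT>
        extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { ... }

With InputT = PubsubMessage and OutputT = TableRow these types are compatible, so inference should normally succeed; it only breaks down when something else prevents the compiler from resolving OutputT.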

What looks suspicious is that Java didn't infer OutputT as PCollection<TableRow>. One reason it would fail to do so is if there are other compilation errors. Are you sure you don't have other errors as well?

For example, looking at convertToTableRowFn, you're calling message.getData(), which doesn't exist when I try it, and compilation fails there. In my case I need to do something like this instead: rawData = new String(message.getPayload(), Charset.defaultCharset()). Also, .to(BQTable) expects a string (e.g. a string representing the BQ table name) as an argument, and you're passing some unknown symbol BQTable (maybe it exists somewhere in your program, though, in which case this is not a problem in your case).

After I fix these two errors, your code compiles for me: apply() is fully inferred and the types are compatible.
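
For reference, here is a minimal sketch of the DoFn and pipeline with those two fixes applied. The table spec string "my-project:my_dataset.my_table" and the UTF-8 charset are assumptions on my side; adjust them to your setup:

import java.nio.charset.StandardCharsets;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Instant;

static class ConvertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        // PubsubMessage exposes the message body as bytes via getPayload(), not getData()
        String rawData = new String(message.getPayload(), StandardCharsets.UTF_8);
        Instant timestamp = Instant.now();
        TableRow row = new TableRow()
                .set("message", rawData)
                .set("ts_reception", timestamp.toString());
        c.output(row);
    }
}

pipeline.apply("Read from Pub/Sub", PubsubIO.readMessagesWithAttributes().fromTopic(topicPath))
        .apply("Prepare raw data for insertion", ParDo.of(new ConvertToTableRowFn()))
        // .to() takes a table spec string, e.g. "project:dataset.table" (placeholder here)
        .apply("Insert in Big Query", BigQueryIO.writeTableRows().to("my-project:my_dataset.my_table"));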