I am implementing a Pub/Sub to BigQuery pipeline. It looks similar to "How to create read transform using ParDo and DoFn in Apache Beam", but here I already have a PCollection.
Following the Apache Beam documentation, I implemented a ParDo that prepares a table row, and I use it in the following pipeline:
    static class convertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            PubsubMessage message = c.element();
            // Retrieve data from message
            String rawData = message.getData();
            Instant timestamp = new Instant(new Date());
            // Prepare TableRow
            TableRow row = new TableRow().set("message", rawData).set("ts_reception", timestamp);
            c.output(row);
        }
    }
    // Read input from Pub/Sub
    pipeline.apply("Read from Pub/Sub", PubsubIO.readMessagesWithAttributes().fromTopic(topicPath))
        .apply("Prepare raw data for insertion", ParDo.of(new convertToTableRowFn()))
        .apply("Insert in Big Query", BigQueryIO.writeTableRows().to(BQTable));
I found the DoFn function in a gist.
I keep getting the following error:
The method apply(String, PTransform<? super PCollection<PubsubMessage>,OutputT>) in the type PCollection<PubsubMessage> is not applicable for the arguments (String, ParDo.SingleOutput<PubsubMessage,TableRow>)
I always understood a ParDo/DoFn operation to be an element-wise PTransform. Am I wrong? I never got this type of error in Python, so I'm a bit confused about why this is happening.
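As a minimal sketch of that element-wise model, here is a plain-Java version that uses nothing from Beam. The names `Coll`, `parDo`, and `lengths` are hypothetical stand-ins, not Beam API. `Coll.apply` only mirrors the shape of the `apply(String, PTransform<? super PCollection<...>, OutputT>)` signature quoted in the error message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ElementWiseSketch {

    // Hypothetical stand-in for a PCollection: an in-memory list of elements.
    static final class Coll<T> {
        final List<T> elements;

        Coll(List<T> elements) {
            this.elements = elements;
        }

        // Same shape as the signature in the error message: the transform
        // must accept Coll<T> (or a supertype of it) as its input.
        <OutT> OutT apply(String name, Function<? super Coll<T>, OutT> transform) {
            return transform.apply(this);
        }
    }

    // Hypothetical stand-in for ParDo.of(doFn): lifts a per-element function
    // into a transform over the whole collection.
    static <InT, OutT> Function<Coll<InT>, Coll<OutT>> parDo(Function<InT, OutT> fn) {
        return input -> {
            List<OutT> out = new ArrayList<>();
            for (InT element : input.elements) {
                out.add(fn.apply(element)); // one call per element
            }
            return new Coll<>(out);
        };
    }

    static List<Integer> lengths(List<String> messages) {
        Coll<String> input = new Coll<>(messages);
        Function<Coll<String>, Coll<Integer>> toLengths = parDo(String::length);
        Coll<Integer> output = input.apply("Lengths", toLengths);

        // A transform whose input type is a different class is rejected at
        // compile time, because Coll<Integer> is not a supertype of Coll<String>:
        // Function<Coll<Integer>, Coll<Integer>> wrong = parDo((Integer i) -> i + 1);
        // input.apply("Wrong", wrong); // does not compile
        return output.elements;
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("a", "bcd"))); // prints [1, 3]
    }
}
```

In this sketch, the call compiles only when the wrapped function's input type is the same class as the collection's element type. Whether the Beam error comes from a comparable mismatch between the DoFn's input type and the PCollection's element type is an assumption, not something the error message confirms.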