We use protobuf with GCP's Pub/Sub and Dataflow. A single proto file defines both the data we send to Pub/Sub and the BigQuery schema.
publisher -(send proto)-> pubsub -> dataflow -(write)-> bigquery
Sometimes Dataflow makes a few cosmetic changes, but mostly it just copies fields from the protobuf message to BigQuery.
My question: is there a way to automatically convert a protobuf message to BigQuery's TableRow?
A simplified version of our current Dataflow code is below. I want to eliminate most of the code in the ProtoToTableRow class:
public class MyPipeline {
    public static void main(String[] args) {
        // pipeline, subscription and table are set up elsewhere (omitted for brevity)
        PCollection<Core.MyProtoObject> events = pipeline.apply("ReadEvents",
            PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
        events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withExtendedErrorInfo()
                .to(table));
    }
}
// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Core.Foo foo = c.element().getFoo();
        TableRow fooRow = new TableRow()
            .set("id", foo.getId())
            .set("bar", foo.getBar())
            .set("baz", foo.getBaz());
        // similar code repeated for 100s of lines
        TableRow row = new TableRow()
            .set("foo", fooRow);
        c.output(row);
    }
}
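To make it concrete, what I have in mind is roughly the sketch below. It walks the message with protobuf's descriptor API (Message.getAllFields()) and copies each populated field into a TableRow under its proto field name. The class name GenericProtoToTableRow and the method toTableRow are hypothetical names I made up for illustration, and the sketch assumes the BigQuery column names match the proto field names exactly. I have not run it against our pipeline, so treat it as an outline of the idea rather than a finished solution.

```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.EnumValueDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message;

import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: copies every populated proto field into a TableRow by field name. */
final class GenericProtoToTableRow {
    private GenericProtoToTableRow() {}

    static TableRow toTableRow(Message message) {
        TableRow row = new TableRow();
        // getAllFields() returns only fields that are set; in proto3, scalar
        // fields holding their default value are not included.
        for (Map.Entry<FieldDescriptor, Object> entry : message.getAllFields().entrySet()) {
            FieldDescriptor field = entry.getKey();
            Object value = entry.getValue();
            if (field.isRepeated()) {
                // Repeated (and map) fields arrive as a List; convert each element.
                List<Object> converted = new ArrayList<>();
                for (Object element : (List<?>) value) {
                    converted.add(convertSingle(field, element));
                }
                row.set(field.getName(), converted);
            } else {
                row.set(field.getName(), convertSingle(field, value));
            }
        }
        return row;
    }

    private static Object convertSingle(FieldDescriptor field, Object value) {
        switch (field.getJavaType()) {
            case MESSAGE:
                // Nested messages become nested rows (assumed to map to RECORD columns).
                return toTableRow((Message) value);
            case ENUM:
                // Assumption: enums are stored by name rather than by number.
                return ((EnumValueDescriptor) value).getName();
            case BYTE_STRING:
                // Assumption: bytes are sent base64-encoded.
                return Base64.getEncoder().encodeToString(((ByteString) value).toByteArray());
            default:
                // Integer, Long, Float, Double, Boolean and String pass through unchanged.
                return value;
        }
    }
}
```

With something like this, processElement in ProtoToTableRow could shrink to c.output(GenericProtoToTableRow.toTableRow(c.element())), with the few cosmetic changes applied to the resulting row afterwards.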