
I am struggling to batch the Google Pub/Sub data that I read into Apache Beam. Here's my basic code:

    p.begin()
            .apply("Input", PubsubIO.readAvros(CmgData.class).fromTopic("topicname"))
            .apply("Transform", ParDo.of(new TransformData()))
            .apply("Write", BigQueryIO.writeTableRows()
                    .to(table)
                    .withSchema(schema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run().waitUntilFinish();

Obviously Apache Beam treats the data as unbounded, since it's coming from a subscription, but I want to batch it and send it across. There are a lot of different items mentioning "bounded", such as:

PCollection.IsBounded (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/values/PCollection.IsBounded.html) - Seems to have no effect on the write.

BoundedReadFromUnboundedSource - (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.html) - Can't find a way to convert a PCollection to a bounded source or vice versa.

BoundedWindow - (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/transforms/windowing/BoundedWindow.html) - Can't find a working usage.

Write.Method - (https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html) - Throws an IllegalArgumentException when I try to use it.

Can someone point me in the direction of how to declare that the data is bounded so I can batch-process it rather than just stream it?

Can you use something like PubsubIO.Read.Bound when you're reading the data? It's documented here - cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/… – Chris Halcrow

@ChrisHalcrow I couldn't seem to get that working either. (There is something similar in the newer Beam documentation.) – Campey

1 Answer


For more details, see my other question: BigQuery writeTableRows Always writing to buffer.

However, adding the following three lines to the BigQueryIO write means the data will be bound and written via batch load jobs:

            .withMethod(Method.FILE_LOADS)
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            .withNumFileShards(1000)
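
For context, here's a sketch of how those three lines slot into the write step from the question (the two-minute frequency and 1000 shards are just example values you'd tune for your own load):

    .apply("Write", BigQueryIO.writeTableRows()
            .to(table)
            .withSchema(schema)
            // Buffer the streaming data and write it with periodic batch load jobs
            // instead of streaming inserts
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // How often a load job is triggered for the buffered data
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            // Number of files the buffered data is sharded into before loading
            .withNumFileShards(1000)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));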