
I am trying to enable streaming in a Cloud Dataflow job that needs to read data from one BigQuery table and write it to another BigQuery table in append mode.

For this, I enabled options.setStreaming(true); in the Java code.

I applied windowing with the fixed-window option (code below):

PCollection<TableRow> fixedWindowedItems = finalRecords.apply(
        Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));

Finally, I write the data to the BigQuery table with BigQueryIO (code below):

fixedWindowedItems.apply(BigQueryIO.writeTableRows()
                .withSchema(schema1)
                .to(options.getTargetTable())
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

The code works fine with no errors, and data is moved from one table to the other the first time. But if new data is inserted into the first table, it is not reflected in the second table. The job completes with Succeeded status, even though the job type is Streaming.

Could you let me know whether I missed something at the code/configuration level to enable streaming mode?

Can you explain the motivation for keeping two BigQuery tables in sync? - Reading from BigQuery is not supported as a streaming source; it is only intended as a batch source. What's happening is that you're reading a batch of data from one table into the other. - So I am inclined to ask again: why are you interested in continuously moving data from one BQ table to another? - Pablo

1 Answer


Preliminary answer:

The functionality you are looking for is for BigQuery to output a stream of changes, with that stream applied to another BigQuery table, right? This is not what the Apache Beam / Dataflow BigQuery source offers.

Your pipeline runs and finishes because it copies a single batch of data from one BigQuery table into another: the BigQuery read is a one-time query, not a continuous stream, so the pipeline has nothing left to do once that batch is written.

Why are you looking to keep two BQ tables in sync? If you explain your scenario, we can work together to improve it.
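For reference, a pipeline only keeps running in streaming mode if it reads from an unbounded source. Below is a minimal sketch of that pattern using Pub/Sub as the unbounded source instead of BigQuery; the project, topic, table names, and the "payload" field are placeholders, and the JSON-to-TableRow mapping is left as a stub you would replace with real parsing. This is an illustration of the unbounded-source idea under those assumptions, not a drop-in fix for your pipeline.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

public class StreamingSketch {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
    options.setStreaming(true);

    Pipeline p = Pipeline.create(options);

    p.apply("ReadFromPubSub",
            // Unbounded source: the job stays up and picks up new messages as they arrive.
            PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
     .apply("ToTableRow",
            // Placeholder mapping: replace with real parsing of your message format.
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via(json -> new TableRow().set("payload", json)))
     .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))))
     .apply(BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.target_table")
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            // CREATE_NEVER avoids needing a schema here; the table must already exist.
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```

With this shape, the producer writes change events to the Pub/Sub topic and the pipeline appends them to BigQuery continuously, rather than re-querying a table that only yields a bounded batch.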