3 votes

I'm implementing a Dataflow pipeline that reads messages from Pubsub and writes TableRows into BigQuery (BQ) using Apache Beam SDK 2.0.0 for Java.

This is the related portion of code:

    tableRowPCollection
        .apply(BigQueryIO.writeTableRows().to(this.tableId)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

This code generates a group of tasks under the hood in the Dataflow pipeline, one of which is a GroupByKey. That task is accumulating elements in the pipeline, as can be seen in this screenshot: GBK elements accumulation image. After reading the docs, I suspect this issue is related to the window configuration, but I could not find a way to modify it, since the window is implicitly created by the Window.Assign transform inside the Reshuffle task.

Is there a way to set the window parameters and/or attach triggers to this implicit window, or should I write my own DoFn that inserts TableRows into BQ?
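For context, an explicit window and trigger can be applied to the PCollection upstream of the write. Whether this influences the sink's internal Reshuffle is exactly what is in question here; the following is only a sketch against the Beam 2.x Java API, not a verified fix:

```java
// Sketch only: configure an explicit processing-time trigger on the
// user-visible windowing before handing the rows to the BigQuery sink.
// The Reshuffle inside the sink still applies its own implicit windowing.
tableRowPCollection
    .apply(Window.<TableRow>configure()
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(BigQueryIO.writeTableRows().to(this.tableId)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```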

Thanks in advance!


[Update]

I left the pipeline running for roughly a day, after which the GroupByKey subtask became faster and the numbers of elements coming in and going out converged (sometimes they were equal). I also noticed that the watermark moved closer to the current time and advanced faster. So the "issue" resolved itself.

1
What is the issue? GroupByKey groups the values associated with a specific key, so if you have N keys you would expect N elements coming out of the GroupByKey. What you have described sounds like it is working as intended. – Ben Chambers
@BenChambers The point is that I don't want to wait too long to insert the data into BQ. I'd like to choose the trigger used in this subtask. – Carlos Baqueta

1 Answer

0 votes

There isn't any waiting introduced by the Reshuffle in the BigQuery sink. Rather, it is used to create the batches of rows to write to BigQuery. The number of elements coming out of the GroupByKey is smaller because each output element represents a batch (or group) of input elements.
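The batching effect can be illustrated with plain Java (the class and method names here are illustrative, not Beam APIs): grouping N input elements by key yields one output element per key, each carrying a batch of values, and the original element count is recovered by expanding those batches.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchingDemo {
    // Group (key, value) pairs by key, mimicking what GroupByKey does:
    // many input elements collapse into one output element per key.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> input) {
        return input.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = List.of(
                Map.entry("shard-0", 1), Map.entry("shard-0", 2),
                Map.entry("shard-1", 3), Map.entry("shard-0", 4));
        Map<String, List<Integer>> batches = groupByKey(rows);
        // 4 input elements become 2 output elements (one per key);
        // expanding the iterables, as the ExpandIterable step does,
        // recovers the original total of 4.
        System.out.println(batches.size());
        System.out.println(batches.values().stream().mapToInt(List::size).sum());
    }
}
```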

You should be able to see the total number of elements coming out as the output of the ExpandIterable step (the output of the Reshuffle).