I am streaming data from Kafka to BigQuery using Apache Beam with the Google Dataflow runner. I wanted to use insertId for deduplication, as described in the Google docs. But even though the inserts happen within a few seconds of each other, I still see many rows with the same insertId. Now I'm wondering whether I am using the API incorrectly and failing to take advantage of the deduplication mechanism BigQuery offers for streaming inserts.

My Beam code for the write looks as follows:

// Stream FxPayment records into an existing BigQuery table.
payments.apply("Write Fx Payments to BQ", BigQueryIO.<FxPayment>write()
            // Convert each FxPayment into a TableRow for the streaming insert.
            .withFormatFunction(ps -> FxTableRowConverter.convertFxPaymentToTableRow(ps))
            .to(bqTradePaymentTable)
            // The table must already exist; rows are appended to it.
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Besides all the other fields, I am setting insertId directly on the TableRow in the FxTableRowConverter.convertFxPaymentToTableRow method that is passed to BigQueryIO as the format function:

row.set("insertId", insertId);

I also added that field as a column in BigQuery; without it, the inserts were failing (obviously). I couldn't find any other way to set insertId on BigQueryIO than adding it to the TableRow object.
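For context, when calling the streaming API directly with the google-cloud-bigquery Java client, insertId travels as per-row metadata on the request rather than as a field of the row itself. A minimal sketch (dataset, table, and column names here are made up, not from my schema):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.Map;

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId table = TableId.of("my_dataset", "fx_payments"); // hypothetical names

Map<String, Object> content = Map.of(
        "payment_id", "PAY-123", // hypothetical columns
        "amount", 99.5);

// insertId is the first argument of RowToInsert.of: it is request
// metadata used for deduplication, not a column of the row.
InsertAllRequest request = InsertAllRequest.newBuilder(table)
        .addRow(InsertAllRequest.RowToInsert.of("PAY-123", content))
        .build();

InsertAllResponse response = bigquery.insertAll(request);
if (response.hasErrors()) {
    System.err.println("Insert errors: " + response.getInsertErrors());
}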

Is this the correct way of using it? It does not work for me: I am seeing many duplicates even though I shouldn't, since, as mentioned, the inserts happen within seconds of each other. The BigQuery docs state that the streaming buffer keeps insertId for at least one minute.

As you can read at cloud.google.com/bigquery/streaming-data-into-bigquery, BigQuery can deduplicate data based on the insertId field, but this is not guaranteed. The only efficient way to remove duplicates is at the time you select the records; deleting or altering rows in the table is a heavy operation and better avoided. As for your streaming pipeline, you are inserting insertId correctly: it should be in the table row and nowhere else. – Jack
Ah ok. So I guess I will just have to implement a window to deduplicate it a bit better. Thanks for the answer. – Kamil Dziublinski

1 Answer


You can't manually specify insertId for BigQuery streaming inserts in Dataflow: https://stackoverflow.com/a/54193825/1580227

As that answer explains, the Dataflow runner generates its own insertId for every row it streams, so setting an "insertId" field on the TableRow only creates an ordinary column that BigQuery's best-effort deduplication never looks at. If you need deduplication, you have to do it inside the pipeline before the write.
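For example, here is a minimal pipeline-side deduplication sketch. It assumes FxPayment exposes a getPaymentId() getter that duplicates share, and that a one-minute window is wide enough to catch them; both are assumptions to adapt. The idea: key by the business ID, window the unbounded stream, group, and emit only the first element per key.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

PCollection<FxPayment> deduped = payments
        // Key each payment by the ID that duplicates share (assumed getter).
        .apply(WithKeys.of((FxPayment p) -> p.getPaymentId())
                .withKeyType(TypeDescriptors.strings()))
        // GroupByKey on an unbounded source needs a window; duplicates arrive
        // within seconds, so a short fixed window is enough.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(GroupByKey.create())
        // Emit only the first element seen for each key in the window.
        .apply(ParDo.of(new DoFn<KV<String, Iterable<FxPayment>>, FxPayment>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(c.element().getValue().iterator().next());
            }
        }));

Beam also ships a Distinct transform (org.apache.beam.sdk.transforms.Distinct, with withRepresentativeValueFn) that packages the same idea; either way, the deduplication only covers duplicates that land in the same window.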