
We are using the following code to write the records to BigQuery:

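```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// "table" stands in for the real table spec, e.g. "project:dataset.table"
BigQueryIO.writeTableRows()
    .to("table")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withSchema(schema);
```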

With this code, when we do a backfill, some records pass through the Dataflow pipeline a second time, which leaves duplicate rows in the BigQuery table. Is there any way to configure an upsert, keyed on a particular field, in the Dataflow pipeline?

Could you add more detail about the backfill process to your question? Maybe there's a way to avoid it and use a single streaming pipeline. - jkff
@jkff We have a Dataflow pipeline that streams events into BigQuery. We often find that the events within some time window were incorrect, or that some events were missing. In those cases we replay all the events through the pipeline (after correcting them at the source). What we want is for Dataflow to update the payload of a record, matched by some ID, if that record already exists. - Darshan Mehta
Oh. If you're reloading all of the data, can you just use the WRITE_TRUNCATE WriteDisposition, so the table is overwritten? - jkff
We're not reloading all of the data; we're only replaying some of the events, so unfortunately WRITE_TRUNCATE wouldn't help. - Darshan Mehta
I see. As a workaround, I'd suggest loading the data into a temporary table and, after the pipeline finishes, running a Dremel DML query cloud.google.com/bigquery/docs/reference/standard-sql/… to upsert it into the original table. Meanwhile, feel free to file a JIRA requesting native DML support in BigQueryIO. - jkff
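A minimal sketch of the upsert step in jkff's workaround, assuming the replayed events land in a staging table with the same columns as the target and that a single `id` column identifies a record. `UpsertSql`, `buildMergeSql` and all table and column names are hypothetical; the method only builds the text of a BigQuery Standard SQL `MERGE` statement, which could then be submitted with the BigQuery client library or `bq query` once the pipeline has finished.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical helper: builds a BigQuery Standard SQL MERGE that upserts rows
 * from a staging table into a target table, matching on one key column.
 */
final class UpsertSql {

    static String buildMergeSql(String targetTable, String stagingTable,
                                String keyColumn, List<String> columns) {
        // When the key already exists, overwrite every non-key column.
        String updates = columns.stream()
                .filter(c -> !c.equals(keyColumn))
                .map(c -> c + " = S." + c)
                .collect(Collectors.joining(", "));
        if (updates.isEmpty()) {
            throw new IllegalArgumentException("need at least one non-key column");
        }
        // When the key is new, insert all columns, including the key.
        String insertCols = String.join(", ", columns);
        String insertVals = columns.stream()
                .map(c -> "S." + c)
                .collect(Collectors.joining(", "));
        return "MERGE `" + targetTable + "` T\n"
                + "USING `" + stagingTable + "` S\n"
                + "ON T." + keyColumn + " = S." + keyColumn + "\n"
                + "WHEN MATCHED THEN UPDATE SET " + updates + "\n"
                + "WHEN NOT MATCHED THEN INSERT (" + insertCols + ") VALUES (" + insertVals + ")";
    }
}
```

BigQuery rejects a `MERGE` in which one target row matches more than one source row, so if a replay can write the same `id` to the staging table twice, the `USING` side would first need to be reduced to the latest row per `id`.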

2 Answers


I had a very similar use case, and we solved it by creating a view on top of the table that deduplicates the data, then pointing everything that previously queried the original table at this view instead.

"BigQuery deduplication and partitioned table" is a very good reference, as is https://wecode.wepay.com/posts/bigquery-wepay

Essentially you need to make sure you have some sort of last_updated column, as well as an id column which uniquely identifies the row. Then you can create a view that gets all the data for the most recent version of each id. There may be some small performance loss because you are querying a view, but we found it to be negligible.
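One common way to write such a view is with `ROW_NUMBER()` partitioned by the id and ordered by `last_updated` descending, keeping only the first row of each partition. The sketch below is an assumption about how that could look, not the poster's actual view: the SQL sits in the class comment, and the Java method applies the same keep-the-latest rule to in-memory rows so the semantics are easy to check. The class, record, view, table and column names are all hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration of the "latest version per id" rule a
 * deduplicating view applies. In BigQuery the view itself could be:
 *
 *   CREATE OR REPLACE VIEW `my-project.my_dataset.events_latest` AS
 *   SELECT * EXCEPT(rn) FROM (
 *     SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY last_updated DESC) AS rn
 *     FROM `my-project.my_dataset.events`)
 *   WHERE rn = 1
 */
final class LatestPerId {

    /** One stored row: its id, its payload and its last_updated timestamp. */
    record Row(String id, String payload, long lastUpdated) {}

    /** Keeps, for each id, only the row with the greatest lastUpdated (ties keep the first seen). */
    static Collection<Row> latest(List<Row> rows) {
        Map<String, Row> byId = new HashMap<>();
        for (Row r : rows) {
            byId.merge(r.id(), r, (a, b) -> a.lastUpdated() >= b.lastUpdated() ? a : b);
        }
        return byId.values();
    }
}
```

Because the view recomputes this on every query, the base table can keep using `WRITE_APPEND`; replayed events simply become newer versions that hide the older ones.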


Is this still a gap in the BigQueryIO writer? It seems like a major limitation: most workloads load more data each night, but on any given night may need to re-run that set. We never want to truncate the entire table, but truncating just that run/set of keys and then appending would be fine as an alternative to a true update. So ideally there would be something like a TRUNCATE_KEY_SET disposition, or a normal UPDATE.