
According to the BigQuery docs, you can ensure data consistency by providing an insertId (https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency). If it's not provided, BQ will try to ensure consistency based on internal IDs, on a best-effort basis.

Using the BigQuery Python client library you can do that with the row_ids parameter (https://google-cloud-python.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html#google.cloud.bigquery.client.Client.insert_rows_json), but I can't find an equivalent in the Apache Beam Python SDK.
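For reference, this is roughly what I mean with the client library. It's only a minimal sketch: stable_row_id and insert_with_row_ids are hypothetical helper names, hashing the row content is just one assumed way to get a stable ID, and client is assumed to be a google.cloud.bigquery.Client.

```python
import hashlib
import json


def stable_row_id(row):
    """Derive a deterministic ID from the row content.

    Assumption: hashing the whole row is acceptable as a de-duplication key;
    any stable business key would work just as well.
    """
    payload = json.dumps(row, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def insert_with_row_ids(client, table, rows):
    """Stream rows with one ID per row so BigQuery can drop retried duplicates.

    `client` is assumed to be a google.cloud.bigquery.Client and `table`
    a table object or reference accepted by insert_rows_json.
    """
    row_ids = [stable_row_id(row) for row in rows]
    # insert_rows_json returns a list of per-row errors (empty on success).
    return client.insert_rows_json(table, rows, row_ids=row_ids)
```

The same row always maps to the same ID here, which is what I'd like to reproduce in Beam.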

Looking into the SDK, I noticed that a 'unique_row_id' property exists, but I don't know how to pass my own value to WriteToBigQuery().

How can I write into BQ (streaming) while providing a row ID for deduplication?


1 Answer


Update:

If you use WriteToBigQuery, it automatically generates a unique row ID (the insertId) for each row and sends it to BigQuery along with the row data. It's handled for you, so you don't need to worry about it. :)

  1. WriteToBigQuery is a PTransform, and its expand method calls BigQueryWriteFn
  2. BigQueryWriteFn is a DoFn, and its process method calls _flush_batch
  3. _flush_batch is a method that then calls the BigQueryWrapper.insert_rows method
  4. BigQueryWrapper.insert_rows creates a list of bigquery.TableDataInsertAllRequest.RowsValueListEntry objects, each of which contains the insertId and the row data as a JSON object
  5. The insertId is generated by reading the unique_row_id property, which returns a UUID4 concatenated with _ and an auto-incremented number.
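To illustrate step 5, here is a simplified sketch of that ID scheme. It is not Beam's actual class: RowIdGenerator and next_id are hypothetical names, and only the "UUID4, underscore, counter" format comes from what I read in the source.

```python
import uuid


class RowIdGenerator:
    """Simplified sketch of a per-writer insertId generator (hypothetical class)."""

    def __init__(self):
        # One random prefix per generator, so IDs from different writers
        # are very unlikely to collide.
        self._prefix = str(uuid.uuid4())
        self._counter = 0

    def next_id(self):
        # Each call returns "<uuid4>_<n>" with n incremented by one.
        self._counter += 1
        return "%s_%d" % (self._prefix, self._counter)


if __name__ == "__main__":
    gen = RowIdGenerator()
    print(gen.next_id())
    print(gen.next_id())
```

Note that an ID built this way is tied to the write attempt rather than to the row's content, so as far as I can tell it only helps with retried requests, not with duplicates that already exist upstream in the pipeline.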

In the current 2.7.0 code, there is this happy comment; I've also verified it is true :) https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1182

# Prepare rows for insertion. Of special note is the row ID that we add to
# each row in order to help BigQuery avoid inserting a row multiple times.
# BigQuery will do a best-effort if unique IDs are provided. This situation
# can happen during retries on failures.

* Don't use BigQuerySink

At least, not in its current form, as it doesn't support streaming. I guess that might change.


Original (non)answer

Great question. I also looked and couldn't find a definite answer.

Apache Beam doesn't appear to use the google.cloud.bigquery client SDK you've linked to. It has its own internal generated API client, which appears to be up to date.

I looked at the source. The insertAll method is there: https://github.com/apache/beam/blob/18d2168ee71a1b1b04976717f0f955199bb00961/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py#L476

I also found the insertId mentioned here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py#L1707

So if you can make an InsertAll call, it will use a TableDataInsertAllRequest and pass a RowsValueListEntry:

class TableDataInsertAllRequest(_messages.Message):
  """A TableDataInsertAllRequest object.
  Messages:
    RowsValueListEntry: A RowsValueListEntry object.

The RowsValueListEntry message is where the insertId lives.

Here's the API docs for insert all https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
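As a rough illustration of what ends up on the wire, this sketch builds an insertAll request body by hand. build_insert_all_body is a hypothetical helper, not something from Beam; only the "rows", "insertId" and "json" field names come from the REST docs linked above.

```python
import json


def build_insert_all_body(rows, insert_ids):
    """Pair each row with its insertId in the shape tabledata.insertAll expects.

    Hypothetical helper for illustration; it builds the body but sends nothing.
    """
    if len(rows) != len(insert_ids):
        raise ValueError("need exactly one insertId per row")
    return {
        "rows": [
            {"insertId": insert_id, "json": row}
            for row, insert_id in zip(rows, insert_ids)
        ]
    }


if __name__ == "__main__":
    body = build_insert_all_body([{"name": "a"}], ["my-id-1"])
    print(json.dumps(body, indent=2))
```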

I will look into this some more, because I don't see WriteToBigQuery() exposing it.

I suspect that "BigQuery will remember this for at least one minute" is a pretty loose guarantee for de-duping. The docs suggest using Datastore if you need transactions. Otherwise you might need to run SQL with window functions to de-dupe at query time, or run some other de-duping jobs on BigQuery.
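For the window-function route, something along these lines might work. It's only a sketch: build_dedup_query is a hypothetical helper, and the table and column names in the usage example are made up. It keeps the newest row per key using ROW_NUMBER().

```python
def build_dedup_query(table, key_column, order_column):
    """Return BigQuery standard SQL that keeps the latest row per key.

    Identifiers are interpolated directly, so only pass trusted names.
    """
    return (
        "SELECT * EXCEPT(row_num) FROM (\n"
        "  SELECT *, ROW_NUMBER() OVER (\n"
        "    PARTITION BY {key} ORDER BY {order} DESC\n"
        "  ) AS row_num\n"
        "  FROM `{table}`\n"
        ") WHERE row_num = 1"
    ).format(key=key_column, order=order_column, table=table)


if __name__ == "__main__":
    # Hypothetical table and column names.
    print(build_dedup_query("my_project.my_dataset.events", "event_id", "ingested_at"))
```

You could save a query like this as a view, or run it as a scheduled job that writes the de-duplicated rows to another table.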

Perhaps using the batch_size parameter of WriteToBigQuery() and running a Combine (or at worst a GroupByKey) step in Dataflow is a more stable way to de-dupe prior to writing.