Update:
If you use `WriteToBigQuery`, it automatically creates and attaches a unique row ID called `insertId` to each row it inserts into BigQuery. It's handled for you; you don't need to worry about it. :)
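For example, a minimal pipeline never mentions `insertId` at all; the sink generates one per row internally. This is just a sketch, and the project, dataset, table, and schema names are placeholders:

```python
import apache_beam as beam

# Minimal sketch: note there is no insertId anywhere in user code;
# WriteToBigQuery attaches one to each row internally.
with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create([{'user': 'alice', 'score': 42}])
     | 'Write' >> beam.io.WriteToBigQuery(
         'my-project:my_dataset.my_table',
         schema='user:STRING,score:INTEGER',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```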
Tracing through the source:

- `WriteToBigQuery` is a `PTransform`, and its `expand` method calls `BigQueryWriteFn`.
- `BigQueryWriteFn` is a `DoFn`, and its `process` method calls `_flush_batch`.
- `_flush_batch` is a method that then calls `BigQueryWrapper.insert_rows`.
- `BigQueryWrapper.insert_rows` creates a list of `bigquery.TableDataInsertAllRequest.RowsValueListEntry` objects, which contain the `insertId` and the row data as a JSON object.
- The `insertId` is generated by calling the `unique_row_id` method, which returns a UUID4 concatenated with `_` and an auto-incremented number (see the sketch after this list).
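A rough re-creation of that ID scheme, assuming nothing beyond what's described above (this is not the Beam source verbatim, just the same idea):

```python
import uuid

class RowIdGenerator(object):
    """Mimics the UUID4-plus-counter scheme described above."""

    def __init__(self):
        self._prefix = uuid.uuid4().hex  # one UUID4 per generator
        self._count = 0

    def unique_row_id(self):
        self._count += 1  # auto-incremented suffix
        return '%s_%d' % (self._prefix, self._count)
```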
In the current 2.7.0 code, there is this happy comment; I've also verified it is true :)
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1182
```python
# Prepare rows for insertion. Of special note is the row ID that we add to
# each row in order to help BigQuery avoid inserting a row multiple times.
# BigQuery will do a best-effort if unique IDs are provided. This situation
# can happen during retries on failures.
```
* Don't use `BigQuerySink`
At least, not in its current form, as it doesn't support streaming. I guess that might change.
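If you need streaming today, switch the pipeline into streaming mode and use `WriteToBigQuery` instead; a sketch using the standard pipeline options:

```python
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Sketch: run the pipeline in streaming mode and write with
# WriteToBigQuery (as above); BigQuerySink is batch-only.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
```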
Original (non)answer
Great question; I also looked and couldn't find a definitive answer.
Apache Beam doesn't appear to use the `google.cloud.bigquery` client SDK you've linked to; it has its own internal generated API client, but that one appears to be up-to-date.
I looked at the source:
The `insertAll` method is there: https://github.com/apache/beam/blob/18d2168ee71a1b1b04976717f0f955199bb00961/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py#L476
I also found `insertId` mentioned: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py#L1707
So if you can make an `insertAll` call, it will use a `TableDataInsertAllRequest` and pass `RowsValueListEntry` objects:
```python
class TableDataInsertAllRequest(_messages.Message):
  """A TableDataInsertAllRequest object.

  Messages:
    RowsValueListEntry: A RowsValueListEntry object.
```
The `RowsValueListEntry` message is where the `insertId` lives.
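Roughly, building one of these with the SDK's internal generated client looks like the sketch below (the real Beam code also wraps each row dict into a `JsonObject` message, and the ID here is made up):

```python
from apache_beam.io.gcp.internal.clients import bigquery

# Hedged sketch: one entry per row, carrying the de-duping insertId
# and the row payload as a JSON object message.
entry = bigquery.TableDataInsertAllRequest.RowsValueListEntry(
    insertId='6f1e32a44c2b4c5d9f00_1',  # made-up UUID4_counter value
    json=bigquery.JsonObject())         # row data goes here
request = bigquery.TableDataInsertAllRequest(rows=[entry])
```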
Here are the API docs for insertAll: https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
I will look at this some more, because I don't see `WriteToBigQuery()` exposing this.
I suspect that the "BigQuery will remember this for at least one minute" promise is a pretty loose guarantee for de-duping. The docs suggest using Datastore if you need transactions. Otherwise you might need to run SQL with window functions to de-dupe at query time, or run some other de-duping jobs against BigQuery.
Perhaps using the `batch_size` parameter of `WriteToBigQuery()`, and running a Combine (or, at worst, a GroupByKey) step in Dataflow, is a more stable way to de-dupe prior to writing; see the sketch below.
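A hedged sketch of that idea, keying on a made-up business key (`order_id`) and keeping one row per key before the write; on an unbounded source you'd also need windowing ahead of the GroupByKey:

```python
import apache_beam as beam

def keep_first(kv):
    key, rows = kv
    return next(iter(rows))  # arbitrary survivor per key

deduped = (
    rows  # assumed upstream PCollection of row dicts
    | 'KeyByOrderId' >> beam.Map(lambda row: (row['order_id'], row))
    | 'Group' >> beam.GroupByKey()
    | 'KeepFirst' >> beam.Map(keep_first))

deduped | 'Write' >> beam.io.WriteToBigQuery(
    'my-project:my_dataset.my_table',
    batch_size=500)  # the batch_size knob mentioned above
```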