Can someone clarify the purpose of the id_label argument in the ReadFromPubSub transform?
I'm using the BigQuery sink, and my understanding is that id_label acts like the insertId in the BQ Streaming API (Tabledata: insertAll), which the docs describe as:
"A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency."
However, I don't see this behaviour.
I'm publishing messages to Pub/Sub, each with the same value for the message_id attribute (this is intentional, to test the pipeline / BQ dedupe behaviour). My pipeline reads from Pub/Sub as follows:
beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC, subscription=None, id_label='message_id')
But when I query BQ, all of the messages have been inserted. Because every message was published with the same message_id value, I expected BQ to deduplicate them.
Can someone clarify, please? Thanks in advance!
Also, I notice that DirectRunner keeps throwing an error when I use this argument:
NotImplementedError: DirectRunner: id_label is not supported for PubSub reads
So I have to use DataflowRunner. Is that expected as well?
Cheers!
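For local testing, here is a minimal sketch of how the read arguments could be built so that id_label is only passed when running on Dataflow. The helper name read_from_pubsub_kwargs and the string-based runner check are my own assumptions, not part of Beam; the only facts it relies on are the DirectRunner error above and the topic and id_label arguments of ReadFromPubSub.

```python
def read_from_pubsub_kwargs(topic, runner, id_label="message_id"):
    """Build keyword arguments for ReadFromPubSub (hypothetical helper).

    id_label is only included for DataflowRunner, because DirectRunner
    raises NotImplementedError when it is set. Other runners are not
    checked here; that is an assumption of this sketch.
    """
    kwargs = {"topic": topic}
    if runner == "DataflowRunner":
        kwargs["id_label"] = id_label
    return kwargs


# Intended use inside a pipeline (requires apache_beam, not imported here):
#   beam.io.ReadFromPubSub(**read_from_pubsub_kwargs(TOPIC, runner_name))
print(read_from_pubsub_kwargs("projects/p/topics/t", "DirectRunner"))
print(read_from_pubsub_kwargs("projects/p/topics/t", "DataflowRunner"))
```

Note that this only avoids the error locally; it does not give you any deduplication on DirectRunner.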
UPDATE 1: I moved to DataflowRunner, and the pipeline seems to respect the id_label argument in ReadFromPubSub(). However, duplicate messages DO still get read into the pipeline sporadically.
My publisher application publishes a message every 15 seconds in the following format (the publisher app code is here):
cid=141&message_id=2&evt_time={{DATE_TIME_AT_RUNTIME}}
Notice that I'm passing the same message_id value ('2') in the message's attributes as well (this is intentional, to test the dedupe behaviour).
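For reference, the payload format above can be split into fields with the standard library. This is just a sketch of the message body parsing (parse_event is a hypothetical helper name); it says nothing about the Pub/Sub attribute, which is set separately by the publisher.

```python
from urllib.parse import parse_qs


def parse_event(payload):
    """Parse a 'key=value&key=value' payload into a flat dict of strings."""
    # parse_qs returns a list of values per key; keep the first one.
    return {key: values[0] for key, values in parse_qs(payload).items()}


event = parse_event("cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z")
print(event["message_id"], event["evt_time"])
```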
My pipeline (running on the Dataflow runner with the Beam Python SDK v2.11; the pipeline code is here) writes the following messages to BQ. As you can see, multiple messages with the same message_id are read into the pipeline and emitted to the sink. This usually happens when I stop and restart my publisher application.
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z
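To make the duplication easier to quantify, here is a small standalone sketch that counts rows per message_id and computes the time gaps between consecutive rows. It uses only the nine rows listed above; the helper names (parse_row, duplicate_counts, gaps_in_seconds) are hypothetical, and it assumes evt_time always has microseconds and a trailing Z as in this sample.

```python
from collections import Counter
from datetime import datetime
from urllib.parse import parse_qs

# Rows copied from the BQ output above.
ROWS = [
    "cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z",
    "cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z",
]


def parse_row(row):
    """Split one row into fields and convert evt_time to a datetime."""
    fields = {key: values[0] for key, values in parse_qs(row).items()}
    fields["evt_time"] = datetime.strptime(
        fields["evt_time"], "%Y-%m-%dT%H:%M:%S.%fZ"
    )
    return fields


def duplicate_counts(rows):
    """Count how many rows share each message_id."""
    return Counter(parse_row(row)["message_id"] for row in rows)


def gaps_in_seconds(rows):
    """Return the gaps, in seconds, between consecutive rows sorted by evt_time."""
    times = sorted(parse_row(row)["evt_time"] for row in rows)
    return [(later - earlier).total_seconds()
            for earlier, later in zip(times, times[1:])]


print(duplicate_counts(ROWS))
print(gaps_in_seconds(ROWS))
```

In this sample all nine rows carry message_id 2, and the gaps between them range from a little over a minute to about nine minutes.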