4 votes

I'm trying to build a streaming Dataflow job which reads events from Pub/Sub and writes them into BigQuery.

According to the documentation, Dataflow can detect duplicate message delivery if a record ID is used (see: https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids).

But even when using this record ID, I still get some duplicates (around 0.0002%).

Did I miss something?

EDIT:

I use the Spotify async Pub/Sub client to publish messages with the following snippet:

Message
      .builder()
      .data(new String(Base64.encodeBase64(json.getBytes())))
      .attributes("myid", id, "mytimestamp", timestamp.toString)
      .build()
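
For the de-duplication to work, the same logical event must always be published with the same "myid", so the id has to be stable across (re)publishes. As a rough sketch of one way to derive such an ID (a name-based UUID computed from the payload; my actual derivation may differ):

import java.nio.charset.StandardCharsets
import java.util.UUID

// Sketch only: derive a stable, name-based (version 3) UUID from the JSON payload,
// so that re-publishing the same event always yields the same "myid" attribute.
def recordId(json: String): String =
  UUID.nameUUIDFromBytes(json.getBytes(StandardCharsets.UTF_8)).toString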

Then I use Spotify's scio to read the messages from Pub/Sub and write them to BigQuery via Dataflow:

val input = sc.withName("ReadFromSubscription")
              .pubsubSubscription(subscriptionName, "myid", "mytimestamp")
input
    .withName("FixedWindow")
    .withFixedWindows(windowSize)  // apply windowing logic
    .toWindowed  // convert to WindowedSCollection
    //
    .withName("ParseJson")
    .map { wv =>
      wv.copy(value = TableRow(
        "message_id" -> (Json.parse(wv.value) \ "id").as[String],
        "message" -> wv.value)
      )
    }
    //
    .toSCollection  // convert back to normal SCollection
    //
    .withName("SaveToBigQuery")
    .saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)

The window size is 1 minute.

After only a few seconds of injecting messages, I already see duplicates in BigQuery.

I use this query to count duplicates:

SELECT 
   COUNT(message_id) AS TOTAL, 
   COUNT(DISTINCT message_id) AS DISTINCT_TOTAL 
FROM my_dataset.my_table

// returns: TOTAL = 273666, DISTINCT_TOTAL = 273564

And this one to look at them:

SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
  SELECT message_id
  FROM my_dataset.my_table
  GROUP BY message_id
  HAVING COUNT(*) > 1
) ORDER BY message_id

// returning, for instance:
row | message_id                           | processed_at            | processed_at_epoch | message
1   | 00166a5c-9143-3b9e-92c6-aab52601b0be | 2017-02-02 14:06:50 UTC | 1486044410367      | { ...json1... }
2   | 00166a5c-9143-3b9e-92c6-aab52601b0be | 2017-02-02 14:06:50 UTC | 1486044410368      | { ...json1... }
3   | 00354cc4-4794-3878-8762-f8784187c843 | 2017-02-02 13:59:33 UTC | 1486043973907      | { ...json2... }
4   | 00354cc4-4794-3878-8762-f8784187c843 | 2017-02-02 13:59:33 UTC | 1486043973741      | { ...json2... }
5   | 0047284e-0e89-3d57-b04d-ebe4c673cc1a | 2017-02-02 14:09:10 UTC | 1486044550489      | { ...json3... }
6   | 0047284e-0e89-3d57-b04d-ebe4c673cc1a | 2017-02-02 14:08:52 UTC | 1486044532680      | { ...json3... }
Can you elaborate on how you're using record IDs and measuring duplicates? Note from the documentation that "Dataflow does not perform this de-duplication for messages with the same record ID value that are published to Pub/Sub more than 10 minutes apart." Could that be causing your observed duplicates? (Ben Chambers)
I added more info :) (Vincent Spiewak)

1 Answer

2 votes

The BigQuery documentation states that there may be rare cases where duplicates arrive:

  1. "BigQuery remembers this ID for at least one minute" -- if Dataflow takes more than one minute before retrying the insert BigQuery may allow the duplicate in. You may be able to look at the logs from the pipeline to determine if this is the case.
  2. "In the rare instance of a Google datacenter losing connectivity unexpectedly, automatic deduplication may not be possible."

You may want to try the instructions for manually removing duplicates. This will also let you see the insertId that was used with each row, to determine whether the problem was on the Dataflow side (generating different insertIds for the same record) or on the BigQuery side (failing to deduplicate rows based on their insertId).
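
A different, belt-and-braces option is to drop residual duplicates inside the pipeline itself before the write. A minimal scio sketch, keyed on message_id within each fixed window, using only core SCollection operations and assuming the TableRow built in the question's "ParseJson" step (rows is a placeholder name for that collection):

// Sketch only: keep a single TableRow per message_id within each fixed window
// before the BigQuery write. `rows` stands for the SCollection[TableRow]
// produced by the "ParseJson" step in the question.
val deduped = rows
  .withName("KeyByMessageId")
  .map(row => (row.get("message_id").toString, row))
  .withName("DedupPerWindow")
  .groupByKey
  .flatMap { case (_, duplicates) => duplicates.headOption }

deduped.saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)

Note that this only removes duplicates that land in the same window; duplicates produced more than a window apart (or by retried inserts on the BigQuery side) would still need the manual clean-up described above.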