1 vote

We are using a pretty simple flow where messages are retrieved from PubSub, their JSON content is flattened into two types (one for BigQuery and one for Postgres) and then inserted into both sinks. But we are seeing duplicates in both sinks (Postgres was more or less fixed with a unique constraint and an "ON CONFLICT... DO NOTHING").

At first we trusted the "insertId" UUID that Apache Beam/BigQuery supposedly creates. Then we added a "unique_label" attribute to each message before queueing them into PubSub, built from data in the JSON itself, which makes them unique (a device_id plus a reading's timestamp), and subscribed to the topic using that attribute with the "withIdAttribute" method. Finally we paid for GCP Support, and their "solutions" do not work. They even told us to use the Reshuffle transform, which is deprecated by the way, and some windowing (which we do not want, since we need near-real-time data).
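For reference, this is roughly how the attribute gets attached on the publisher side (a simplified sketch, not our exact producer code; deviceId, readingTimestamp and json stand in for values taken from the reading, and the attribute key has to match the one passed to withIdAttribute in the pipeline below):

    import com.google.cloud.pubsub.v1.Publisher
    import com.google.protobuf.ByteString
    import com.google.pubsub.v1.PubsubMessage
    import com.google.pubsub.v1.TopicName

    // Simplified publisher sketch: attach a deterministic id built from the reading
    // itself so that PubsubIO.withIdAttribute can use it for deduplication.
    fun publishReading(projectId: String, topicId: String, json: String, deviceId: String, readingTimestamp: Long) {
        val publisher = Publisher.newBuilder(TopicName.of(projectId, topicId)).build()
        try {
            val message = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(json))
                // Must match the attribute name used in withIdAttribute("id_label")
                .putAttributes("id_label", "$deviceId-$readingTimestamp")
                .build()
            publisher.publish(message).get() // block until published (sketch only)
        } finally {
            publisher.shutdown()
        }
    }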

This is the main flow, pretty basic: [UPDATED WITH LATEST CODE] Pipeline

        val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(OptionArgs::class.java)
        val pipeline = Pipeline.create(options)
        var mappings = ""

        // Value only available at runtime
        if (options.schemaFile.isAccessible) {
            mappings = readCloudFile(options.schemaFile.get())
        }

        val tableRowMapper = ReadingToTableRowMapper(mappings)
        val postgresMapper = ReadingToPostgresMapper(mappings)

        val pubsubMessages =
            pipeline
            .apply("ReadPubSubMessages",
                PubsubIO
                    .readMessagesWithAttributes()
                    .withIdAttribute("id_label")
                    .fromTopic(options.pubSubInput))

        pubsubMessages
            .apply("AckPubSubMessages", ParDo.of(object: DoFn<PubsubMessage, String>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    LOG.info("Processing readings: " + context.element().attributeMap["id_label"])
                    context.output("")
                }
            }))

        val disarmedMessages =
            pubsubMessages
                .apply("DisarmedPubSubMessages",
                    DisarmPubsubMessage(tableRowMapper, postgresMapper)
                )

        disarmedMessages
            .get(TupleTags.readingErrorTag)
            .apply("LogDisarmedErrors", ParDo.of(object: DoFn<String, String>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    LOG.info(context.element())
                    context.output("")
                }
            }))

        disarmedMessages
            .get(TupleTags.tableRowTag)
            .apply("WriteToBigQuery",
                BigQueryIO
                    .writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                    .to(options.bigQueryOutput)
            )

        pipeline.run()

DisarmPubsubMessage is a PTransform that uses the FlatMapElements transform to get TableRow and ReadingsInputFlatten (our own class for Postgres).
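For context, this is a simplified sketch of the shape of that transform (not the exact code: it uses a multi-output ParDo here instead of FlatMapElements, and TupleTags.postgresTag and the mappers' map(...) methods are placeholders):

    import com.google.api.services.bigquery.model.TableRow
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.transforms.PTransform
    import org.apache.beam.sdk.transforms.ParDo
    import org.apache.beam.sdk.values.PCollection
    import org.apache.beam.sdk.values.PCollectionTuple
    import org.apache.beam.sdk.values.TupleTagList

    // Sketch of the "disarm" step: parse each PubsubMessage once and fan it out to
    // the BigQuery, Postgres and error outputs via tagged outputs.
    class DisarmPubsubMessage(
        private val tableRowMapper: ReadingToTableRowMapper,
        private val postgresMapper: ReadingToPostgresMapper
    ) : PTransform<PCollection<PubsubMessage>, PCollectionTuple>() {

        override fun expand(input: PCollection<PubsubMessage>): PCollectionTuple =
            input.apply("FlattenReadings", ParDo.of(object : DoFn<PubsubMessage, TableRow>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    try {
                        val json = String(context.element().payload, Charsets.UTF_8)
                        // map(...) stands in for whatever the real mappers expose
                        context.output(TupleTags.tableRowTag, tableRowMapper.map(json))
                        context.output(TupleTags.postgresTag, postgresMapper.map(json))
                    } catch (e: Exception) {
                        context.output(TupleTags.readingErrorTag, e.message ?: "unparseable reading")
                    }
                }
            }).withOutputTags(
                TupleTags.tableRowTag,
                TupleTagList.of(TupleTags.postgresTag).and(TupleTags.readingErrorTag)))
    }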

We expect zero duplicates, or at least a "best effort" (on top of which we append some cleaning cron job); we paid for these products to run statistics and big data analysis...

[UPDATE 1] I even appended a new simple transform that logs our unique attribute through a ParDo, which supposedly should ack the PubsubMessage, but this is not the case:

new flow with AckPubSubMessages step

Thanks!!

Is that the final code? I don't see withIdAttribute in it. How far apart can the duplicates arrive, and what percentage of duplicate messages do you see? – Guillem Xercavins
@GuillemXercavins Sorry, I just updated the post with the most recent code (with or without withIdAttribute, the result is the same). From the last run of the pipeline (we clean the table on each run), 43,954 rows were inserted and around 1,148 have duplicates (in some cases triplicates). – julodiaz
@GuillemXercavins Could it be that I'm using Apache Beam's PubsubMessage implementation and not the one from Google? Because Google's has the messageId property, but if I change to that one, PubsubIO won't work... – julodiaz
Is it just me or does AckPubSubMessages not actually ack the messages? Have you tried specifically calling .ack()? – Justin

2 Answers

0 votes

It looks like you are using the global window. One technique would be to window the data into an N-minute window, then process the keys in each window and drop any items with duplicate keys.
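As an illustration, here is a minimal sketch of that idea (not tested against your pipeline; it assumes the id_label attribute from your code is the key and uses an arbitrary 5-minute window, both of which you would tune):

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
    import org.apache.beam.sdk.transforms.Distinct
    import org.apache.beam.sdk.transforms.SerializableFunction
    import org.apache.beam.sdk.transforms.windowing.FixedWindows
    import org.apache.beam.sdk.transforms.windowing.Window
    import org.apache.beam.sdk.values.PCollection
    import org.apache.beam.sdk.values.TypeDescriptors
    import org.joda.time.Duration

    // Drop messages that share the same id_label within each fixed window.
    // Duplicates that land in different windows are not caught, and results are
    // only emitted when the window closes, which adds up to N minutes of latency.
    fun dedupeInWindow(messages: PCollection<PubsubMessage>): PCollection<PubsubMessage> =
        messages
            .apply("FixedWindow",
                Window.into<PubsubMessage>(FixedWindows.of(Duration.standardMinutes(5))))
            .apply("DropDuplicateIds",
                Distinct.withRepresentativeValueFn(
                    SerializableFunction<PubsubMessage, String> { it.getAttribute("id_label") ?: "" })
                    .withRepresentativeType(TypeDescriptors.strings()))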

0 votes

The supported programming languages are Python and Java; your code seems to be Scala and, as far as I know, that is not supported. I strongly recommend using Java to avoid relying on anything unsupported for the language you use.

In addition, I would recommend the following approaches to deal with the duplicates; option 2 could meet your need for near-real-time data:

  1. message_id. You probably already read the FAQ - duplicates entry, which points to a deprecated doc. However, if you check the PubsubMessage object you will notice that messageId is still available, and it will be populated if not set by the publisher (see the read sketch after this list):

"ID of this message, assigned by the server when the message is published ... It must not be populated by the publisher in a topics.publish call"

  2. BigQuery Streaming. To catch duplicates while loading the data, you can create a UUID right before inserting into BQ. Please refer to the section Example sink: Google BigQuery.

  3. Try the Dataflow template PubSubToBigQuery and validate that there are no duplicates in BQ.
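Regarding option 1, here is a minimal sketch of reading with the server-assigned ID in your Kotlin pipeline (the topic is a placeholder; the ID is only extracted here, but you could instead carry it alongside the payload and use it as a deduplication key, e.g. combined with the windowed approach from the other answer):

    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
    import org.apache.beam.sdk.transforms.MapElements
    import org.apache.beam.sdk.transforms.SimpleFunction
    import org.apache.beam.sdk.values.PCollection

    // Read Pub/Sub messages including the messageId assigned by the server at
    // publish time, then surface that ID so it can drive deduplication downstream.
    fun extractMessageIds(pipeline: Pipeline, topic: String): PCollection<String> =
        pipeline
            .apply("ReadWithMessageId",
                PubsubIO.readMessagesWithAttributesAndMessageId().fromTopic(topic))
            .apply("ExtractMessageId", MapElements.via(
                object : SimpleFunction<PubsubMessage, String>() {
                    override fun apply(input: PubsubMessage): String = input.messageId ?: ""
                }))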