2
votes

I'm trying to achieve exactly-once delivery with Google Dataflow and PubSub, using Apache Beam SDK 2.6.0.

Use case is quite simple:

'Generator' dataflow job sends 1M messages to PubSub topic.

GenerateSequence
          .from(0)
          .to(1000000)
          .withRate(100000, Duration.standardSeconds(1L));

'Archive' dataflow job reads messages from PubSub subscription and saves to Google Cloud Storage.

pipeline
        .apply("Read events",
            PubsubIO.readMessagesWithAttributes()
                // this is to achieve exactly-once delivery
                .withIdAttribute(ATTRIBUTE_ID)
                .fromSubscription("subscription")
                .withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
        .apply("Window events",
            Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardMinutes(15))
                .discardingFiredPanes())
        .apply("Events count metric", ParDo.of(new CountMessagesMetric()))
        .apply("Write files to archive",
            FileIO.<String, Dto>writeDynamic()
                .by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
                .to(archiveDir)
                .withTempDirectory(archiveDir)
                .withNumShards(options.getNumShards())
                .withNaming(dataSource ->
                    new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
                ));

I added 'withIdAttribute' to both PubsubIO.Write ('Generator' job) and PubsubIO.Read ('Archive' job), and I expect this to guarantee exactly-once semantics.

I would like to test the 'negative' scenario:

  1. 'Generator' dataflow job sends 1M messages to PubSub topic.
  2. 'Archive' dataflow job starts to work, but I stop it in the middle of processing clicking 'Stop job' -> 'Drain'. Some portion of messages has been processed and saved to Cloud Storage, let's say 400K messages.
  3. I start 'Archive' job again and do expect that it will take unprocessed messages (600K) and eventually I will see exactly 1M messages saved to Storage.

What I got in fact - all messages are delivered (at-least-once is achieved), but on top of that there are a lot of duplicates - something in the neighborhood of 30-50K per 1M messages.

Is there any solution to achieve exactly-once delivery?

2
Do you get duplicate messages even without interrupting the archive job? - marcyb5st
No, I do not, 'happy path' works well. If 'archive' job is not interrupted I get exactly 1M messages in Storage. - Dmitry Tkachev

2 Answers

2
votes

Dataflow does not enable you to persist state across runs, so a job started after a drain has no record of the message IDs the previous job already processed. If you use Java, you can update a running pipeline in a way that does not cause it to lose the existing state, allowing you to deduplicate across pipeline releases.

If this doesn't work for you, you may want to archive messages in a way where they are keyed by ATTRIBUTE_ID, e.g. in Spanner, or in GCS using the ID as the file name.
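
To illustrate why a keyed archive tolerates redelivery, here is a minimal plain-Java sketch. It is not Beam, Spanner or GCS code: `KeyedArchive` is a hypothetical in-memory stand-in for any store where the message ID is the key (a primary key or an object name), and the "id-N" values are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a sink keyed by message ID. */
final class KeyedArchive {
    private final Map<String, String> objects = new LinkedHashMap<>();

    /** Writes the payload under its ID; a redelivered ID overwrites the same entry. */
    void write(String messageId, String payload) {
        objects.put(messageId, payload);
    }

    /** Number of distinct IDs archived so far. */
    int size() {
        return objects.size();
    }

    public static void main(String[] args) {
        KeyedArchive archive = new KeyedArchive();
        // First run: messages 0..3 are archived before the job is drained.
        for (int i = 0; i < 4; i++) {
            archive.write("id-" + i, "payload-" + i);
        }
        // Second run: messages 2..5 arrive, so 2 and 3 are delivered again.
        for (int i = 2; i < 6; i++) {
            archive.write("id-" + i, "payload-" + i);
        }
        System.out.println(archive.size()); // prints 6
    }
}
```

Because the write is idempotent, at-least-once delivery is enough: a duplicate only rewrites an entry that already exists.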

0
votes

So, I've never done it myself, but reasoning about your problem this is how I would approach it...

My solution is a bit convoluted, but I failed to identify other ways to achieve this without involving external services. So, here goes nothing.

You could have your pipeline read from both PubSub and GCS and then combine the two to de-duplicate the data. The tricky part here is that one would be a bounded PCollection (GCS) and the other an unbounded one (PubSub). You can add timestamps to the bounded collection and then window the data. During this stage you could potentially drop GCS data older than ~15 minutes (the allowed lateness in your previous implementation). These two steps (i.e. adding timestamps properly and dropping data that is probably old enough to not create duplicates) are by far the trickiest parts.
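
The "drop old GCS data" step can be sketched in plain Java as a cutoff filter. This is only an illustration of the idea, not Beam code: `RecentArchiveFilter` and `keepRecent` are hypothetical names, and it assumes each archived record carries its ID and an event timestamp.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

final class RecentArchiveFilter {
    /**
     * Keeps only archived IDs whose event time is no older than `horizon`
     * relative to `now`; anything older is assumed too old to be redelivered.
     */
    static Map<String, Instant> keepRecent(
            Map<String, Instant> archived, Instant now, Duration horizon) {
        Instant cutoff = now.minus(horizon);
        Map<String, Instant> recent = new LinkedHashMap<>();
        for (Map.Entry<String, Instant> e : archived.entrySet()) {
            // Keep records at or after the cutoff.
            if (!e.getValue().isBefore(cutoff)) {
                recent.put(e.getKey(), e.getValue());
            }
        }
        return recent;
    }
}
```

With a 15-minute horizon, a record stamped 20 minutes ago is dropped, while one stamped exactly 15 minutes ago is kept.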

Once this has been solved, append the two PCollections and then use a GroupByKey on an ID that is common to both sets of data. This will yield a PCollection<KV<Long, Iterable<YOUR_DATUM_TYPE>>>. Then you can use an additional DoFn that drops all but the first element in the resulting Iterable and also removes the KV<> boxing. From there on you can simply continue processing the data as you normally would.
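
The group-then-keep-first logic can be sketched in plain Java. This is a stand-in for the Beam steps, not Beam code: a `LinkedHashMap` plays the role of GroupByKey, and `FirstPerIdDedup`, `Event` and `dedup` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FirstPerIdDedup {
    /** Hypothetical datum: an ID shared by both sources plus a payload. */
    static final class Event {
        final long id;
        final String data;

        Event(long id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    static List<Event> dedup(List<Event> archived, List<Event> incoming) {
        // "Append" the two collections, archived records first.
        List<Event> all = new ArrayList<>(archived);
        all.addAll(incoming);

        // Group by ID (the GroupByKey step).
        Map<Long, List<Event>> grouped = new LinkedHashMap<>();
        for (Event e : all) {
            grouped.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e);
        }

        // Keep the first element of each group and drop the key.
        List<Event> out = new ArrayList<>();
        for (List<Event> group : grouped.values()) {
            out.add(group.get(0));
        }
        return out;
    }
}
```

Two caveats if you port this to Beam. GroupByKey does not guarantee the order of values in the Iterable, so "first" is arbitrary there. Also, as written, the output includes IDs that were already archived, so a real pipeline would probably need to skip any group that contains an archived record instead of re-emitting it.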

Finally, this additional work should be necessary only for the first PubSub window after restarting the pipeline. After that you should replace the GCS PCollection with an empty PCollection so that the GroupByKey doesn't do too much additional work.

Let me know what you think and if this could work. Also, if you decide to pursue this strategy, please post your mileage :).