3 votes

I have a project with an Apache Beam pipeline whose dependencies are set up in such a way that I have to use version 0.20.0-beta of PubSub. The pipeline is unbounded and runs continuously.

[+] The problem: The PubSub message is being redelivered every 30 minutes or so.

[+] What I've tried: I've read many answers explaining that the Dataflow runner acknowledges messages at checkpoints, and that adding a PTransform such as GroupByKey causes those messages to be acked sooner. So I've tried windowing, triggering, and grouping by key, but I still get the message redelivered from PubSub.

[+] Question(s): What am I doing wrong? Why is the message not being acked? (If I'm understanding correctly, it won't get acked until the end of the pipeline executes, but my pipeline takes a long time. How do I ack earlier?)

Is this a bug specific to version 0.20.0-beta, or should I be able to use PubsubIO.Read with windowing and triggering in order to ack earlier?

[+] Code:

The window duration is 10 seconds and the PubSub ack deadline is 60 seconds.

     .apply("Listen_To_PubSub", PubsubIO.readStrings().fromSubscription(subscription))
            .apply("Windowing", Window.<String> into(window).triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeLimit)).withAllowedLateness(Duration.ZERO).discardingFiredPanes())
            .apply("DeleteFromBQ", ParDo.of(new DeleteFromBQ()))
            .apply("Mapping", ParDo.of(new Mapping()))
            .apply("GroupByKey", GroupByKey.<String,String>create())
            .apply("Acknowledge", ParDo.of(new Grouped()))
            .apply("DoSomething1", ParDo.of(new DoSomething1()))
            .apply("Flatten_Iterable", Flatten.iterables())
            .apply("DoSomething2", ParDo.of(new DoSomething2()))
            .apply("DoSomething3", ParDo.of(new DoSomething3()))
            .apply("DoSomething4", ParDo.of(new DoSomething4()))
            .apply("Write_To_BigQuery", BigQueryIO.writeTableRows()
                    .to(output)
                    .withSchema(schema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );

Thanks in advance! Any input is appreciated.


2 Answers

1 vote

It seems that because you apply so many transforms, you exceed the ack deadline of 60 seconds. To see how long processing actually takes, I recommend using Logging Pipeline Messages. I think you may need to move the Acknowledge step earlier in the pipeline.
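
For example, a minimal sketch of timing a step from inside a DoFn, assuming SLF4J (the logging facade Beam uses); TimedStep and its body are illustrative, not your actual code:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical DoFn that logs how long each element takes to process.
    class TimedStep extends DoFn<String, String> {
        private static final Logger LOG = LoggerFactory.getLogger(TimedStep.class);

        @ProcessElement
        public void processElement(ProcessContext c) {
            long start = System.currentTimeMillis();
            // ... the real per-element work would go here ...
            c.output(c.element());
            LOG.info("Processed element in {} ms", System.currentTimeMillis() - start);
        }
    }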

Another thing you can do is use a higher machine type so the messages are processed faster.
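
On Dataflow, the machine type can be set through pipeline options, along these lines (a sketch; n1-highmem-4 is just an example type):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    // Example: a larger worker; also settable with --workerMachineType on the command line.
    options.setWorkerMachineType("n1-highmem-4");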

1 vote

So, I ended up resolving this issue by splitting my pipeline in two.

The first half only listens to PubSub messages, gets the relevant info, and writes it to another PubSub topic.

The second half listens to those messages and uses the info in them for the rest of the pipeline.

This split not only took care of acking the messages but also allowed parallelism to work much better!
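
In outline, the first half looks something like this (a sketch; ExtractInfo and intermediateTopic stand in for my actual transform and topic names):

    // First half: once the relevant info is republished to the intermediate topic,
    // the original PubSub message can be acked; the heavy work happens in pipeline two.
    pipeline
        .apply("Listen_To_PubSub", PubsubIO.readStrings().fromSubscription(subscription))
        .apply("Extract_Info", ParDo.of(new ExtractInfo())) // hypothetical ParDo emitting strings
        .apply("Republish", PubsubIO.writeStrings().to(intermediateTopic));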