I'm writing a Dataflow pipeline (Beam SDK 2.0.0) that reads from Pub/Sub, counts the elements in each window, and stores the counts in Bigtable as a time series. The windows are fixed, with a duration of 1 minute.

My intention is to update the count for the current window every second using a trigger, so that the time series shows real-time values for the window that is still open.

But that doesn't seem to work. The value is correctly updated every second, but once Dataflow starts working on the next minute, the value for the previous minute is overwritten with zero. So only the most recent value is correct; all earlier ones are zero.
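To make the expected behaviour concrete, here is a minimal plain-Java sketch of what I expect from 1-minute fixed windows with accumulating panes. It is not Beam code: the class and method names are hypothetical, the map called `table` only stands in for the Bigtable rows, and it fires a pane on every element instead of once per second. Each pane carries the full count so far for its window, so overwriting the row should never leave an earlier window at zero.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the expected semantics, not Beam code.
// All names here are hypothetical.
class AccumulatingPaneSketch {
    static final long WINDOW_MILLIS = 60_000L; // 1-minute fixed windows

    // Start of the fixed window containing the timestamp
    // (non-negative timestamps assumed).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Simulates a time-series table keyed by window start.
    // Every pane overwrites the cell for its window.
    static Map<Long, Long> run(List<Long> eventTimestamps) {
        Map<Long, Long> runningCounts = new TreeMap<>(); // accumulated count per window
        Map<Long, Long> table = new TreeMap<>();         // stand-in for one row per window
        for (long ts : eventTimestamps) {
            long window = windowStart(ts);
            long count = runningCounts.merge(window, 1L, Long::sum);
            // An accumulating pane holds the full count so far,
            // so overwriting the previous value is safe.
            table.put(window, count);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(1_000L, 2_500L, 59_999L, 60_000L, 61_000L);
        System.out.println(run(events)); // prints {0=3, 60000=2}
    }
}
```

In this sketch the first window keeps its final count of 3 after the second window starts receiving elements, which is the behaviour I expected from the pipeline.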

Pipeline pipeline = Pipeline.create(options);

PCollection<String> live = pipeline
        .apply("Read from PubSub", PubsubIO.readStrings()
            .fromSubscription("projects/..."))
        .apply("Window per minute",
            Window
                .<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(Repeatedly
                    .forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1)))
                    .orFinally(AfterWatermark.pastEndOfWindow()))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.ZERO)
            );

I've tried changing the trigger code in various ways, but nothing helps. My only option right now is to remove the entire .triggering block. Has anyone experienced similar behaviour?

1 Answer

After I reported the issue to Google, they discovered some bugs in the Beam SDK that cause this behaviour. More details are in these issues:

When EOW and GC timers fire together (nonzero allowed lateness) we fail to note that it is the final pane: https://issues.apache.org/jira/browse/BEAM-2505

Processing time timers are not ignored properly if they come in with the GC timer: https://issues.apache.org/jira/browse/BEAM-2502

Processing time timers are just interpreted as GC timers, completely incorrectly comparing timestamps from different time domains: https://issues.apache.org/jira/browse/BEAM-2504
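The last issue is about comparing timestamps from different time domains. As a rough illustration of why that kind of comparison is unsafe, here is a toy Java sketch. It is not Beam's implementation: the `Timer` class, the `TimeDomain` enum, and both check methods are hypothetical names made up for this example, and the timestamps are arbitrary.

```java
// Toy model of the time-domain mix-up, not Beam's actual code.
// All names here are hypothetical.
class TimerDomainSketch {
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    static final class Timer {
        final TimeDomain domain;
        final long timestampMillis;

        Timer(TimeDomain domain, long timestampMillis) {
            this.domain = domain;
            this.timestampMillis = timestampMillis;
        }
    }

    // Naive check: looks only at the raw timestamp and ignores the domain,
    // so a processing-time timer can be mistaken for a garbage-collection timer.
    static boolean naiveIsGcTimer(Timer timer, long gcEventTimeMillis) {
        return timer.timestampMillis >= gcEventTimeMillis;
    }

    // Domain-aware check: only an event-time timer is compared
    // against the event-time garbage-collection time.
    static boolean domainAwareIsGcTimer(Timer timer, long gcEventTimeMillis) {
        return timer.domain == TimeDomain.EVENT_TIME
            && timer.timestampMillis >= gcEventTimeMillis;
    }

    public static void main(String[] args) {
        long gcEventTime = 60_000L; // end of a 1-minute window, zero allowed lateness
        // The processing-time clock is unrelated to event time; here it is far ahead.
        Timer processingTimer = new Timer(TimeDomain.PROCESSING_TIME, 1_000_000L);

        System.out.println(naiveIsGcTimer(processingTimer, gcEventTime));       // prints true
        System.out.println(domainAwareIsGcTimer(processingTimer, gcEventTime)); // prints false
    }
}
```

In the toy model, the naive check treats the one-second processing-time timer as the signal to clean up the window, while the domain-aware check ignores it. Whether the actual fix in Beam looks anything like this is an assumption; the linked issues are the authoritative description.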