I'm writing a Dataflow pipeline (Beam SDK 2.0.0) that reads from Pub/Sub, counts the elements in each window and then stores the counts in Bigtable as a time series. The windows are fixed windows of 1 minute.
My intention was to use a trigger to update the value of the current window every second, so that the count for the in-progress minute is refreshed in near real time.
But that doesn't seem to work. The value is correctly updated every second, but once Dataflow starts working on the next minute, the value for the previous minute is overwritten with zero. So only the latest window's value is correct, and all the others are zero.
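    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    Pipeline pipeline = Pipeline.create(options);
    PCollection<String> live = pipeline
        .apply("Read from PubSub", PubsubIO.readStrings()
            .fromSubscription("projects/..."))
        .apply("Window per minute",
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                // Fire a pane 1 s after the first element of each pane, repeatedly,
                // until the watermark passes the end of the window.
                .triggering(Repeatedly
                    .forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1)))
                    .orFinally(AfterWatermark.pastEndOfWindow()))
                // Each fired pane contains all elements seen so far in the window.
                .accumulatingFiredPanes()
                // Data behind the watermark after the window ends is dropped.
                .withAllowedLateness(Duration.ZERO));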
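For comparison, the behaviour expected from this trigger can be sketched as a small plain-Java simulation. It is not Beam code: the class and method names are hypothetical, events are assumed to arrive sorted, processing time and the watermark are both approximated by the latest event time, and the final (watermark) pane is always emitted. Under those assumptions each window's last emitted value is its total count, so a stored per-window value should never drop back to zero:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Beam) of: 1-minute fixed windows, a pane fired 1 s after
// the first element of each pane (repeatedly), accumulating panes, and a final pane
// when the watermark passes the end of the window. Simplifications: events arrive
// sorted, and processing time and the watermark both equal the latest event time.
class AccumulatingPaneSimulation {
    static final long WINDOW_MS = 60_000L;
    static final long FIRE_DELAY_MS = 1_000L;

    // Start of the fixed window that contains the given timestamp.
    static long windowStart(long timeMs) {
        return timeMs - Math.floorMod(timeMs, WINDOW_MS);
    }

    // Returns, per window start, the value emitted by each pane in order.
    static Map<Long, List<Long>> simulate(long[] sortedEventTimesMs) {
        Map<Long, Long> counts = new TreeMap<>();       // accumulated count per open window
        Map<Long, Long> pendingFire = new TreeMap<>();  // pending processing-time firing per window
        Map<Long, List<Long>> emitted = new TreeMap<>();
        for (long t : sortedEventTimesMs) {
            advanceTo(t, counts, pendingFire, emitted);
            long w = windowStart(t);
            counts.merge(w, 1L, Long::sum);
            // pastFirstElementInPane(): the 1 s delay starts at the first element of a new pane.
            pendingFire.putIfAbsent(w, t + FIRE_DELAY_MS);
        }
        advanceTo(Long.MAX_VALUE, counts, pendingFire, emitted);  // input exhausted: close all windows
        return emitted;
    }

    private static void advanceTo(long now, Map<Long, Long> counts,
                                  Map<Long, Long> pendingFire, Map<Long, List<Long>> emitted) {
        // Iterate over a copy because closed windows are removed from counts.
        for (Long w : new ArrayList<>(counts.keySet())) {
            long windowEnd = w + WINDOW_MS;
            Long fireAt = pendingFire.get(w);
            // Early (processing-time) pane: emits everything seen so far in the window.
            if (fireAt != null && fireAt <= now && fireAt < windowEnd) {
                emitted.computeIfAbsent(w, k -> new ArrayList<>()).add(counts.get(w));
                pendingFire.remove(w);
            }
            // orFinally(pastEndOfWindow()): emit the final pane, then close the window.
            if (windowEnd <= now) {
                emitted.computeIfAbsent(w, k -> new ArrayList<>()).add(counts.get(w));
                counts.remove(w);
                pendingFire.remove(w);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(new long[] {0L, 500L, 1_500L, 61_000L}));
        // prints {0=[2, 3, 3], 60000=[1, 1]}
    }
}
```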
I've tried playing with the trigger code, but nothing helps. My only option right now is to remove the entire .triggering(...) block, which means losing the per-second updates. Has anyone experienced similar behaviour?