I've created a Window in Apache BEAM to wait either a window to expire or the window is too full (messages are unbounded and coming from a pub/sub queue):
.apply("Window", Window.<TraceUpdateMessage>into(new GlobalWindows())
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.triggering(Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(options.getFixedWindowDuration())),
AfterPane
.elementCountAtLeast(options.getFixedWindowElementCountTrigger())
)
))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
)
I've got a ParDo right after the window:
.apply("Aggregate", ParDo.of(
new CustomAggregationFunction()
)
the problem is that the CustomAggregationFunction
is getting called right away, and it doesn't look like the window is doing anything. The goal is go have the CustomAggregationFunction
get called only when a Window is full and process all elements inside that window.