
I've created a window in Apache Beam that should fire either when a time limit expires or when the pane holds too many elements (the messages are unbounded and come from a Pub/Sub queue):

   .apply("Window", Window.<TraceUpdateMessage>into(new GlobalWindows())
                        .withTimestampCombiner(TimestampCombiner.EARLIEST)
                        .triggering(Repeatedly.forever(
                                AfterFirst.of(
                                        AfterProcessingTime
                                                .pastFirstElementInPane()
                                                .plusDelayOf(Duration.standardMinutes(options.getFixedWindowDuration())),
                                        AfterPane
                                                .elementCountAtLeast(options.getFixedWindowElementCountTrigger())
                                )
                        ))
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes()
                )

I've got a ParDo right after the window:

   .apply("Aggregate", ParDo.of(new CustomAggregationFunction()))

The problem is that CustomAggregationFunction gets called right away, and it doesn't look like the window is doing anything. The goal is to have CustomAggregationFunction called only when a window is full, and to have it process all the elements in that window.


1 Answer


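Windowing and triggering don't take effect until the next grouping transform. The Window step only tags each element with its window and records the trigger, so a ParDo placed directly after it still sees elements one at a time. Inserting a grouping transform, such as a GroupByKey or a reshuffle, between the Window and the ParDo should work.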
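
A rough sketch of the GroupByKey variant, continuing the chain right after the "Window" step from the question. The constant key "all", the step names "AddKey" and "Group", and the BatchAggregationFn class are made up for illustration; the Integer output (an element count) is only a placeholder for whatever the real aggregation produces.

```java
// Pipeline fragment, not a complete program. Assumes these imports:
//   org.apache.beam.sdk.transforms.{DoFn, GroupByKey, ParDo, WithKeys}
//   org.apache.beam.sdk.values.KV
   // Give every element the same (hypothetical) key so GroupByKey has something to group on.
   .apply("AddKey", WithKeys.<String, TraceUpdateMessage>of("all"))
   // The trigger set on the Window step fires here, emitting one Iterable per pane.
   .apply("Group", GroupByKey.<String, TraceUpdateMessage>create())
   .apply("Aggregate", ParDo.of(new BatchAggregationFn()));

// Hypothetical stand-in for CustomAggregationFunction: it now receives a whole
// pane per call instead of a single message.
class BatchAggregationFn
    extends DoFn<KV<String, Iterable<TraceUpdateMessage>>, Integer> {

  @ProcessElement
  public void processElement(
      @Element KV<String, Iterable<TraceUpdateMessage>> pane,
      OutputReceiver<Integer> out) {
    int count = 0;
    for (TraceUpdateMessage message : pane.getValue()) {
      count++; // placeholder: the real aggregation logic would go here
    }
    out.output(count);
  }
}
```

Note that a single constant key sends every element through one group, which limits parallelism. If the messages have a natural key (or can be spread over a few shard keys), grouping on that is likely a better fit.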