3
votes

I have an issue with KStreams aggregation and windows. I want to aggregate records with the same key into a list, as long as each record falls inside a time window. I have chosen SessionWindows because I need a moving window inside a session: say record A arrives at 10:00:00; every other record with the same key that arrives within the 10-second window (until 10:00:10) falls into the same session, bearing in mind that if one arrives at 10:00:03, the window moves to 10:00:13 (+10 s).

That leads us to have a moving window of +10s from the last record received for a given key.
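That inactivity-gap behaviour can be sketched without any framework. The following is a minimal plain-Java illustration (not the Kafka Streams API; the helper and its name are mine): each new record extends the session's end by the 10-second gap, and a record arriving after the gap starts a fresh session.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionSketch {
    static final long GAP_MS = 10_000L;

    // A session closes at lastEventTime + GAP_MS; each new event extends it.
    static long sessionEnd(List<Long> eventTimesMs) {
        long end = Long.MIN_VALUE;
        List<Long> session = new ArrayList<>();
        for (long t : eventTimesMs) {
            if (!session.isEmpty() && t > end) {
                session.clear();           // gap exceeded: a new session starts
            }
            session.add(t);
            end = t + GAP_MS;              // the window moves with every record
        }
        return end;
    }

    public static void main(String[] args) {
        // Record at 10:00:00 (t = 0) then 10:00:03 (t = 3000):
        // the session now stays open until 10:00:13 (t = 13000).
        System.out.println(sessionEnd(List.of(0L, 3_000L))); // 13000
    }
}
```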

Now the problem: I want to obtain the last aggregated result. I have used .suppress() to indicate that I don't want any intermediate results, just the final one when the window closes. This is not working as expected: while it does suppress the intermediate aggregated results, when the time window ends I don't get the final one either. I have noticed that in order to receive it I need to publish another message to the topic, which in my case is impossible.

Reading about .suppress() I have come to the conclusion that it may not be the way to achieve what I want, that's why my question is: how can I force the window to close and send the latest aggregated calculated result?

@StreamListener(ExtractContractBinding.RECEIVE_PAGE)
@SendTo(ExtractCommunicationBinding.AGGREGATED_PAGES)
public KStream<String, List<Records>> aggregatePages(KStream<?, Record> input) {
    return input.map(this::getRecord)
            .groupBy(keyOfElement)
            .windowedBy(SessionWindows.with(Duration.ofSeconds(10L)).grace(Duration.ofSeconds(10L)))
            .aggregate(...do stuff...)
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .map(this::createAggregatedResult);
}

1 Answer

3
votes

In short, this happens because in KStreams, as in most other stream processing engines that compute aggregations, time advances based on event time.

https://kafka.apache.org/0101/documentation/streams#streams_time

In other words, the window cannot close until a new message arrives with a timestamp beyond your window end plus the grace period that accounts for late-arriving messages.
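To make that rule concrete, here is a tiny plain-Java illustration (independent of Kafka Streams; the method is hypothetical): stream time is the maximum event timestamp observed so far, and the window only counts as closed once stream time passes its end plus the grace period. Wall-clock time passing contributes nothing.

```java
public class WindowCloseSketch {
    // A window ending at windowEndMs, with graceMs allowed for late records.
    static boolean windowClosed(long windowEndMs, long graceMs, long[] observedTimestamps) {
        long streamTime = Long.MIN_VALUE;
        for (long t : observedTimestamps) {
            streamTime = Math.max(streamTime, t); // stream time only moves forward
        }
        // The window closes only when a record arrives past end + grace.
        return streamTime > windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        long end = 10_000L, grace = 10_000L;
        // No record beyond t = 20 s has arrived, so suppress() keeps holding
        // the aggregate no matter how much wall-clock time has passed.
        System.out.println(windowClosed(end, grace, new long[]{0L, 3_000L}));           // false
        // One more record at t = 21 s finally advances stream time and releases it.
        System.out.println(windowClosed(end, grace, new long[]{0L, 3_000L, 21_000L}));  // true
    }
}
```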

Moreover, based on some unit tests I’ve been writing recently, I’m inclined to believe that the second message needs to land in the same partition as the previous one for event time to move forward. In practice, when you run in production and presumably process hundreds of messages per second, this becomes unnoticeable.

Let me also add that you can implement a custom timestamp extractor, which gives you fine-grained control over which time window a particular message lands in.
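As a sketch of the logic such an extractor typically contains (the `PageRecord` type and its field are hypothetical; in Kafka Streams you would implement the `TimestampExtractor` interface and register it via the `default.timestamp.extractor` config): prefer a business timestamp embedded in the payload, and fall back to the record's own timestamp when it is missing.

```java
public class ExtractorSketch {
    // Hypothetical payload carrying its own business timestamp.
    record PageRecord(String key, long eventTimeMs) {}

    // Core of a custom extractor: prefer the embedded event time,
    // fall back to the broker/record timestamp when it's absent.
    static long extractTimestamp(PageRecord value, long recordTimestampMs) {
        if (value != null && value.eventTimeMs() > 0) {
            return value.eventTimeMs();
        }
        return recordTimestampMs;
    }

    public static void main(String[] args) {
        System.out.println(extractTimestamp(new PageRecord("a", 5_000L), 9_000L)); // 5000
        System.out.println(extractTimestamp(null, 9_000L));                        // 9000
    }
}
```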

how can I force the window to close and send the latest aggregated calculated result?

To finally answer your question: it’s not possible to force the time window to close without emitting an extra message to the source topic.