I have an issue with Kafka Streams (KStream) aggregation and windows. I want to aggregate records that share the same key into a list, as long as they fall inside a time window. I chose SessionWindows because I need a window that moves within a session. Say record A arrives at 10:00:00: every other record with the same key that arrives within the 10-second window (until 10:00:10) falls into the same session. If one of them arrives at 10:00:03, the window is extended to 10:00:13 (+10s).
In other words, for a given key the window always ends 10 seconds after the last record received.
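To make the expected behavior concrete, here is a minimal plain-Java sketch of the rule described above. It does not use any Kafka API; the class `SessionGapSketch` and the method `sessionEnd` are hypothetical names made up for illustration, and the sketch assumes the arrival times are sorted and belong to a single key.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.List;

public class SessionGapSketch {

    // Hypothetical helper: given arrival times sorted in ascending order,
    // returns the moment the first session would end for the given gap.
    public static LocalTime sessionEnd(List<LocalTime> arrivals, Duration gap) {
        LocalTime end = arrivals.get(0).plus(gap);
        for (LocalTime t : arrivals.subList(1, arrivals.size())) {
            if (t.isAfter(end)) {
                break; // this record would start a new session
            }
            end = t.plus(gap); // the session is extended from the latest record
        }
        return end;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofSeconds(10);
        List<LocalTime> arrivals =
                List.of(LocalTime.of(10, 0, 0), LocalTime.of(10, 0, 3));
        System.out.println(sessionEnd(arrivals, gap)); // prints 10:00:13
    }
}
```

This only models the inactivity gap; it ignores the grace period and out-of-order records.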
Now the problem: I want to obtain only the final aggregated result. I used .suppress() to indicate that I don't want any intermediate results, just the last one when the window closes. This does not work as I expected: no intermediate results are sent, which is what I want, but when the time window ends I don't get the final result either. I have noticed that I only receive it after I publish another message to the topic, which is impossible in my case.
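What I observe is consistent with the window close being evaluated only when a new record arrives, that is, against the timestamps of incoming records rather than the wall clock. Below is a rough plain-Java model of that assumption. It is not Kafka code: `StreamTimeSketch`, `onRecord` and `emitted` are hypothetical names, and the model simplifies things by treating each key as having at most one open session.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class StreamTimeSketch {
    private final long closeAfterMs; // inactivity gap + grace period
    private long streamTimeMs = Long.MIN_VALUE; // highest timestamp seen so far
    // key -> timestamp of the latest record in that key's open session
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    public StreamTimeSketch(long gapMs, long graceMs) {
        this.closeAfterMs = gapMs + graceMs;
    }

    // Assumption: time only advances here, when a record is processed.
    public void onRecord(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> open = it.next();
            if (streamTimeMs > open.getValue() + closeAfterMs) {
                emitted.add(open.getKey()); // session closed: emit final result
                it.remove();
            }
        }
        lastSeen.merge(key, timestampMs, Math::max);
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        StreamTimeSketch sketch = new StreamTimeSketch(10_000L, 10_000L);
        sketch.onRecord("A", 0L);
        sketch.onRecord("A", 3_000L);
        System.out.println(sketch.emitted()); // prints []
        sketch.onRecord("B", 30_000L);        // a later record advances time
        System.out.println(sketch.emitted()); // prints [A]
    }
}
```

In this model the result for key A is never emitted unless some later record shows up, which matches what I see.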
After reading about .suppress(), I have come to the conclusion that it may not be the right way to achieve what I want. So my question is: how can I force the window to close and emit the latest aggregated result?
    @StreamListener(ExtractContractBinding.RECEIVE_PAGE)
    @SendTo(ExtractCommunicationBinding.AGGREGATED_PAGES)
    public KStream<String, List<Records>> aggregatePages(KStream<?, Record> input) {
        // The resulting stream must be returned so that @SendTo can publish it
        return input.map(this::getRecord)
            .groupBy(keyOfElement)
            // 10 s inactivity gap, plus 10 s grace period for late records
            .windowedBy(SessionWindows.with(Duration.ofSeconds(10L)).grace(Duration.ofSeconds(10L)))
            .aggregate(...do stuff...)
            // Hold back intermediate results until the window closes
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .map(this::createAggregatedResult);
    }