I am performing an aggregation on a windowed stream and want to suppress early aggregation results. By early results I mean results computed before the window ends, but not those computed during the grace period. Thus, I would like to suppress all aggregation results with timestamp < window end, but forward all results with window end <= timestamp < window close.
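The rule above can be written as a small, framework-free predicate. This is a sketch under stated assumptions: "time" is read as stream time (the largest record timestamp observed so far), the window end is exclusive as in Kafka Streams' `TimeWindows`, and the class and method names are hypothetical, not part of any Kafka API.

```java
// Hypothetical helper (not Kafka Streams API) expressing the desired rule for one window.
// Assumes time = stream time, window end exclusive, window closes at windowEnd + grace.
final class DesiredSuppression {

    enum Action { SUPPRESS, FORWARD, DROP }

    // Decides what should happen to an aggregation update produced at the given stream time.
    static Action classify(long streamTime, long windowEnd, long grace) {
        long windowClose = windowEnd + grace;
        if (streamTime < windowEnd) {
            return Action.SUPPRESS; // early result: hold it back until the window has ended
        } else if (streamTime < windowClose) {
            return Action.FORWARD;  // update during the grace period: emit right away
        } else {
            return Action.DROP;     // window already closed; the aggregation drops the record
        }
    }
}
```

For a window ending at 60 000 ms with a 600 000 ms grace period, an update at stream time 30 000 is suppressed, one at 60 000 is forwarded, and one at 660 000 is dropped.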
Minimal Kafka Streams topology example:
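```java
// Minimal Kafka Streams topology; myWindowSize, myGracePeriod and myReducer are defined elsewhere
new StreamsBuilder()
    .stream("my-topic")
    .groupByKey() // windowedBy(..) is defined on KGroupedStream, so the stream must be grouped first
    .windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
    .reduce(myReducer)
    .suppress( /* searched for */ )
    .toStream();
```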
Therefore, `Suppressed.untilWindowCloses(..)` is not an option for me, as I would have to wait until the grace period has expired, which might take long.
According to KIP-328, exactly the required behavior could be obtained using `Suppressed.untilTimeLimit(Duration.ZERO, ..)`, as the KIP's description states:
> a. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window end (for windowed KTables), to buffer up each key before emitting them downstream.
However, the Kafka Streams JavaDoc as well as the corresponding implementation imply that this is not the case: the time limit starts its countdown when the first record for a (windowed) key is received, not when the window ends.
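The discrepancy boils down to the point in time at which a buffered update becomes eligible for emission. The following is a simplified model, not Kafka Streams code: the method names are hypothetical, and it assumes the countdown is measured in event time (record timestamps) rather than wall-clock time.

```java
// Simplified model (not Kafka Streams code) of two readings of
// Suppressed.untilTimeLimit(timeLimit, ..) for a windowed KTable.
final class TimeLimitReadings {

    // KIP-328 wording: the countdown is measured from the window end.
    static long emitAtPerKip(long windowEnd, long timeLimitMs) {
        return windowEnd + timeLimitMs;
    }

    // JavaDoc/implementation reading: the countdown starts with the
    // timestamp of the first update buffered for the windowed key.
    static long emitAtPerImplementation(long firstUpdateTimestamp, long timeLimitMs) {
        return firstUpdateTimestamp + timeLimitMs;
    }
}
```

For a window ending at 60 000 ms whose first update has timestamp 1 000 ms, a limit of `Duration.ZERO` gives 60 000 under the KIP wording but 1 000 under the implementation reading, so the update is forwarded as soon as it is produced and nothing is suppressed.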
I would appreciate clarification on this, as well as advice on how to achieve the desired behavior.
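One possible direction, offered only as an assumption and not as a confirmed Kafka Streams feature, is to implement the buffering yourself, for example in a custom processor backed by a state store. The framework-free sketch below models just the logic for a single key: it keeps the latest early result per window while stream time is before the window end, flushes it once the window has ended, and forwards updates produced during the grace period immediately. The class name is hypothetical, and persistence, fault tolerance, and per-key handling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical single-key model of "suppress until window end, then pass through".
final class EndOfWindowSuppressor {

    // windowEnd -> latest early aggregate that has not been emitted yet
    private final TreeMap<Long, Long> buffered = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;

    // Processes one aggregation update and returns what to emit, as "windowEnd:value".
    List<String> onUpdate(long recordTimestamp, long windowEnd, long aggregate) {
        streamTime = Math.max(streamTime, recordTimestamp);
        List<String> emitted = new ArrayList<>();
        if (streamTime < windowEnd) {
            buffered.put(windowEnd, aggregate);  // early result: keep only the latest
        } else {
            buffered.remove(windowEnd);          // superseded by this newer update
            emitted.add(windowEnd + ":" + aggregate); // window ended: forward now
        }
        // Flush buffered early results of every window whose end has been reached.
        NavigableMap<Long, Long> ended = buffered.headMap(streamTime, true);
        for (Map.Entry<Long, Long> e : ended.entrySet()) {
            emitted.add(e.getKey() + ":" + e.getValue());
        }
        ended.clear(); // removes the flushed entries from the backing map
        return emitted;
    }
}
```

Feeding updates for a window ending at 60 000 ms at timestamps 1 000 and 2 000 emits nothing; an update for the next window at 61 000 advances stream time and flushes `60000:2`; a late update for the first window at 3 000 is then forwarded immediately as `60000:3`.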