I am performing an aggregation on a windowed stream and want to suppress early aggregation results. By early results I mean results computed before the window ends, but not those results occurring during the grace period. Thus, I would like to suppress all aggregation results with timestamp < window end, but forward all records with timestamp >= window end and timestamp < window close.
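To make the intended rule concrete, here is a minimal plain-Java sketch (not Kafka Streams code). The class `WindowPhases` and its methods are hypothetical names introduced only for illustration, and the sketch assumes that "timestamp" above refers to the stream time at which a result would be emitted.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Kafka Streams: classifies a point in
// stream time relative to a single window and its grace period.
final class WindowPhases {

    enum Phase { EARLY, GRACE, CLOSED }

    static Phase phaseOf(Instant streamTime, Instant windowStart,
                         Duration windowSize, Duration grace) {
        Instant windowEnd = windowStart.plus(windowSize);
        Instant windowClose = windowEnd.plus(grace);
        if (streamTime.isBefore(windowEnd)) {
            return Phase.EARLY;   // before window end: should be suppressed
        }
        if (streamTime.isBefore(windowClose)) {
            return Phase.GRACE;   // window end <= time < window close: should be forwarded
        }
        return Phase.CLOSED;      // at or after window close: no further updates expected
    }

    // The desired behavior: forward a result only during the grace period.
    static boolean shouldForward(Instant streamTime, Instant windowStart,
                                 Duration windowSize, Duration grace) {
        return phaseOf(streamTime, windowStart, windowSize, grace) == Phase.GRACE;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-01-01T00:00:00Z");
        Duration size = Duration.ofMinutes(5);
        Duration grace = Duration.ofHours(1);
        System.out.println(phaseOf(start.plusSeconds(60), start, size, grace));
        System.out.println(phaseOf(start.plusSeconds(600), start, size, grace));
        System.out.println(phaseOf(start.plusSeconds(7200), start, size, grace));
    }
}
```

In these terms, the suppression I am looking for would drop every update for which `shouldForward` returns false and pass on every update for which it returns true.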

Minimal Kafka Streams topology example:

new StreamsBuilder()
        .stream("my-topic")
        .groupByKey() // a KStream must be grouped before it can be windowed
        .windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
        .reduce(myReducer)
        .suppress( /* the configuration I am looking for */ )
        .toStream();

Suppressed.untilWindowCloses( .. ) is therefore not an option for me, as I would have to wait until the grace period has expired, which might take a long time.

According to KIP-328, exactly the required behavior could be obtained using Suppressed.untilTimeLimit(Duration.ZERO, .. ) as (quoted from the KIP's description):

a. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window end (for windowed KTables), to buffer up each key before emitting them downstream.

However, the Kafka Streams JavaDoc and the corresponding implementation imply that this is not the case: the time limit starts its countdown when the first record per (windowed) key is received, not when the window ends.

I would appreciate clarification on this and advice on how to achieve the desired behavior.

1 Answer

The KIP description is incorrect (I updated the wiki page accordingly). Note that further down the KIP says:

Rate-limited updates

Suppose we wish to reduce the rate of updates from a KTable to roughly one update every 30s per key. We don't want to use too much memory for this, and we don't think we'll have updates for more than 1000 keys at any one time.

table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.

Hence, untilTimeLimit is meant for emitting updates at a regular interval. For a windowed aggregation, the interval timer would start at the window start time. You could still set the wait period to "window size" to avoid any "early" updates, but after the window end has passed you would not see every update, only updates in "window size" intervals. If your grace period is really long, this might still be good enough?

The use case you describe is currently not supported, but I think it is quite an interesting and useful one. Maybe you can create a feature request ticket?