
I have written this code in a Kafka Streams application:

KGroupedStream<String, foo> groupedStream = stream.groupByKey();
groupedStream
    .windowedBy(SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
    .aggregate(() -> {...})
    // unbounded() is statically imported from Suppressed.BufferConfig
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()...

If I understood it correctly, this should emit one record per key once its window has closed. Instead, the behavior is the following:

The stream doesn't emit the first record; it only forwards it after a second record arrives, even if that second record has a different key. The second record is then emitted only after the third one arrives, and so forth.

I have tried multiple StreamsConfig settings, including "exactly_once" processing, both with and without caching, but the behavior persists.

Thanks in advance for your help!

If you want your data aggregated by period of time rather than by "session", I guess you need to use TimeWindows instead of SessionWindows. - Vasyl Sarzhynskyi
That did not work for me. I have a time window, but it still does not emit the suppressed results of old windows until new events arrive for the same key. Very frustrating AND counter-intuitive! - BalRog
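The same dependency on stream time applies to TimeWindows. The rough sketch below uses a made-up class, TumblingWindowModel, in plain Java with no Kafka dependency. It assumes that a tumbling window [start, start + size) counts as closed for suppress(untilWindowCloses(...)) only once stream time reaches its end plus the grace period. With a 3 s window and 3 s grace, the window [0, 3000) therefore stays open until a record with timestamp 6000 ms or later is seen, however much wall-clock time passes. In Kafka Streams, stream time is tracked per task (partition), so in this sketch it is one value shared by all keys.

```java
// Simplified, illustrative model of when a tumbling time window is final.
// This is NOT Kafka Streams code; class and method names are hypothetical.
class TumblingWindowModel {
    // Start of the tumbling window that contains ts (assumes ts >= 0).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // The window [start, start + size) is treated as closed once stream time
    // (the highest record timestamp seen so far) reaches end + grace.
    static boolean isClosed(long windowStart, long sizeMs, long graceMs, long streamTimeMs) {
        return windowStart + sizeMs + graceMs <= streamTimeMs;
    }

    public static void main(String[] args) {
        long size = 3_000;
        long grace = 3_000;
        long start = windowStart(1_500, size); // a record at t=1.5 s lands in [0, 3000)
        System.out.println(start);                                  // 0
        System.out.println(isClosed(start, size, grace, 5_999));    // false
        System.out.println(isClosed(start, size, grace, 6_000));    // true
    }
}
```

Only a new record with a timestamp of at least 6000 ms moves stream time far enough; a timer or an idle wait does not.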

2 Answers


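That is expected behavior. Note that suppress() is based on event time, not wall-clock time. Stream time advances only when new records arrive, so as long as no new data arrives, time cannot advance. Evicting the record earlier would be wrong, because there is no guarantee that the next record will not belong to the current window. Stream time is tracked per partition rather than per key, which is why a later record with a different key can release the previous result.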
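To make the timing concrete, here is a minimal, hypothetical single-partition simulation in plain Java (no Kafka dependency) of session windows followed by suppress(untilWindowCloses(...)). The class SessionSuppressModel and its members are made up for illustration. It assumes that a session is final once stream time reaches the session's end (its latest record timestamp) plus the inactivity gap plus the grace period, which is our understanding of how Kafka Streams handles session windows. It deliberately ignores out-of-order records, merging of overlapping sessions, and dropping of late records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified model of SessionWindows + suppress(untilWindowCloses(...)) on one
// partition. Illustrative only: NOT the real Kafka Streams implementation.
class SessionSuppressModel {
    static final long GAP_MS = 3_000;   // like SessionWindows.with(Duration.ofSeconds(3))
    static final long GRACE_MS = 3_000; // like .grace(Duration.ofSeconds(3))

    // One buffered session window: records of `key` between start and end.
    static final class Session {
        final String key;
        final long start;
        long end;
        int count;

        Session(String key, long ts) {
            this.key = key;
            this.start = ts;
            this.end = ts;
            this.count = 1;
        }
    }

    private final List<Session> buffered = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    void process(String key, long ts) {
        // Stream time is the highest timestamp seen so far, for ANY key.
        streamTime = Math.max(streamTime, ts);

        // Extend the key's session if the record is within the inactivity gap
        // (in-order records only); otherwise start a new session for the key.
        Session target = null;
        for (Session s : buffered) {
            if (s.key.equals(key) && ts <= s.end + GAP_MS) {
                target = s;
            }
        }
        if (target != null) {
            target.end = Math.max(target.end, ts);
            target.count++;
        } else {
            buffered.add(new Session(key, ts));
        }

        // Release every session whose end + gap + grace has been reached.
        Iterator<Session> it = buffered.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end + GAP_MS + GRACE_MS <= streamTime) {
                emitted.add(s.key + "@[" + s.start + "," + s.end + "] count=" + s.count);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        SessionSuppressModel model = new SessionSuppressModel();
        model.process("A", 0);
        System.out.println(model.emitted); // empty: stream time has not passed A's close time
        model.process("B", 10_000);
        System.out.println(model.emitted); // A's session is released by B's record
        model.process("C", 20_000);
        System.out.println(model.emitted); // B's session is released by C's record
    }
}
```

With the question's 3 s gap and 3 s grace, the session for A (t=0) is only released when B's record at t=10 s arrives, and B's only when C's arrives. That reproduces the "always one record behind" pattern from the question, regardless of caching or processing guarantees.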


I do not think a session window combined with suppress() will generate any output.

Correct me if I am wrong, but as far as I know, suppress() works only with time-based windows and does not work with session windows.