
I have the following piece of code to aggregate data hourly based on event time:

KStream<Windowed<String>, SomeUserDefinedClass> windowedResults = inputStream
    .groupByKey(Grouped.with(Serdes.String(), new SomeUserDefinedSerde<>()))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ofMinutes(15)))
    .aggregate(
        // do some aggregation
    )
    .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(75), Suppressed.BufferConfig.unbounded()))
    .toStream();

The issue is that the time window is not closed and the results are not emitted unless I receive data with the same key and a timestamp later than the time limit plus the grace period.

I would like to know what alternatives I can use to ensure the window is closed and the data is emitted once a given time has passed (without waiting for any new data for the same key).

Is there an option/feature to make the untilTimeLimit parameter based on real time, and not the event time?

Note: This question is not about why a TimeWindow is not closed, but about how to close it in the absence of new data.

It's not the same question. This question asks for alternatives to solve the issue (not why it is happening). The same keys do not always arrive every hour, so I need a mechanism to emit the results by a certain time instead of waiting a very long time (or forever). - E Yeoh
You can use the Processor API. Using a Punctuator, you can schedule a periodic job to perform some work, e.g. forwarding the aggregation result: kafka.apache.org/21/documentation/streams/developer-guide/… - Bartosz Wardziński
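
To illustrate the idea in the comment above, here is a minimal plain-Java sketch of the logic such a punctuator might run. It deliberately uses no Kafka classes: `WallClockFlushSketch`, `Pending`, `process`, and `flushExpired` are hypothetical names, the `Map` stands in for a state store, and the clock is passed in by hand. In a real topology, something like `flushExpired` would presumably be invoked from a callback registered through `ProcessorContext.schedule(...)` with `PunctuationType.WALL_CLOCK_TIME`, and each emitted entry would be forwarded downstream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a wall-clock flush; no Kafka classes are used here.
class WallClockFlushSketch {

    // One pending aggregate plus the wall-clock time at which it must be emitted.
    static final class Pending {
        long sum;
        final long emitAtMs;
        Pending(long emitAtMs) { this.emitAtMs = emitAtMs; }
    }

    private final long maxWaitMs;
    // Stands in for a state store keyed by the aggregation key (assumption).
    private final Map<String, Pending> buffer = new LinkedHashMap<>();

    WallClockFlushSketch(long maxWaitMs) { this.maxWaitMs = maxWaitMs; }

    // Called per record: update the aggregate; the deadline starts when the key is first seen.
    void process(String key, long value, long nowMs) {
        Pending p = buffer.computeIfAbsent(key, k -> new Pending(nowMs + maxWaitMs));
        p.sum += value;
    }

    // Called periodically (the punctuator's job): emit and drop every entry whose deadline has passed.
    List<String> flushExpired(long nowMs) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> e = it.next();
            if (e.getValue().emitAtMs <= nowMs) {
                emitted.add(e.getKey() + "=" + e.getValue().sum);
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        WallClockFlushSketch sketch = new WallClockFlushSketch(75 * minute);
        sketch.process("a", 1, 0L);
        sketch.process("a", 2, 1 * minute);
        System.out.println(sketch.flushExpired(74 * minute)); // prints []
        System.out.println(sketch.flushExpired(75 * minute)); // prints [a=3]
    }
}
```

The point of the sketch is that the flush depends only on the wall-clock value passed to `flushExpired`, so a key is emitted even if no further record for it ever arrives.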
You don't need a new record per key, only per partition. Does this help? If there is no data at all, you could maybe use "dummy keys" (keys that do not appear in your data) and make sure to send one record per partition to advance time. - Matthias J. Sax
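
The per-partition behaviour described in the comment above can be sketched with a simplified plain-Java model. This is not Kafka's implementation: `StreamTimeSketch` and `observe` are hypothetical names, timestamps are assumed to be non-negative milliseconds, and "stream time" is modelled as the largest timestamp seen so far on one partition. A window is treated as closed once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of stream time on a single partition; not Kafka's implementation.
class StreamTimeSketch {
    static final long WINDOW_MS = 60 * 60_000L; // 60-minute tumbling window
    static final long GRACE_MS = 15 * 60_000L;  // 15-minute grace period

    private long streamTimeMs = Long.MIN_VALUE; // largest timestamp observed so far
    // Open windows: "key@windowStart" -> window start.
    private final Map<String, Long> open = new LinkedHashMap<>();

    // Observe a record of ANY key; returns the windows that this record causes to close.
    List<String> observe(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long start = timestampMs - (timestampMs % WINDOW_MS);
        // Only accept the record if its window has not already closed.
        if (start + WINDOW_MS + GRACE_MS > streamTimeMs) {
            open.putIfAbsent(key + "@" + start, start);
        }
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + WINDOW_MS + GRACE_MS <= streamTimeMs) {
                closed.add(e.getKey());
                it.remove();
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        StreamTimeSketch partition = new StreamTimeSketch();
        System.out.println(partition.observe("user-1", 10 * minute)); // prints []
        // A record with a different ("dummy") key still advances stream time.
        System.out.println(partition.observe("dummy", 76 * minute));  // prints [user-1@0]
    }
}
```

In this model the `user-1` window closes because of a record with an unrelated key, which is why one dummy record per partition may be enough to advance time.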