6 votes

We are using Kafka Streams' SessionWindows to aggregate the arrival of related events. Along with the aggregation, we are specifying the retention time for the window using the until() API. Stream info:
The session window (inactivity gap) is 1 minute and the retention time passed to until() is 2 minutes. We are using a customized TimestampExtractor to map the event's time.
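
For reference, a minimal sketch of what such a topology could look like (the topic name, String key/value types, aggregation logic, and store name are placeholders for illustration, not our actual code):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.SessionWindows;

    KStreamBuilder builder = new KStreamBuilder();

    // topic name and String key/value types are placeholders
    KStream<String, String> events = builder.stream("events");

    events.groupByKey()
          .aggregate(
              () -> "",                                        // initializer
              (key, value, agg) -> agg + value,                // aggregator: combine related events
              (key, leftAgg, rightAgg) -> leftAgg + rightAgg,  // session merger
              SessionWindows.with(TimeUnit.MINUTES.toMillis(1))    // 1 minute inactivity gap
                            .until(TimeUnit.MINUTES.toMillis(2)),  // 2 minutes retention
              Serdes.String(),
              "events-session-store");                         // placeholder store name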

Example:
Event: e1; eventTime: 10:00:00 am; arrivalTime: 2:00 pm (same day)
Event: e2; eventTime: 10:00:30 am; arrivalTime: 2:10 pm (same day)
The arrival time of the second event is 10 minutes after the arrival of e1, which exceeds retention time + inactivity time. But the older event e1 is still part of the aggregation, despite the retention time being 2 minutes.

Questions:
1) How does Kafka Streams clean up the state store when until() is used? The retention value specified as an argument is a "lower bound for how long a window will be maintained." When exactly is the window purged?

2) Is there a background thread that cleans up the state store periodically? If yes, is there a way to identify the actual time when the window is purged?

3) Is there any stream configuration that would purge the data for a window after the retention time?


1 Answer

9 votes

Before I answer your concrete questions: note that retention time is not based on system time, but on "stream time". "Stream time" is an internally tracked time progress based on whatever the TimestampExtractor returns. Without going into too much detail: for your example with 2 records, "stream time" will advance by only 30 seconds when the second record arrives, and thus the retention time has not passed yet.
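
To make this concrete, here is a rough sketch of such an extractor (the Event payload type and its getEventTime() accessor are placeholders for whatever your records carry). With an extractor like this, e1 sets "stream time" to 10:00:00 and e2 only advances it to 10:00:30; the arrival times at 2 pm and 2:10 pm play no role:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Placeholder payload type for illustration only
    interface Event {
        long getEventTime(); // epoch millis of the business event
    }

    public class EventTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            // The value returned here (event time, not arrival time) is what drives "stream time"
            final Event event = (Event) record.value();
            return event.getEventTime();
        }
    }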

Also note that "stream time" is not advanced if no new data arrives (for at least one partition). This holds for Kafka 0.11.0 and older, but might change in future releases.

Update: The computation of stream-time was changed in Kafka 2.1, and stream-time may advance even if one partition does not deliver data. For details see KIP-353: Improve Kafka Streams Timestamp Synchronization.

To your questions:

(1) Kafka Streams writes all store updates into a changelog topic and into a local RocksDB store. Both are divided into so-called segments of a certain size. As new data arrives (i.e., "stream time" progresses), new segments are created. When this happens, older segments are deleted if and only if all records in such a segment are older than the retention time (i.e., their record timestamps are smaller than "stream time" minus retention time).
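
To illustrate the drop condition only (a simplified sketch, not the actual Kafka Streams implementation):

    // A segment is only dropped once *all* its records have expired, i.e. even the
    // newest record in the segment is older than "stream time" minus retention.
    static boolean segmentCanBeDropped(final long maxTimestampInSegment,
                                       final long streamTime,
                                       final long retentionMs) {
        return maxTimestampInSegment < streamTime - retentionMs;
    }

In your example, with a retention time of 2 minutes and "stream time" at 10:00:30, only records older than 09:58:30 would be eligible, so e1 (10:00:00) is not expired yet.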

(2) Thus, there is no background thread; cleanup is part of regular processing.

And (3) there is no configuration to force purging of older records/windows.

As whole segments are dropped only when all of their records are expired, the older records within a segment (most likely those with smaller/older timestamps) are maintained longer than the retention time. The motivation behind this design is performance: expiring on a per-record basis would be too expensive.