
I'm using Apache Kafka Streams to aggregate data consumed from a Kafka topic. The aggregation result is then written to another topic, which is itself consumed, and the results are stored in a DB. Pretty classic use case, I suppose.

The aggregate call returns a KTable backed by a Kafka changelog topic.

In practice it is more complex than that, but let's say it stores the count and sum of events for a given key (to compute an average):

 KTable<String, Record> countAndSum = groupedByKeyStream.aggregate(...)
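For illustration, here is a plain-Java model of that kind of aggregation. It assumes double-valued events and a hypothetical `CountAndSum` type in place of the question's `Record`; the `HashMap` in the equally hypothetical `KeyedAggregation` class stands in for the state store behind the KTable. It also shows why the state has to be kept: the second event for a key is folded into the aggregate left behind by the first.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical aggregate value standing in for the question's Record type:
// a running count and sum from which an average can be derived.
final class CountAndSum {
    final long count;
    final double sum;

    CountAndSum(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    // Plays the role of an Initializer: the empty aggregate for a new key.
    static CountAndSum initial() {
        return new CountAndSum(0L, 0.0);
    }

    // Plays the role of an Aggregator: folds one value into the old aggregate.
    CountAndSum add(double value) {
        return new CountAndSum(count + 1, sum + value);
    }

    double average() {
        return count == 0 ? 0.0 : sum / count;
    }
}

// Hypothetical model of a keyed aggregation; the map stands in for the state store.
final class KeyedAggregation {
    final Map<String, CountAndSum> store = new HashMap<>();

    CountAndSum process(String key, double value) {
        // Reuse the stored aggregate for this key, or start from the initial one.
        CountAndSum old = store.getOrDefault(key, CountAndSum.initial());
        CountAndSum updated = old.add(value);
        store.put(key, updated);
        return updated;
    }
}
```

In Kafka Streams the same logic would be passed to `aggregate` as an initializer (`CountAndSum::initial`) and an aggregator (`(key, value, agg) -> agg.add(value)`), together with a Serde for `CountAndSum`, which is not shown here.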

That changelog topic does not seem to have a retention period set (unlike my other topics, its data does not seem to expire under my global retention setting).

This is actually good, even necessary, because it avoids losing my aggregation state when a later event arrives with the same key.

However, in the long run, doesn't this mean the changelog will grow forever as more keys come in? I potentially have a lot of keys, and my aggregations are not as small as a count and sum.

Since I have a way to know that I won't get any more events for a particular key (some events are marked as "final"), is there a way to strip the aggregation state for those keys from the changelog, possibly with a slight delay "just in case", so that it doesn't keep growing with state I no longer need?

Or is there an entirely different way to do this with Kafka Streams that avoids this "issue"?

I've just read about tombstone messages (a key with a null value), which might allow me to drop those. I still need to test it, and I'm still interested in what the correct pattern would be anyway. - Christophe
Yes: changelog topics are configured with log compaction and not with retention time. If you receive the "final" record, your aggregation can just return null as aggregation result. This will delete it from the local RocksDB store as well as the underlying changelog topic. - Matthias J. Sax
Thanks Matthias, I have tested it and can confirm that everything works as expected when returning null upon reaching the "final" record. - Christophe
Posted my comment as answer. - Matthias J. Sax
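To make the comments concrete: a compacted topic keeps at least the latest record for each key, and a tombstone (a record with a null value) eventually removes that key entirely. What follows is a plain-Java model of that end state, not Kafka code; `ChangelogRecord` and `CompactionModel` are hypothetical names. Real compaction runs asynchronously in the broker, never touches the active segment, and keeps tombstones for `delete.retention.ms` before dropping them, none of which is modelled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// One changelog entry; a null value is a tombstone.
final class ChangelogRecord {
    final String key;
    final String value;

    ChangelogRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class CompactionModel {
    // Keeps only the latest record per key, then drops keys whose latest record is a tombstone.
    static List<ChangelogRecord> compact(List<ChangelogRecord> log) {
        Map<String, ChangelogRecord> latest = new LinkedHashMap<>();
        for (ChangelogRecord r : log) {
            latest.remove(r.key); // re-insert so iteration order follows the last write
            latest.put(r.key, r);
        }
        List<ChangelogRecord> result = new ArrayList<>();
        for (ChangelogRecord r : latest.values()) {
            if (r.value != null) {
                result.add(r);
            }
        }
        return result;
    }
}
```

With compaction alone, the changelog's size tracks the number of distinct live keys, which is why it keeps growing as new keys arrive; only tombstones make it shrink.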

1 Answer


Yes: changelog topics are configured with log compaction and not with retention time. If you receive the "final" record, your aggregation can just return null as aggregation result. This will delete it from the local RocksDB store as well as the underlying changelog topic.
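The answer's pattern can be modelled in plain Java roughly as follows. This is a sketch under assumptions, not Kafka Streams itself: `TaggedEvent`, `RunningAgg`, and `FinalAwareAggregation` are hypothetical, the map stands in for the local RocksDB store, and the list stands in for the changelog topic. The aggregator returns `null` when it sees the "final" event; the key is then removed from the store and a tombstone is appended to the changelog, which compaction later uses to drop the key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical input event: a value plus a flag marking the last event for its key.
final class TaggedEvent {
    final double value;
    final boolean isFinal;

    TaggedEvent(double value, boolean isFinal) {
        this.value = value;
        this.isFinal = isFinal;
    }
}

// Hypothetical aggregate: running count and sum for one key.
final class RunningAgg {
    final long count;
    final double sum;

    RunningAgg(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }
}

final class FinalAwareAggregation {
    // Stands in for the local state store behind the KTable.
    final Map<String, RunningAgg> store = new HashMap<>();
    // Stands in for the changelog topic; a null value is a tombstone.
    final List<Map.Entry<String, RunningAgg>> changelog = new ArrayList<>();

    // Aggregator logic (key, value, old aggregate): returning null means "delete this key".
    static RunningAgg aggregate(String key, TaggedEvent event, RunningAgg old) {
        if (event.isFinal) {
            return null; // no more events expected for this key
        }
        return new RunningAgg(old.count + 1, old.sum + event.value);
    }

    void process(String key, TaggedEvent event) {
        RunningAgg old = store.get(key);
        if (old == null) {
            old = new RunningAgg(0L, 0.0); // initializer for a new (or previously deleted) key
        }
        RunningAgg updated = aggregate(key, event, old);
        if (updated == null) {
            store.remove(key);
        } else {
            store.put(key, updated);
        }
        // Every update, including the deletion, is logged; SimpleEntry allows a null value.
        changelog.add(new AbstractMap.SimpleEntry<>(key, updated));
    }
}
```

If another event for the same key did arrive after the tombstone, the initializer would simply start a fresh aggregate, as `process` does here. Note also that in Kafka Streams the deletion is forwarded downstream as a null value, so the consumer that writes to the DB would likely see a tombstone for that key rather than the last aggregate.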