I have a few Kafka consumers implemented in Java, and I am implementing a standalone application that checks records and tombstones them. The hope is that Kafka will clean up the state stores as it compacts the topics.
Now... I am getting a bit confused about the different types of stores that Kafka creates. For each type of store, I would like to know:
- Is it deleted when Kafka deletes old records in the corresponding topic?
- Is it deleted when you tombstone records in the corresponding topic?
- Are we stuck with it?
The types of stores that I see are the following:
1. KSTREAM-AGGREGATE-STATE-STORE changelog
2. KSTREAM-AGGREGATE-STATE-STORE repartition
3. (KTABLE) STATE-STORE changelog
4. KSTREAM-KEY-SELECT repartition
For the stream topology that uses an aggregate function, we already have a tombstone strategy that should cover stores of type #1: we send a null message to the stream application, and it returns it as the aggregate result.
For stores of type #3, I will be running a tombstoning application on the corresponding KTable. I expect its changelog to shrink.
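The expectation that the changelog shrinks rests on log-compaction semantics. The following is a deliberately simplified, in-memory model (not Kafka code; the `CompactionModel` class, `Rec` record, and `compact` method are hypothetical) that shows why a tombstone eventually removes a key from a compacted changelog. It assumes compaction has already run over every segment and that `delete.retention.ms` has elapsed; a real broker keeps tombstones for that period and never cleans the active segment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionModel {

    // One changelog record; a null value is a tombstone.
    record Rec(String key, String value) {}

    // Simplified compaction: keep only the latest record per key, then drop keys whose
    // latest record is a tombstone. Ignores delete.retention.ms and the active segment.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key()); // re-insert so iteration order follows the latest offset
            latest.put(r.key(), r);
        }
        List<Rec> out = new ArrayList<>();
        for (Rec r : latest.values()) {
            if (r.value() != null) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> changelog = List.of(
                new Rec("a", "1"), new Rec("b", "1"), new Rec("a", "2"), new Rec("b", null));
        System.out.println(compact(changelog)); // [Rec[key=a, value=2]]
    }
}
```

Four records shrink to one: the older value of `a` is superseded, and `b` disappears entirely because its latest record is a tombstone.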
However, for stores of type #2 and #4, I have no idea how they are cleaned up. They correspond to the selectKey() + leftJoin() calls in my consumer's topology, but they are tied to a KStream-centric topology, so I don't know what to do to have them cleaned up. Any suggestions that don't involve stopping the broker?
> We send a null message to the stream application & it returns it as the aggregate result.

I would assume this does not work because `null`-value records are not processed by a `KStream` aggregation operator. You would need to have a non-null "surrogate delete value" and let the aggregation function return `null` if it sees this "surrogate delete value". – Matthias J. Sax
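The suggestion in the comment can be sketched in plain Java. To stay self-contained this does not use the Kafka Streams library: the nested `Aggregator` interface only mirrors the `apply(key, value, aggregate)` shape of Kafka Streams' `Aggregator`, the `HashMap` stands in for the state store, and `DELETE_MARKER` is a hypothetical surrogate value. It models the behaviour the comment describes: a `null` value is skipped before reaching the aggregator, while the marker makes the aggregator return `null`, which removes the key.

```java
import java.util.HashMap;
import java.util.Map;

public class SurrogateDeleteDemo {

    // Hypothetical surrogate delete value; it must never occur in real data.
    static final String DELETE_MARKER = "__DELETE__";

    // Mirrors the shape of Kafka Streams' Aggregator<K, V, VA>#apply (not the real interface).
    @FunctionalInterface
    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    // Counts records per key, but returns null when it sees the surrogate delete value.
    static final Aggregator<String, String, Long> COUNT_OR_DELETE =
            (key, value, aggregate) -> DELETE_MARKER.equals(value) ? null : aggregate + 1;

    // Stand-in for the aggregation's state store.
    private final Map<String, Long> store = new HashMap<>();

    void process(String key, String value) {
        // Per the comment, KStream aggregations skip null-value records,
        // so a plain null "tombstone" never reaches the aggregator.
        if (key == null || value == null) {
            return;
        }
        Long oldAgg = store.get(key);
        if (oldAgg == null) {
            oldAgg = 0L; // plays the role of the Initializer
        }
        Long newAgg = COUNT_OR_DELETE.apply(key, value, oldAgg);
        if (newAgg == null) {
            store.remove(key); // a null aggregate deletes the key from the store
        } else {
            store.put(key, newAgg);
        }
    }

    Long get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        SurrogateDeleteDemo agg = new SurrogateDeleteDemo();
        agg.process("user-1", "a");
        agg.process("user-1", "b");
        agg.process("user-1", null);           // skipped: aggregate stays 2
        System.out.println(agg.get("user-1"));  // 2
        agg.process("user-1", DELETE_MARKER);   // aggregator returns null, key removed
        System.out.println(agg.get("user-1"));  // null
    }
}
```

In a real topology, the same lambda shape would be passed as the aggregator to `groupByKey().aggregate(...)`; the upstream tombstoning application would then send the marker value instead of `null`.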