
I have a few Kafka consumers implemented in Java and I am implementing a standalone application to check records and tombstone them. The hope is that Kafka will delete state stores as it compacts topics.

Now... I am getting a bit confused about the different types of stores that Kafka creates. For each type of store, I would like to know:

  • Is it deleted when Kafka deletes old records in the corresponding topic?
  • Is it deleted when you tombstone records in the corresponding topic?
  • Are we stuck with it?

The types of stores that I see are the following:

  1. KSTREAM-AGGREGATE-STATE-STORE changelog
  2. KSTREAM-AGGREGATE-STATE-STORE repartition
  3. (KTABLE) STATE-STORE changelog
  4. KSTREAM-KEY-SELECT repartition

For the stream topology using an Aggregate function, we already have a tombstone strategy which should cover stores of type #1. We send a null message to the stream application & it returns it as the aggregate result.

For stores of type #3, I will be running a tombstoning application on the corresponding ktable. I expect the changelog to shrink.

However, for stores of type #2 and #4, I have no idea how they are cleaned up. They correspond to the selectKey() + leftJoin() calls in my consumer's topology, but they are tied to a KStream-centric topology, so I don't know how to get them cleaned up. Any suggestions that don't involve stopping the broker?

"We send a null message to the stream application & it returns it as the aggregate result." -- I would assume this does not work, because null-value records are not processed by a KStream aggregation operator. You would need a non-null "surrogate delete value" and let the aggregation function return null when it sees that value. - Matthias J. Sax
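
The "surrogate delete value" idea from the comment above could be sketched roughly as follows. This is only an illustration in plain Java with no Kafka dependency: the class name, the `DELETE_MARKER` value, and the comma-joined String aggregate are all assumptions made for the example. The method simply has the same (key, value, aggregate) shape as a Kafka Streams aggregator.

```java
// Minimal sketch, assuming String keys, values, and aggregates.
class SurrogateDeleteAggregator {

    // Hypothetical marker; choose a value that can never occur as real data.
    static final String DELETE_MARKER = "__DELETE__";

    // Same parameter order as a Kafka Streams aggregator: (key, value, current aggregate).
    static String aggregate(String key, String value, String aggregate) {
        if (DELETE_MARKER.equals(value)) {
            // Returning null signals "drop the aggregate for this key".
            return null;
        }
        if (aggregate == null || aggregate.isEmpty()) {
            // First real value for this key (or first value after a delete).
            return value;
        }
        return aggregate + "," + value;
    }

    public static void main(String[] args) {
        String agg = aggregate("k1", "a", null);
        agg = aggregate("k1", "b", agg);
        System.out.println(agg); // prints "a,b"

        agg = aggregate("k1", DELETE_MARKER, agg);
        System.out.println(agg); // prints "null"
    }
}
```

In a real topology, a method like this could presumably be passed as the aggregator to the aggregate() call on the grouped stream, with the upstream producer sending the marker instead of a null value.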

1 Answer


#2 and #4 are not stores - they are internal topics created by Kafka Streams.

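There is no way to explicitly clean up these repartition topics, but you should not need to. Kafka Streams creates #2 and #4 with cleanup.policy = delete, so the broker removes old data based on retention: the topic-level retention.ms if it is set, otherwise the broker's log retention setting (log.retention.hours, 168 hours = 7 days by default). In addition, Kafka Streams 1.1 and later purge repartition-topic records themselves once they have been processed and committed, so these topics normally stay small without any action on your part.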