2
votes

We have a Kafka stream aggregation topology. We need to keep the size of the changeLog topic in check to reduce the Kafka storage costs. So we use transformer (DSL API) in the topology to schedule a punctuation that deletes the old records from the stateStore using keyValueStore.delete().
I am able to verify that after a delete, on further scheduled triggers of the punctuation, the deleted key is not there in the state store. But does it remove the record from the changeLog topic as well? More importantly, does it reduce the size of the changeLog topic as well so that the Kafka storage cost is in check??

2

2 Answers

1
votes

Yes, changes to the state store are applied to the changelog topic.

1
votes

No, there's no actual record deletion into the changelog topic when you issue a "delete" command. Be aware that a "delete" command is in fact a record with a null value (aka tombstone) written into a topic (changelog or any other) - see here:

null values are interpreted in a special way: a record with a null value represents a "DELETE" or tombstone for the record's key

So, in fact the interpretation is the one that makes it feel like a deletion; one could read a changelog topic (you'll have to know the exact topic's name) as a KStream or by using the Kafka Consumer API and will find the tombstone records there (till removed by the compaction or retention thread). But if you read a changelog or any compacted topic with a KTable than a tombstone record will determine a deletion from the associated store - you'll no longer find the related key in the store despite the fact that it actually exists in the related compacted topic.

If compaction policy is enabled on a topic (by default is enabled on changelog topics) than its records are removed till the last one for a specific key. So at some point you'll only have the delete record because the previous records with the same key are removed by the compaction Kafka thread.