We have a Kafka Streams aggregation topology. We need to keep the size of the changelog topic in check to reduce Kafka storage costs, so we use a transformer (DSL API) in the topology to schedule a punctuation that deletes old records from the state store via keyValueStore.delete().
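For context, here is a minimal sketch of what the transformer looks like. The store name "my-store", the 10-minute interval, and the isExpired() check are placeholders; the expiry logic in the real topology is application-specific.

```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class PurgeTransformer implements Transformer<String, Long, KeyValue<String, Long>> {

    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
        // Punctuation that scans the store and deletes records considered "old"
        context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    if (isExpired(entry, timestamp)) {
                        store.delete(entry.key);   // removes the key from the local store
                    }
                }
            }
        });
    }

    @Override
    public KeyValue<String, Long> transform(String key, Long value) {
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() { }

    // Placeholder: real expiry decision depends on the application's data
    private boolean isExpired(KeyValue<String, Long> entry, long now) {
        return false;
    }
}
```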
I am able to verify that after a delete, on subsequent scheduled triggers of the punctuation, the deleted key is no longer in the state store. But does it remove the record from the changelog topic as well? More importantly, does it reduce the size of the changelog topic, so that the Kafka storage cost stays in check?
2 Answers
No, there is no actual record deletion from the changelog topic when you issue a "delete". Be aware that a delete is in fact a record with a null value (a so-called tombstone) written to the topic (a changelog or any other) - see here:
null values are interpreted in a special way: a record with a null value represents a "DELETE" or tombstone for the record's key
So it is really this interpretation that makes it feel like a deletion. One could read the changelog topic (you'll have to know its exact name) as a KStream, or with the Kafka Consumer API, and would still find the tombstone records there (until they are removed by the compaction or retention thread). But if you read a changelog or any other compacted topic into a KTable, then a tombstone record triggers a deletion from the associated store: you'll no longer find the related key in the store, even though the tombstone still exists in the underlying compacted topic.
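To make that concrete, here is a hedged sketch of a plain consumer reading a changelog topic and spotting the tombstones. The bootstrap server, group id, and topic name are placeholders; changelog topics follow the naming convention &lt;application.id&gt;-&lt;store name&gt;-changelog.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ChangelogInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-inspector");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        String changelogTopic = "my-app-my-store-changelog"; // placeholder name

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(changelogTopic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() == null) {
                        // A null value is the tombstone that keyValueStore.delete() produced
                        System.out.println("tombstone for key " + record.key());
                    }
                }
            }
        }
    }
}
```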
If the compaction policy is enabled on a topic (it is enabled by default on changelog topics), then for each key all but the latest record are eventually removed. So at some point only the delete record (tombstone) remains for a given key, because the earlier records with the same key have been cleaned up by Kafka's compaction thread.