Kafka version 1.1
We use the Kafka Streams KStream API to aggregate events based on a key selected from the event itself. Below is roughly what it does:
KStream[String, Event]
  .selectKey[String]({ (_, event) =>
    aggregationKey(event)
  })
  .groupByKey()
  .aggregate(
    () => Event("", ""),
    (k: String, event: Event, aggregate: Event) => aggregator(k, event, aggregate, inputTopicName),
    Materialized
      .as[String, Event, KeyValueStore[Bytes, Array[Byte]]]("store-name")
      .withValueSerde(protoSerde)
  )
  .toStream
  .to(outTopicName)
Inside the "aggregator" function I return null based on a certain condition, in order to produce a tombstone event.
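For illustration, here is a minimal sketch of what such an aggregator can look like. The `Event` shape, the `isTerminal` condition, and the concatenation logic are placeholders for this question, not the actual implementation:

```scala
// Hypothetical event type matching the Event("", "") initializer above.
case class Event(key: String, payload: String)

// Stand-in for the real condition that should end aggregation for a key.
def isTerminal(event: Event): Boolean =
  event.payload == "DONE"

// Returning null as the aggregate value makes Kafka Streams write a
// tombstone (null value) for that key into the changelog topic.
def aggregator(key: String, event: Event, aggregate: Event, topic: String): Event =
  if (isTerminal(event)) null
  else aggregate.copy(payload = aggregate.payload + event.payload)
```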
Kafka Streams creates two internal topics, a repartition topic and a changelog topic. The retention on the repartition topic is set to -1. Both topics keep growing regardless of the tombstone events, and I cannot find a way to clean them up.
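For reference, when a store name is supplied via `Materialized.as`, the internal topics are, to my understanding, named `<application.id>-<store-name>-repartition` and `<application.id>-<store-name>-changelog`. Assuming an `application.id` of `my-app` (a placeholder), they can be listed like this:

```shell
# List the internal topics backing the "store-name" store.
# "my-app" and the ZooKeeper address are placeholders for this example.
kafka-topics.sh --zookeeper localhost:2181 --list \
  | grep 'my-app-store-name-\(repartition\|changelog\)'
```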
Our requirement is straightforward:
Once the condition is met for a key, there is no further use for the aggregation changelog of that key. We would like to purge all events for that key completely and permanently.
Please advise on how to clean up the Kafka Streams internal topics based on key. Much appreciated.