
Kafka version 1.1

We use a Kafka KStream to aggregate events based on a key selected from the event itself. Below is roughly what it does:


KStream[String, Event]
  .selectKey[String]((_, event) => aggregationKey(event)) // re-key each event by its aggregation key
  .groupByKey
  .aggregate(
    () => Event("", ""), // initializer: empty aggregate
    (k: String, event: Event, aggregate: Event) =>
      aggregator(k, event, aggregate, inputTopicName),
    Materialized
      .as[String, Event, KeyValueStore[Bytes, Array[Byte]]]("store-name")
      .withValueSerde(protoSerde)
  )
  .toStream
  .to(outTopicName)

Inside the aggregator function I return null based on a certain condition in order to produce a tombstone event.
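
For context, the relevant part of the aggregator looks roughly like this (shouldPurge and merge are hypothetical placeholders for the real condition and merge logic):

def aggregator(key: String, event: Event, aggregate: Event, topic: String): Event =
  if (shouldPurge(event, aggregate)) {
    // Returning null deletes the entry from the state store and emits a
    // tombstone for this key to the changelog and the downstream topic.
    null
  } else {
    merge(aggregate, event) // fold the new event into the running aggregate
  }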

Kafka Streams creates two internal topics, a repartition topic and a changelog topic. The retention on the repartition topic is set to -1. These topics keep growing regardless of the tombstone events, and I cannot find a way to clean them up.

Our requirement is straightforward:

As long as the condition is met for a key, there is no further use for the aggregation changelog of that key. We'd like to purge all events for that key completely and permanently.

Please advise on how to clean up the KStream internal topics based on key. Much appreciated.


1 Answer


The upstream repartition topic should not grow unbounded: as you noticed, the retention time is set to -1 to avoid data loss, but Kafka Streams explicitly purges records from the repartition topic after they have been processed.
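
This purging uses the brokers' delete-records API, which is also exposed on the AdminClient. Purely for illustration, a minimal sketch of issuing such a request by hand; the application id in the topic name, the partition, and the offset are all hypothetical (Kafka Streams sends equivalent requests itself, so you normally never do this):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, RecordsToDelete}
import org.apache.kafka.common.TopicPartition

// Delete everything before offset 42 on partition 0 of the repartition
// topic (named "<application.id>-<store-name>-repartition").
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)
try {
  val partition = new TopicPartition("my-app-store-name-repartition", 0)
  val toDelete  = Map(partition -> RecordsToDelete.beforeOffset(42L)).asJava
  admin.deleteRecords(toDelete).all().get() // wait for the brokers to ack
} finally {
  admin.close()
}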

Also, if you return null from your Aggregator, the corresponding entry is deleted from the KTable store, and a tombstone is sent to the changelog topic as well as downstream. Of course, the tombstones themselves are first appended to the topic, and old records are only "garbage collected" once broker-side topic compaction runs.
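
If compaction is not kicking in fast enough for you, you can make it more aggressive on the changelog topic by passing topic configs through Materialized.withLoggingEnabled. A sketch, with illustrative values only (tune them for your cluster):

import scala.collection.JavaConverters._

// Compaction settings applied to the changelog topic; values are examples.
val changelogConfig = Map(
  "cleanup.policy"            -> "compact",   // the default for changelogs, stated for clarity
  "min.cleanable.dirty.ratio" -> "0.1",       // compact sooner, even if little of the log is dirty
  "segment.ms"                -> "600000",    // roll segments every 10 minutes
  "delete.retention.ms"       -> "86400000"   // drop tombstones one day after compaction
).asJava

Materialized
  .as[String, Event, KeyValueStore[Bytes, Array[Byte]]]("store-name")
  .withValueSerde(protoSerde)
  .withLoggingEnabled(changelogConfig)

Note that the active segment is never compacted, which is why lowering segment.ms can matter as much as lowering the dirty ratio.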