1
votes

I have a Kafka Streams app that uses a KTable. The app has been running for a while, so the KTable is already populated.

How do I truncate the KTable (assuming my app can handle rebuilding the table)?

Is stopping my app and deleting the data from the changelog topic the correct way? That seems a little complicated; I feel there should be a simpler way.

Thank you

1
What do you mean by "truncate"? A KTable is a collection/set of key-value pairs. What are you trying to achieve? Please add more details to your question. - Matthias J. Sax
@MatthiasJ.Sax Basically, I want to delete all existing key-value pairs in that collection, but without resetting the Kafka topic offsets, so that when the KTable comes back it won't regenerate all the existing values. For new values coming to the changelog topic, I still want to use them. - Suanmeiguo
I see. Do you read the KTable directly from a topic (i.e., builder.table()), or is it the result of an aggregation? Also, how do you decide when you want to wipe out the data? - Matthias J. Sax
It is from an aggregation. Does that change how we do it (compared with builder.table())? This is not a frequent operation. It's only for our dev and testing, or maybe a one-time cleanup if prod gets screwed up. Thank you for your help. - Suanmeiguo
An example: we might have fed some invalid values to the input earlier, so the aggregation generated invalid data in the KTable. I want to refresh the KTable with newer, correct values, but I cannot simply send all the keys again because some keys no longer exist in my input topic. - Suanmeiguo

1 Answer

3
votes

If you want to re-create the KTable from scratch, one option is the application reset tool.

The application reset tool deletes all the changelog and repartition topics created for an application ID, so on the next run you get a fresh KTable and state store.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
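
As a rough illustration of how the reset tool might be wired into a dev or test workflow, here is a minimal Java sketch that only assembles the command line for the script. The script path, the application ID `my-streams-app`, and the broker address are placeholders I made up, and the `ResetCommand` class is hypothetical. The spelling of the bootstrap flag has changed between Kafka versions, so check the tool's `--help` output for your release. The tool should be run while the Streams app is stopped, and the app's local state directory may also need to be cleared (for example with `KafkaStreams#cleanUp()` before restarting).

```java
import java.util.ArrayList;
import java.util.List;

public class ResetCommand {

    // Builds the argument list for the application reset tool script.
    // All four parameters are supplied by the caller; nothing is executed here.
    static List<String> build(String scriptPath, String appId,
                              String bootstrapServers, boolean dryRun) {
        List<String> cmd = new ArrayList<>();
        cmd.add(scriptPath);
        cmd.add("--application-id");
        cmd.add(appId);
        // Assumption: flag name as in older releases; newer ones may expect --bootstrap-server.
        cmd.add("--bootstrap-servers");
        cmd.add(bootstrapServers);
        if (dryRun) {
            // Ask the tool to report what it would do without changing anything.
            cmd.add("--dry-run");
        }
        return cmd;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        List<String> cmd = build("bin/kafka-streams-application-reset.sh",
                                 "my-streams-app", "localhost:9092", true);
        System.out.println(String.join(" ", cmd));
        // To actually run it, something like:
        // new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```

Starting with a dry run is a cheap way to see which internal topics the tool would touch before letting it delete anything.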