1
votes

Kafka stream processing is used in our system for transaction processing. The solution is implemented as follows:

A Kafka producer publishes events to a Kafka topic, and the stream processor processes each input event and performs an aggregation. After stream processing, the event is published to another topic. Since no consumer is implemented on the first topic, how can I remove the processed messages from the first topic?


3 Answers

4
votes

There is no way to remove messages from Kafka manually (without hacking the data on disk, AFAIK). You have only three options:

  • Use a time-based retention policy (for example, let Kafka automatically remove all messages older than 1 hour).

  • Use a size-based retention policy (let Kafka cap the stored log at a predefined size).

  • Use a topic compaction policy: Kafka keeps the newest value for each key, and all older versions of that key are removed (compacted).
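To make the three options concrete, here is a rough sketch of the topic-level settings each one maps to. The config keys (`cleanup.policy`, `retention.ms`, `retention.bytes`) are Kafka topic configs; the helper names and the values are made up for illustration and are not recommendations. Note that `retention.bytes` applies per partition, not to the topic as a whole.

```python
# Sketch only: builds topic-level config overrides for the three policies.
# Helper names and values are hypothetical; the config keys are Kafka's.

def retention_overrides(max_age_hours=None, max_bytes_per_partition=None, compact=False):
    """Return a dict of topic config overrides for one retention policy."""
    overrides = {}
    if compact:
        # Keep only the latest record per key.
        overrides["cleanup.policy"] = "compact"
    else:
        # Delete old log segments by age and/or size.
        overrides["cleanup.policy"] = "delete"
        if max_age_hours is not None:
            overrides["retention.ms"] = str(int(max_age_hours * 60 * 60 * 1000))
        if max_bytes_per_partition is not None:
            overrides["retention.bytes"] = str(int(max_bytes_per_partition))
    return overrides


def as_add_config(overrides):
    """Format overrides as a comma-separated key=value list."""
    return ",".join(f"{key}={value}" for key, value in sorted(overrides.items()))


time_based = retention_overrides(max_age_hours=1)
size_based = retention_overrides(max_bytes_per_partition=1024 ** 3)
compacted = retention_overrides(compact=True)

print(as_add_config(time_based))
print(as_add_config(size_based))
print(as_add_config(compacted))
```

The resulting `key=value` string is the kind of value you would hand to `kafka-configs.sh --alter --entity-type topics --entity-name <topic> --add-config ...`; check the exact connection flags against the Kafka version you run.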

As already described by Luciano Afranllie, you don't need to remove messages manually. You can process messages and let Kafka manage the topic according to your policy.

2
votes

Take into account that your stream processing chain is a consumer of the first topic. If you need to re-process the original data for some reason (for example, if you realize that there is a bug in your stream processing logic), you may want to have the original messages available on the first topic even after processing them.

So you don't need to remove the messages; you just have to set a retention policy on that topic that suits your needs. The trade-off is usually between how long the data stays available and how much storage that requires.
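A back-of-the-envelope way to look at that trade-off is to multiply the ingest rate by the retention window. The sketch below does exactly that; the function name and all the numbers (1000 messages/s, 500 bytes each, replication factor 3) are assumptions for illustration, and it ignores compression, index files, and the fact that Kafka deletes whole log segments rather than individual messages.

```python
# Rough estimate only: storage needed to keep a topic for a given time window.
# All inputs are hypothetical; real usage also depends on compression,
# index overhead, and segment size.

def estimated_topic_bytes(messages_per_second, avg_message_bytes,
                          retention_hours, replication_factor=1):
    """Approximate bytes stored across the cluster for one topic."""
    retention_seconds = retention_hours * 3600
    return (messages_per_second * avg_message_bytes
            * retention_seconds * replication_factor)


one_hour = estimated_topic_bytes(1000, 500, retention_hours=1, replication_factor=3)
one_week = estimated_topic_bytes(1000, 500, retention_hours=24 * 7, replication_factor=3)

print(f"1 hour of retention: {one_hour / 1e9:.1f} GB")
print(f"1 week of retention: {one_week / 1e9:.1f} GB")
```

With these made-up inputs, keeping a week of data for re-processing costs 168 times the storage of keeping an hour, which is the kind of number to weigh against how likely you are to need a replay.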

0
votes

There is a Kafka Improvement Proposal (KIP) to add exactly this functionality for this use case.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient

At the moment, all the Scala code for message deletion is in Kafka 0.11 and has been tested to work:

https://github.com/apache/kafka/pull/2476

However, adding this functionality to the Java AdminClient API, along with its documentation, is not yet complete.
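For anyone unsure what "purge data before an offset" means in practice, here is a toy in-memory model of the semantics the KIP describes: records below the requested offset become unreadable and the log's start offset moves forward. This is not Kafka code and not the AdminClient API; the class and method names are invented for illustration.

```python
# Toy model of "purge before offset" on a single partition.
# Hypothetical class; it only illustrates the semantics, not Kafka's API.

class ToyPartitionLog:
    def __init__(self):
        self.start_offset = 0   # lowest offset still readable
        self.records = []       # records[i] has offset start_offset + i

    def end_offset(self):
        """Offset that the next appended record will receive."""
        return self.start_offset + len(self.records)

    def append(self, value):
        offset = self.end_offset()
        self.records.append(value)
        return offset

    def purge_before(self, offset):
        """Drop every record with an offset lower than `offset`."""
        # Clamp so the start offset never moves backwards or past the end.
        target = max(self.start_offset, min(offset, self.end_offset()))
        del self.records[: target - self.start_offset]
        self.start_offset = target
        return self.start_offset

    def read(self, offset):
        if not self.start_offset <= offset < self.end_offset():
            raise IndexError(f"offset {offset} is out of range")
        return self.records[offset - self.start_offset]


log = ToyPartitionLog()
for i in range(5):
    log.append(f"event-{i}")

# The stream processor has handled offsets 0..2, so purge everything before 3.
new_start = log.purge_before(3)
print(new_start, log.read(3), log.end_offset())
```

Offsets are never reused in this model: after the purge, the next append still gets offset 5, and reading offset 2 fails.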