I want to use the Kafka Streams Processor API and generate some messages every minute in a scheduled punctuator function. Can Kafka Streams guarantee that these messages get written to the output topic exactly once?

I understand that exactly-once processing is possible in Kafka Streams because it makes a single transaction out of the following operations:

  1. Commit offset to an input topic
  2. Write result to an output topic

Does this concept extend to punctuator functions in the Processor API, for which there is no associated input message that needs a commit?

For example, this punctuator function iterates over items in a key value state store. Each item is deleted from the store and forwarded downstream:

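override def punctuate(timestamp: Long): Unit = {
  val iter = store.all() // KeyValueIterator holds resources and must be closed
  try iter.asScala.foreach { keyValue =>
    store.delete(keyValue.key)                    // remove the entry from the store
    context.forward(keyValue.key, keyValue.value) // send it downstream
  } finally iter.close()
}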

Each message in the store should appear on the output topic exactly once, even in the case of processor failure and restart.

Assume the store is persistent and backed by a Kafka changelog topic. The punctuator is scheduled to run every minute of wall-clock time. I have set processing.guarantee=exactly_once in my config.
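For reference, the setting mentioned above might look like the following minimal sketch. It uses plain string keys with java.util.Properties so it runs without the Kafka client on the classpath. The application id and broker address are hypothetical placeholders, not values from the actual setup.

```scala
import java.util.Properties

object ExactlyOnceConfig {
  // Builds the properties that would be passed to the KafkaStreams constructor.
  def build(): Properties = {
    val props = new Properties()
    props.setProperty("application.id", "punctuator-demo")    // hypothetical name
    props.setProperty("bootstrap.servers", "localhost:9092")  // hypothetical broker
    props.setProperty("processing.guarantee", "exactly_once") // the setting in question
    props
  }
}
```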

1 Answer

Exactly-once semantics also apply when you use a Punctuator.

Under the hood, every state store update is a write to the store's changelog topic. Even a delete is a write: a message with the same key and a null value.

In your use case, Kafka Streams reads messages from an input topic and writes to the output topic and to a changelog topic (for the state store operations).

If you enable exactly-once in Kafka Streams, it runs in transactional mode. Using a transaction (an atomic multi-partition write), Kafka Streams ensures that when the offset commit is performed, the results have been written to the output topic and the state store has been flushed to the changelog topic on the brokers. These operations are atomic, so if one of them fails, the application reprocesses messages from the previous offset position. All of this works because Processor::process and Punctuator::punctuate(...) are executed in a single thread for a particular partition.
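To illustrate the atomicity argument, here is a toy model. It is only a sketch: Record, ToyTransaction, and ToyPunctuate are hypothetical names invented for this example and are not part of the Kafka API. It assumes that after a crash the store is rebuilt from the committed changelog, so it still holds the entries that were never committed as deleted.

```scala
import scala.collection.mutable

// Toy model only: these classes are hypothetical and are NOT the Kafka API.
final case class Record(topic: String, key: String, value: Option[String])

final class ToyTransaction {
  private val pending = mutable.ArrayBuffer.empty[Record] // written, not yet committed
  val visible = mutable.ArrayBuffer.empty[Record]         // what consumers can read

  def send(r: Record): Unit = pending += r
  def commit(): Unit = { visible ++= pending; pending.clear() } // all writes appear together
  def abort(): Unit = pending.clear()                           // none of the writes appear
}

object ToyPunctuate {
  // For every stored entry: write a tombstone (None) to the changelog
  // and forward the value to the output topic, inside the same transaction.
  def punctuate(store: Map[String, String], tx: ToyTransaction): Unit =
    store.foreach { case (k, v) =>
      tx.send(Record("changelog", k, None))
      tx.send(Record("output", k, Some(v)))
    }

  def run(): Seq[Record] = {
    val store = Map("a" -> "1", "b" -> "2")
    val tx = new ToyTransaction

    punctuate(store, tx) // first attempt
    tx.abort()           // simulated crash before commit: nothing becomes visible

    punctuate(store, tx) // retry after restart, with the store restored unchanged
    tx.commit()
    tx.visible.toSeq
  }
}
```

In this model the aborted attempt leaves no trace, so each key reaches the output topic once, together with its changelog tombstone.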

More details can be found: