0
votes

I am implementing exactly-once semantics for a simple data pipeline with Kafka as the message broker. I can force the Kafka producer to write each produced record exactly once by setting enable.idempotence=true.
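
For reference, the producer side currently looks roughly like this (a minimal sketch; the broker address and topic name are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // enable.idempotence=true implies acks=all and safe retries, so each
            // record is written to its partition exactly once despite retries
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "value")); // placeholder topic
            }
        }
    }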

However, on the consumption side I need to guarantee that the consumer reads each record exactly once (I am not interested in storing the consumed records in an external system or another Kafka topic, just in processing them). To achieve this, I have to ensure that polled records are processed and their offsets are committed to the __consumer_offsets topic atomically/transactionally (both succeed or fail together).

In such a case, do I need to resort to the Kafka transaction APIs and create a transactional producer in the consumer polling loop, where inside the transaction I perform (1) processing of the consumed records and (2) committing their offsets, before closing the transaction? Or would the normal commitSync/commitAsync serve in such a case?
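
For context, the transactional variant I have in mind is roughly the following (untested sketch; the broker address, topic, group id and transactional.id are placeholders):

    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalPollLoop {
        public static void main(String[] args) {
            Properties cProps = new Properties();
            cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                  // placeholder
            cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            cProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            Properties pProps = new Properties();
            pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
            pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // placeholder
            pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
                consumer.subscribe(Collections.singletonList("my-topic"));             // placeholder
                producer.initTransactions();

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    if (records.isEmpty()) continue;

                    producer.beginTransaction();
                    try {
                        for (ConsumerRecord<String, String> record : records) {
                            // (1) application-specific processing of the record goes here
                        }
                        // (2) commit the consumed offsets as part of the same transaction
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        for (TopicPartition tp : records.partitions()) {
                            List<ConsumerRecord<String, String>> part = records.records(tp);
                            offsets.put(tp, new OffsetAndMetadata(part.get(part.size() - 1).offset() + 1));
                        }
                        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                        producer.commitTransaction();
                    } catch (KafkaException e) {
                        // on abort the consumer position has already advanced, so the
                        // records would also need to be re-fetched (e.g. by seeking back)
                        producer.abortTransaction();
                    }
                }
            }
        }
    }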


2 Answers

1
votes

"on the consumption front I need to guarantee that the consumer reads each record exactly once"

The answer from Gopinath explains well how you can achieve exactly-once between a KafkaProducer and a KafkaConsumer. These configurations (together with the use of the Transaction API in the KafkaProducer) guarantee that all data sent by the producer will be stored in Kafka exactly once. However, they do not guarantee that the Consumer reads the data exactly once. That, of course, depends on your offset management.

Anyway, I understand your question to mean that you want to know how the Consumer itself can process a consumed message exactly once.

For this you need to manage your offsets on your own in an atomic way. That means you need to build your own "transaction" around:

  • fetching data from Kafka,
  • processing data, and
  • storing processed offsets externally.

The methods commitSync and commitAsync will not get you far here, as they can only ensure at-most-once or at-least-once processing within the Consumer. In addition, it is beneficial if your processing is idempotent.

There is a nice blog post that explains such an implementation, making use of the ConsumerRebalanceListener and storing the offsets in your local file system. A full code example is also provided there.
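
To illustrate the idea (this is not the code from the blog, just a rough sketch; the OffsetStore helper that persists "partition -> next offset" to a local file is hypothetical, and the topic name is a placeholder):

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    public class ManualOffsetConsumer {

        // hypothetical helper that persists "partition -> next offset to read" in a local file
        interface OffsetStore {
            long readOffset(TopicPartition tp);
            void writeOffset(TopicPartition tp, long nextOffset);
            void flush();
        }

        static void run(KafkaConsumer<String, String> consumer, OffsetStore store) {
            consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    store.flush(); // persist what has been processed so far
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // resume from the externally stored offsets, not from __consumer_offsets
                    for (TopicPartition tp : partitions) {
                        consumer.seek(tp, store.readOffset(tp));
                    }
                }
            });

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // processing the record and storing its offset must happen atomically,
                    // e.g. in one local transaction or one atomic file write
                    process(record);
                    store.writeOffset(new TopicPartition(record.topic(), record.partition()),
                                      record.offset() + 1);
                }
                store.flush();
            }
        }

        private static void process(ConsumerRecord<String, String> record) { /* application logic */ }
    }

With this pattern the offsets committed to Kafka itself become irrelevant; the externally stored offsets are the single source of truth, which is why processing and the offset write have to happen atomically (or the processing has to be idempotent).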

"do I need to resort to Kafka transaction APIs to create a transactional producer in the consumer polling loop"

The Transaction API is only available for the KafkaProducer and, as far as I am aware, cannot be used for your offset management.

0
votes

'Exactly-once' functionality in Kafka can be achieved by a combination of these 3 settings:

  1. isolation.level = read_committed (on the consumer)
  2. transactional.id = <unique_id> (on the producer)
  3. processing.guarantee = exactly_once (in Kafka Streams)
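
As a rough sketch of where each of these settings lives (plain Java clients plus Kafka Streams; the transactional id is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class ExactlyOnceConfigs {
        public static void main(String[] args) {
            // 1. Consumer: only read data from committed transactions
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            // 2. Producer: a unique transactional.id enables the transactional (and idempotent) producer
            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-unique-transactional-id"); // placeholder

            // 3. Kafka Streams: processing.guarantee wires up both of the above internally
            Properties streamsProps = new Properties();
            streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        }
    }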

More information on enabling the exactly-once functionality:

https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/

https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/