
I managed to get the topic/partition/offset in a method annotated with @KafkaListener, but how can I use this data to implement exactly-once consumer logic?

I'm using ConcurrentKafkaListenerContainerFactory with concurrency=4 and AckMode set to MANUAL. My current approach uses redis to dedup: I use topic:partition as the redis key and the offset as its value, then compare the incoming offset with the value in redis. If the offset is newer (greater) than the one in redis, I run the business logic; otherwise I ignore the message. Finally I commit the offset (ack.acknowledge()).
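The dedup check I described can be sketched like this (a plain HashMap stands in for redis here, and the class/method names are just illustrative, not real Kafka or redis API calls):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetDedup {
    // stand-in for redis: key "topic:partition" -> last seen offset
    static final Map<String, Long> store = new HashMap<>();

    // returns true if the record should be processed, false if it is a duplicate
    static boolean shouldProcess(String topic, int partition, long offset) {
        String key = topic + ":" + partition;
        Long last = store.get(key);
        if (last != null && offset <= last) {
            return false;            // offset not newer than stored one: skip
        }
        store.put(key, offset);      // remember the newest offset
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldProcess("votes", 0, 5));  // first time
        System.out.println(shouldProcess("votes", 0, 5));  // same offset again
        System.out.println(shouldProcess("votes", 0, 6));  // newer offset
    }
}
```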

But this approach does not work. For example, if a rebalance happens before ack.acknowledge() finishes, I get this error: org.apache.kafka.clients.consumer.CommitFailedException,

and after rebalancing, the original partition is assigned to another thread, which causes the same message to be consumed twice.

In short: how do I design logic that delivers each Kafka message exactly once?


2 Answers


You have to write out the last offset processed atomically, together with the results of the processing, outside of Kafka. This can be a database or a file; just don't do two separate writes, make it a single atomic write of both data and offset. If your consumer crashes and it (or another instance) restarts or takes over, it must first read the last offset stored with the last processing results and seek() to that position before it poll()s for more messages. This is how many of the existing Kafka Sink Connectors achieve exactly-once consumption today.
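The crash/restart path can be sketched as follows. An in-memory class stands in for the external database, and its commit method models a single transaction that writes the result and the offset together; in real code the restart step would be consumer.seek(partition, lastOffset + 1), but everything here is an illustrative simulation, not the Kafka API:

```java
import java.util.ArrayList;
import java.util.List;

public class ExactlyOnceSketch {
    // Stand-in for one DB row that holds processing results plus the last offset.
    static class AtomicStore {
        final List<String> results = new ArrayList<>();
        long lastOffset = -1; // nothing committed yet

        // one atomic write of data + offset (a DB transaction in practice)
        synchronized void commit(String result, long offset) {
            results.add(result);
            lastOffset = offset;
        }
    }

    public static void main(String[] args) {
        AtomicStore store = new AtomicStore();
        String[] log = {"a", "b", "c", "d"}; // records in one partition

        // first run: process offsets 0..2, then the consumer "crashes"
        for (long off = store.lastOffset + 1; off <= 2; off++) {
            store.commit("processed:" + log[(int) off], off);
        }

        // restart: read the stored offset and resume one past it
        // (consumer.seek(partition, store.lastOffset + 1) in real code)
        for (long off = store.lastOffset + 1; off < log.length; off++) {
            store.commit("processed:" + log[(int) off], off);
        }

        System.out.println(store.results); // each record appears exactly once
    }
}
```

Because the result and the offset are stored in the same atomic write, a crash can never leave the offset ahead of (or behind) the data, which is what makes the seek-on-restart safe.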


Kafka does not support exactly-once yet. It will be available in the 0.11.0.0 release: https://issues.apache.org/jira/browse/KAFKA-4923 That release is planned for 14 June 2017, so you can either wait or build this complex logic yourself ;-)