I manage to get the topic/partition/offset in method annotated with @KafkaListener, but how can i use these data to implement a exactly-once consumer logic ?
I'm using ConcurrentKafkaListenerContainerFactory with setting of concurrenc=4, and set the AckMode to MANUAL. My current way is using redis to dedup: I use topic:partition as redis key, offset as its value, then compare the coming offset with the value in redis, if the offset is newer(greater) than redis one, then go on the business logic, else I ignore the message. finally commit the offset(ack.acknowledge())
but this way not work, for example, if the rebalance happen before ack.acknowledge() finish, then it came out this error :org.apache.kafka.clients.consumer.CommitFailedException,
and after rebalancing , the original partition is assigned to another thread, which cause the same message will be consume twice.
so in one word, how to design a logic that can make each kafka message delivery exactly-once ?