I am trying to figure out ways to manually commit offsets in Kafka consumer, using Spring-Kafka (1.1.0.RELEASE). I understand, that it will be better to commit these offsets for robust client side implementation, such that other consumers do not process duplicate events, that might have originally been processed by a now dead consumer, or because rebalancing was triggered.
I see there are two ways to handle this -
Set the ACK_MODE to MANUAL_IMMEDIATE and in the listener implementation, invoke ack.acknowledge() API
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); @KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory") public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception { logger.info("===*** batch size : " + batch.size() + "***==="); batch.forEach(System.out::println); acknowledgment.acknowledge(); }
Implement a ConsumerRebalanceListener.
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(final Collection<TopicPartition> collection) { } @Override public void onPartitionsAssigned(final Collection<TopicPartition> collection) { } });
However, with this approach, I don’t know how can I get a reference to the consumer, to invoke consumer.commitSync()
or consumer.commitASync()
APIs. For some technical restrictions, I cannot move to the latest version of spring-kafka that supports ConsumerAwareRebalanceListener
and has a reference to the Consumer
.
So what is the way to utilize ConsumerRebalanceListener
to be able to commit offsets to Kafka ?
Also, I am starting multiple consumer listener threads using ConcurrentKafkaListenerContainerFactory.setConcurrency()
API, so if specific consumer thread dies, does it have its own instance of the ConsumerRebalanceListener?