
I am trying to figure out how to manually commit offsets in a Kafka consumer using Spring-Kafka (1.1.0.RELEASE). I understand that committing these offsets is better for a robust client-side implementation, so that other consumers do not process duplicate events that may already have been processed by a consumer that has since died, or that are redelivered because a rebalance was triggered.

I see two ways to handle this:

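  • Set the AckMode to MANUAL_IMMEDIATE and, in the listener implementation, call acknowledgment.acknowledge():

    // factory is the ConcurrentKafkaListenerContainerFactory<byte[], String> registered as "batchContainerFactory"
    factory.setBatchListener(true); // needed because the listener below receives a List of records
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);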
    
    @KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
    public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception {
        logger.info("===*** batch size : " + batch.size() + "***===");
        batch.forEach(System.out::println);
        acknowledgment.acknowledge();
    }
    
  • Implement a ConsumerRebalanceListener and register it on the container properties:

    factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
            // called before a rebalance takes partitions away; this is where I would like to commit
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
            // called after partitions have been (re)assigned to this consumer
        }
    });
    

However, with this approach I don't know how to get a reference to the consumer in order to call consumer.commitSync() or consumer.commitAsync(). Because of technical restrictions, I cannot move to the latest version of spring-kafka, which supports ConsumerAwareRebalanceListener and provides a reference to the Consumer.

So how can I use a ConsumerRebalanceListener to commit offsets to Kafka?

Also, I am starting multiple consumer listener threads using ConcurrentKafkaListenerContainerFactory.setConcurrency(), so if a specific consumer thread dies, does it have its own instance of the ConsumerRebalanceListener?

Try to avoid relying on Kafka's internal offset commits. I would choose external offset handling; it is safer. I use ZooKeeper, for example. See more details in this Cloudera article: blog.cloudera.com/blog/2017/06/… (dbustosp)

1 Answer


With 1.x, the consumer rebalance listener cannot be used to commit offsets; the only way is via the Acknowledgment parameter.
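A minimal sketch of the Acknowledgment route on 1.x, assuming spring-kafka 1.1 and a broker at localhost:9092. The class name `ManualAckConfig`, the group id `demo-group` and the `process` method are hypothetical. Auto-commit is disabled so that the container, not the Kafka client, decides when to commit, and `acknowledge()` is only reached after every record in the batch has been processed, so an exception mid-batch leaves that batch's offsets uncommitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;

@Configuration
@EnableKafka
public class ManualAckConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<byte[], String> batchContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");               // hypothetical group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           // leave commits to the container
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConcurrentKafkaListenerContainerFactory<byte[], String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.setBatchListener(true); // deliver List<ConsumerRecord> to the listener
        factory.getContainerProperties()
               .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
    public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment ack) {
        for (ConsumerRecord<byte[], String> record : batch) {
            process(record); // hypothetical business logic; an exception here skips the ack below
        }
        // Only reached when the whole batch succeeded: ask the container to commit its offsets.
        ack.acknowledge();
    }

    private void process(ConsumerRecord<byte[], String> record) {
        System.out.println(record.offset() + ": " + record.value());
    }
}
```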

1.1.0 is very old; all 1.x users are recommended to upgrade to at least 1.3.5, which has a much simpler threading model thanks to KIP-62 (see the project page).

The current version, 2.1.6 (2.1.7 will be released today), has many more options, including ConsumerAwareMessageListener, which gives you full access to the consumer.
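For readers who can upgrade, here is a sketch of committing from a rebalance callback with ConsumerAwareRebalanceListener, assuming spring-kafka 2.x, where that interface lives in `org.springframework.kafka.listener`. The class name `RebalanceCommit` is hypothetical. The callback is invoked on the consumer thread, so using the `Consumer` there is safe. The sketch also assumes that every record before `consumer.position(tp)` has already been processed; with manual acks and records still unacknowledged, that assumption does not hold and this would commit too far.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

public final class RebalanceCommit {

    /** Registers a rebalance listener that commits the current positions of revoked partitions. */
    static void register(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions) {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (TopicPartition tp : partitions) {
                    // position() is the offset of the next record to fetch, which is what Kafka expects to be committed
                    offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                }
                consumer.commitSync(offsets);
            }

            @Override
            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions) {
                // nothing further to do in this sketch
            }

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions) {
                // newly assigned partitions resume from their last committed offsets
            }
        });
    }
}
```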

You cannot interact directly with the consumer from the listener in versions earlier than 1.3, because the consumer is not thread-safe and the listener is invoked on a different thread.
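To make the threading argument concrete, here is a plain-Java simulation with no Kafka involved. `FakeConsumer`, `runHandOff` and `directCommitRejected` are hypothetical names, and the stand-in is deliberately stricter than the real KafkaConsumer, which throws ConcurrentModificationException only when it detects concurrent access from another thread. It shows the general hand-off pattern: the listener thread only queues acknowledgments, and the thread that owns the consumer drains the queue and performs the commits. It is a sketch of the idea, not spring-kafka's actual container code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class AckHandOff {

    /** Hypothetical stand-in for a non-thread-safe client: only its owning thread may commit. */
    static final class FakeConsumer {
        private final Thread owner;
        private long committed = -1;

        FakeConsumer(Thread owner) {
            this.owner = owner;
        }

        void commit(long nextOffset) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("commit() called from " + Thread.currentThread().getName());
            }
            committed = nextOffset;
        }

        long committed() {
            return committed;
        }
    }

    /** The listener thread queues acks; the consumer (calling) thread drains the queue and commits. */
    static long runHandOff(int records) {
        FakeConsumer consumer = new FakeConsumer(Thread.currentThread());
        BlockingQueue<Long> acks = new LinkedBlockingQueue<>();
        Thread listener = new Thread(() -> {
            for (long offset = 0; offset < records; offset++) {
                // ...process the record at `offset`, then acknowledge it by queueing, not committing
                acks.add(offset);
            }
        }, "listener-thread");
        listener.start();
        try {
            for (int i = 0; i < records; i++) {
                long acked = acks.take();   // blocks until the listener acknowledges a record
                consumer.commit(acked + 1); // Kafka convention: commit the offset of the next record to read
            }
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.committed();
    }

    /** Returns true if a commit attempted directly from the listener thread is rejected. */
    static boolean directCommitRejected() {
        FakeConsumer consumer = new FakeConsumer(Thread.currentThread());
        AtomicBoolean rejected = new AtomicBoolean(false);
        Thread listener = new Thread(() -> {
            try {
                consumer.commit(1);
            } catch (IllegalStateException e) {
                rejected.set(true);
            }
        }, "listener-thread");
        listener.start();
        try {
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("committed after hand-off: " + runHandOff(5)); // prints 5
        System.out.println("direct commit rejected: " + directCommitRejected()); // prints true
    }
}
```

Because the queue is FIFO, the last commit always corresponds to the last acknowledged record, so five records processed from offset 0 end with offset 5 committed.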