37 votes

I was using Kafka 0.10.2 and am now facing a CommitFailedException like:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I have set max.poll.interval.ms to Integer.MAX_VALUE, so can anyone tell me why this still happens even though I have set that value?

Another question: as the message suggests, I set session.timeout.ms to 60000 and it still happens. I tried to reproduce it with some simple code:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

public class KafkaConsumer10 {

    public static void main(String[] args) throws InterruptedException {
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));

        while (true) {
            // Sleep longer than session.timeout.ms both before and after poll()
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}

When I set session.timeout.ms to 10000, I sleep more than 10000 ms inside my poll loop, but it seems to work and no exception is thrown. So I'm confused: if the heartbeat is triggered by consumer.poll and consumer.commit, the heartbeat should fall outside the session timeout in my code. Why is no CommitFailedException thrown?

Although the user thread hangs for more than 10 seconds, the background heartbeat thread can still send out heartbeats normally; that's why no exception was thrown, and that's also the reason max.poll.interval.ms was introduced. What I am interested in is why you still got a CommitFailedException when max.poll.interval.ms is set to Integer.MAX_VALUE. – amethystic
That's my question; I'm also very confused about this. – Simon Su
Can it be reproduced easily, or did it happen only once? – amethystic
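
For reference, three separate timeouts are at play here: heartbeat.interval.ms controls how often the background heartbeat thread pings the group coordinator, session.timeout.ms is how long the coordinator waits for heartbeats before declaring the consumer dead, and max.poll.interval.ms bounds the time between successive poll() calls on the user thread. A minimal sketch with illustrative values only:

import java.util.Properties;

// Illustrative values only; defaults differ between Kafka versions.
Properties props = new Properties();
props.put("heartbeat.interval.ms", "3000");   // background heartbeat thread frequency
props.put("session.timeout.ms", "10000");     // coordinator waits this long for heartbeats
props.put("max.poll.interval.ms", "300000");  // maximum allowed gap between poll() calls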

2 Answers

17 votes

The session.timeout.ms set on the consumer should be less than the group.max.session.timeout.ms set on the Kafka broker.
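
As a sketch (the values are only illustrative; group.max.session.timeout.ms and group.min.session.timeout.ms are broker-side settings in server.properties, not consumer properties):

import java.util.Properties;

// Consumer side: session.timeout.ms must lie between the broker's
// group.min.session.timeout.ms and group.max.session.timeout.ms,
// otherwise the broker rejects the consumer's attempt to join the group.
Properties props = new Properties();
props.put("session.timeout.ms", "60000");

// Broker side (server.properties), shown here only as a reference:
// group.max.session.timeout.ms=300000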

This resolved the issue for me.

Credit to the GitHub link "Commit Failures".

12 votes

For this you need to handle the rebalance in your code: process the in-flight messages and commit their offsets before the rebalance completes.

Like:

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Implement what you want to do once rebalancing is done.
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit the offsets of the messages processed so far
        // (e.g. via kafkaConsumer.commitSync) before the partitions are reassigned.
    }
}

and use this syntax to subscribe to the topic:

kafkaConsumer.subscribe(topicNameList, new HandleRebalance());

The advantages of doing this:

  1. Messages will not be processed twice when a rebalance takes place.

  2. No commit failed exceptions.
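
For completeness, here is a fuller sketch of the same commit-on-revoke idea, assuming the poll loop keeps a currentOffsets map up to date as records are processed (the constructor, field names, and the currentOffsets map are illustrative, not part of the original answer):

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Commits the offsets of the records processed so far just before the
// partitions are taken away, so the next owner starts from the right place
// and this consumer does not hit a CommitFailedException afterwards.
class HandleRebalance implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets;

    HandleRebalance(KafkaConsumer<String, String> consumer,
                    Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        this.consumer = consumer;
        this.currentOffsets = currentOffsets;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do in this sketch.
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit whatever has been processed before losing the partitions.
        consumer.commitSync(currentOffsets);
    }
}

In the poll loop you would put new OffsetAndMetadata(record.offset() + 1) into currentOffsets for each processed record's TopicPartition, and pass the same consumer and map when constructing the listener for subscribe().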