
I am facing a weird rebalancing issue with my Kafka consumer. I have implemented an infinite retry policy for errors using SeekToCurrentErrorHandler, set max.poll.interval.ms=1,200,000 (i.e. 20 minutes), and set the retry delay to 900,000 ms (15 minutes).
As you can see, the poll interval is greater than the retry delay. The problem is that when a new consumer is added, the group rebalances and the old consumer leaves the group; only the new consumer receives and processes messages. In the new consumer's logs I see "Attempt to heartbeat failed since group is rebalancing", but it still processes messages. The old consumer receives no data at all.

My consumer config is below:

spring:
  kafka:
    ......
    ......
    consumer:
      max.poll.records: 150
      group.id: xxxxx
      properties:
        enable.auto.commit: false
        max.poll.interval.ms: 1200000 # 20 minutes, greater than the retry interval

Kafka Java config:

public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ......
    ......
    factory.setErrorHandler(kafkaErrorHandler);
    factory.setRetryTemplate(retryTemplate());
    factory.setStatefulRetry(true);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

Error Handler:

@Component
public class KafkaErrorHandler extends SeekToCurrentErrorHandler {

    KafkaErrorHandler() {
        super(-1); // negative maxFailures: retry indefinitely
    }

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        LOG.info("handle");
        super.handle(thrownException, records, consumer, container);
    }
}

My app takes around 10 s to process each event and sends an acknowledgement for each one only after it has been processed successfully. Each poll returns up to 150 messages, as configured, and the max poll interval is set to 20 minutes. Does Kafka poll again only after all 150 messages have been processed?
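A quick sanity check of the numbers above, written as a minimal Java sketch. It assumes the records in one batch are processed one after another at a fixed ~10 s each, which is a simplification; `PollIntervalCheck` and its methods are hypothetical names for illustration only. The listener container normally hands every record from one poll() to the listener before polling again, so at 150 records × 10 s a full batch takes about 1,500 s (25 minutes). That is already longer than the 20-minute max.poll.interval.ms, even before adding a 15-minute retry delay.

```java
import java.time.Duration;

// Back-of-the-envelope check (hypothetical helper class, not part of any library).
class PollIntervalCheck {

    // Time to process one full poll() batch, assuming records are handled
    // one after another and each takes the same fixed time.
    static Duration batchProcessingTime(int maxPollRecords, Duration perRecord) {
        return perRecord.multipliedBy(maxPollRecords);
    }

    // True if a full batch plus an extra delay (e.g. a retry wait) completes
    // before max.poll.interval.ms runs out.
    static boolean fitsInPollInterval(int maxPollRecords, Duration perRecord,
                                      Duration extraDelay, Duration maxPollInterval) {
        Duration worstCase = batchProcessingTime(maxPollRecords, perRecord).plus(extraDelay);
        return worstCase.compareTo(maxPollInterval) < 0;
    }

    public static void main(String[] args) {
        int maxPollRecords = 150;                                 // max.poll.records
        Duration perRecord = Duration.ofSeconds(10);              // ~10 s per event
        Duration retryDelay = Duration.ofMillis(900_000);         // 15-minute retry delay
        Duration maxPollInterval = Duration.ofMillis(1_200_000);  // max.poll.interval.ms

        Duration batch = batchProcessingTime(maxPollRecords, perRecord);
        System.out.println("Full batch: " + batch.toMinutes() + " min");  // prints "Full batch: 25 min"
        System.out.println("Fits without retry: "
                + fitsInPollInterval(maxPollRecords, perRecord, Duration.ZERO, maxPollInterval)); // prints false
        System.out.println("Fits with one retry delay: "
                + fitsInPollInterval(maxPollRecords, perRecord, retryDelay, maxPollInterval));   // prints false
    }
}
```

If the per-record estimate holds, max.poll.records and max.poll.interval.ms are the two settings that control whether a full batch fits inside the poll interval.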


1 Answer


With stateful retry, the exception is thrown to the container, which re-seeks the unprocessed records so that they are fetched again by the next poll.

DEBUG logging should help you figure out what's wrong.

With newer versions of Spring for Apache Kafka (2.3 and later), you can configure the SeekToCurrentErrorHandler with a BackOff instead of using a retry template.
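A minimal sketch of that alternative, assuming spring-kafka 2.3 or later (where SeekToCurrentErrorHandler has a constructor that takes a BackOff) and Spring's `FixedBackOff`. The `KafkaConfig` class name and the injected `ConsumerFactory` are illustrative assumptions; the 15-minute interval and the MANUAL_IMMEDIATE ack mode are taken from the question. The retry template and stateful retry are left out because the error handler itself performs the retries.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; assumes spring-kafka 2.3+.
@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaFactory(
            ConsumerFactory<String, byte[]> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // No RetryTemplate / stateful retry: the error handler re-seeks and retries.
        // Wait 15 minutes between attempts and never give up (values from the question).
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new FixedBackOff(900_000L, FixedBackOff.UNLIMITED_ATTEMPTS)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```

As far as I know, this back off is applied on the consumer thread, so the interval (plus the time spent on the rest of the batch) still has to stay below max.poll.interval.ms.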

Adding a new consumer while the current one is busy means the rebalance is delayed until the first one polls again. The existing consumer won't "leave the group" until it times out (i.e. exceeds max.poll.interval.ms).
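To see when partitions actually move between the two consumers, one option is to register a Kafka `ConsumerRebalanceListener` on the container and log revocations and assignments next to the DEBUG output. This is a minimal sketch assuming SLF4J is on the classpath; `LoggingRebalanceListener` is a hypothetical name. If the old consumer is stuck in a long batch, its revocation log line should only appear once it polls again or times out.

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical diagnostic listener: logs each partition revocation/assignment.
// Register it with:
//   factory.getContainerProperties().setConsumerRebalanceListener(new LoggingRebalanceListener());
public class LoggingRebalanceListener implements ConsumerRebalanceListener {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingRebalanceListener.class);

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called on this consumer's thread when a rebalance takes partitions away.
        LOG.info("Partitions revoked: {}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called on this consumer's thread after the rebalance assigns partitions.
        LOG.info("Partitions assigned: {}", partitions);
    }
}
```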