I am facing some weird rebalancing issue on my Kafka consumer. I have implemented an infinite retry policy for errors using SeekToCurrentErrorHandler
set the max.poll.interval.ms=1,200,000
ie 20 minutes and set retry delay as 900,000
15 minutes delay.
As you can see the poll.interval > delay. The problem I am facing is when a new consumer gets added, it rebalances and the old consumer leaves group, only new consumer receives and process the message. In the new consumer I see logs Attempt to heartbeat failed since group is rebalancing
but still it processes messages. The old consumer receives 0 data.
my consumer config below:
spring:
kafka:
......
......
consumer:
max.poll.records: 150
group.id: xxxxx
properties:
enable.auto.commit: false
max.poll.interval.ms: 1200000 #20 minutes greater than retry interval
Kafka java config:
public ConcurrentkafkaListenerContainerFactory<String, byte[]> kafkaFactory() {
ConcurrentkafkaListenerContainerFactory<String, byte[]> = new
ConcurrentkafkaListenerContainerFactory();
......
......
factory.setErrorHandler(kafkaErrorHandler);
facory.setRetryTemplate(retrtTemplate());
factory.setStatefulRetry(true)
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANIAL_IMMEDIATE);
return factory;
}
Error Handler:
@Component
public class KafkaErrorHandler extends SeekToCurrentErrorHandler {
KafkaErrorHanbdler(){super(-1)}
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?,
?> consumer, MessageListenerContainer container) {
LOG.info("handle");
super.handle(thrownException, records, consumer, container);
}
}
My app takes around 10s to process each event and send acknowledgement for each only after successful processing. It takes 150 messages as configured. The max poll interval is set to 20 minutes. Does the kafka do poll only after 150 messages are processed?