I'm using spring-kafka '2.2.7.RELEASE' to create a batch consumer and I'm trying to understand how the consumer rebalancing works when my record processing time exceeds max.poll.interval.ms.

Here are my consumer configs:

public Map<String, Object> myBatchConsumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 300000 ms = 300 s (5 minutes)
    sapphireKafkaConsumerConfig.setSpecificAvroReader("true"); // project-specific Avro helper, not shown
    return configs;
}

And here are my container factory settings:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(myBatchConsumerConfigs()));
    factory.getContainerProperties().setMissingTopicsFatal(false);

    // commit the offsets of the whole batch after the listener has processed it
    factory.getContainerProperties().setAckMode(AckMode.BATCH);

    factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
    factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
    factory.setStatefulRetry(true);
    factory.setBatchListener(true);

    factory.setBatchErrorHandler(myBatchConsumerSeekToCurrentErrorHandler);
    factory.getContainerProperties().setConsumerRebalanceListener(myBatchConsumerAwareRebalanceListener);
    // this call replaces the recovery callback set earlier
    factory.setRecoveryCallback(context -> {
        logger.logInfo("In recovery call back for KES Batch Consumer", this.getClass());
        myBatchConsumerDeadLetterRecoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                (Exception) context.getLastThrowable());
        return null;
    });
    return factory;
}

I've also added a custom rebalance listener:

@Component
public class MyBatchConsumerAwareRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsRevokedBeforeCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsRevokedBeforeCommit", getClass());
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsRevokedAfterCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsRevokedAfterCommit", getClass());
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsAssigned - topic - "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsAssigned", getClass());
    }

}


Here is my consumer, where I've added a 400-second sleep per record, which is longer than max.poll.interval.ms (300000 ms, i.e. 300 seconds):

@KafkaListener(groupId = "TestBatchConsumers", topics = TEST_KES_BATCH_CONSUMER_TOPIC, containerFactory = "myBatchConsumerContainerFactory")
public void consumeRecords(List<ConsumerRecord<String, Organization>> consumerRecords) {

    long startTime = System.currentTimeMillis();
    System.out.println("Processing started at " + startTime);
    consumerRecords.forEach(consumerRecord -> {
        System.out.println("Received consumerRecord on topic " + consumerRecord.topic()
                + ", partition " + consumerRecord.partition()
                + ", at offset " + consumerRecord.offset()
                + ", with key " + consumerRecord.key());

        try {
            // deliberately longer than max.poll.interval.ms; runs once for every record in the batch
            Thread.sleep(400000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    System.out.println("Processing completed at " + System.currentTimeMillis());

    long processingTimeInSec = (System.currentTimeMillis() - startTime) / 1000;
    System.out.println(processingTimeInSec);
}

Now I was expecting the consumer group to rebalance, because the processing time is longer than max.poll.interval.ms, but I didn't see any such behavior. Am I missing something here?

Please suggest.
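For reference, the arithmetic behind that expectation: the sleep runs once per record, so a batch of n records keeps the listener busy for roughly n times 400 s, and even a single record already exceeds the configured 300000 ms. A minimal Java sketch of that calculation (the class name and the batch size of 4 are illustrative assumptions, not part of the original code):

```java
import java.time.Duration;

public class PollBudget {
    // Time spent inside one listener call when every record sleeps for perRecord.
    static Duration batchTime(int records, Duration perRecord) {
        return perRecord.multipliedBy(records);
    }

    // True when the listener is still busy after max.poll.interval.ms has elapsed,
    // so the next poll() cannot happen in time.
    static boolean exceedsPollInterval(int records, Duration perRecord, Duration maxPollInterval) {
        return batchTime(records, perRecord).compareTo(maxPollInterval) > 0;
    }

    public static void main(String[] args) {
        Duration maxPollInterval = Duration.ofMillis(300_000); // max.poll.interval.ms=300000 (300 s)
        Duration perRecord = Duration.ofMillis(400_000);       // Thread.sleep(400000) per record
        for (int records : new int[] {1, 4}) {                 // 4 is a hypothetical batch size
            System.out.printf("%d record(s): %d s in the listener, exceeds limit: %b%n",
                    records,
                    batchTime(records, perRecord).getSeconds(),
                    exceedsPollInterval(records, perRecord, maxPollInterval));
        }
    }
}
```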

The rebalance listener will not be called until the listener exits and attempts the next poll; it is always called on the consumer thread. You should see log messages though. - Gary Russell
Thanks for the response @GaryRussell. Just to understand this a little more, I've modified the numbers (decreased max.poll.interval.ms to 100000 and the delay in the consumer to 120000) and still didn't see any rebalancing-related messages. My understanding is that after 120 seconds the consumer will try to poll, and that should cause a rebalance, shouldn't it? I do see log lines like 'onPartitionsAssigned' from my listener. - Raj
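The sequence described in the first comment can be pictured as a simplified timeline. This is a toy simulation, not Kafka's implementation: it assumes the background heartbeat thread sends LeaveGroup as soon as max.poll.interval.ms expires, and that the commit, the rebalance-listener callbacks and the rejoin all happen on the consumer thread only after the listener returns. The class name and the event wording are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PollTimeoutModel {
    private static String event(long atMs, String thread, String what) {
        return String.format("t=%dms [%s] %s", atMs, thread, what);
    }

    // Simplified timeline for one batch: poll at t=0, listener busy for processingMs.
    static List<String> simulate(long maxPollIntervalMs, long processingMs) {
        List<String> events = new ArrayList<>();
        events.add(event(0, "consumer", "poll() returns a batch; listener invoked"));
        boolean pollTimedOut = processingMs > maxPollIntervalMs;
        if (pollTimedOut) {
            // Heartbeat thread: it leaves the group but never calls the rebalance
            // listener; those callbacks only run on the consumer thread.
            events.add(event(maxPollIntervalMs, "heartbeat", "poll timeout expired; LeaveGroup sent"));
        }
        events.add(event(processingMs, "consumer", "listener returns"));
        if (pollTimedOut) {
            events.add(event(processingMs, "consumer", "offset commit fails (CommitFailedException)"));
            events.add(event(processingMs, "consumer", "rebalance listener: partitions revoked"));
            events.add(event(processingMs, "consumer", "rejoin group; rebalance listener: partitions assigned"));
            events.add(event(processingMs, "consumer", "uncommitted records can be delivered again"));
        } else {
            events.add(event(processingMs, "consumer", "offsets committed; next poll()"));
        }
        return events;
    }

    public static void main(String[] args) {
        // Raj's modified numbers: max.poll.interval.ms=100000, 120000 ms of processing
        simulate(100_000, 120_000).forEach(System.out::println);
    }
}
```

The point of the model is the split between threads: nothing rebalance-related is visible from the listener until it returns, which matches the timestamps in the answer below.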

1 Answer

It behaves as expected for me:

@SpringBootApplication
public class So67520619Application {

    public static void main(String[] args) {
        SpringApplication.run(So67520619Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so67520619", "foo");
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67520619").partitions(1).replicas(1).build();
    }

}

@Component
class Listener implements ConsumerAwareRebalanceListener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "so67520619", topics = "so67520619")
    public void listen(List<String> in) throws InterruptedException {
        LOG.info(in.toString());
        Thread.sleep(12000);
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Revoked: " + partitions);
    }

}
spring.kafka.consumer.properties.max.poll.interval.ms=10000
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch
2021-05-13 11:13:16.339  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 11:13:16.358  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo]
2021-05-13 11:13:26.416  INFO 20954 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-c9a440bf-9076-4575-813d-3efb0054f5f7 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 11:13:28.365  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 11:13:28.370 ERROR 20954 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
    ... 3 common frames omitted

2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Revoked: [so67520619-0]
2021-05-13 11:13:28.372  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.376  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 11:13:28.376  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.485  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 3: {consumer-so67520619-1-15ce3150-1aa3-4b43-a892-fbd54e8ed919=Assignment(partitions=[so67520619-0])}
2021-05-13 11:13:28.486  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 3
2021-05-13 11:13:28.487  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 11:13:28.487  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 11:13:28.488  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 11:13:28.489  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 11:13:28.490  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 11:13:28.494  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo]
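The LeaveGroup message above names two remedies: increase max.poll.interval.ms, or reduce max.poll.records. When processing time grows with the number of records, a rough upper bound for max.poll.records can be estimated as in this sketch. The budget percentage (how much of the interval you are willing to use) is an assumption, as is the 1-second-per-record case. Note that in the question each record takes 400 s, so no max.poll.records value fits within 300 s; only a larger max.poll.interval.ms (or faster processing) would help there.

```java
public class MaxPollRecordsEstimate {
    /**
     * Rough upper bound on max.poll.records so that a full batch finishes within
     * budgetPercent of max.poll.interval.ms. Returns 0 when even one record
     * cannot be processed in time.
     */
    static long maxRecordsPerPoll(long maxPollIntervalMs, long perRecordMs, int budgetPercent) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        if (budgetPercent <= 0 || budgetPercent > 100) {
            throw new IllegalArgumentException("budgetPercent must be in 1..100");
        }
        long budgetMs = maxPollIntervalMs * budgetPercent / 100;
        return budgetMs / perRecordMs;
    }

    public static void main(String[] args) {
        // Question: 400 s per record vs. a 300 s interval -> 0, no batch size is safe
        System.out.println(maxRecordsPerPoll(300_000, 400_000, 80));
        // Hypothetical: 1 s per record, same interval, 80% budget -> 240
        System.out.println(maxRecordsPerPoll(300_000, 1_000, 80));
    }
}
```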

EDIT

And with max.poll.interval.ms set to 60 seconds and a 65-second sleep, it still works for me:

2021-05-13 17:24:28.111  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 17:24:28.130  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo, foo, foo]
2021-05-13 17:25:28.147  INFO 37063 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-269ac261-3838-4925-a9a7-fd0687db3522 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 17:25:33.135  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 17:25:33.141 ERROR 37063 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

...

2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Revoked: [so67520619-0]
2021-05-13 17:25:33.142  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.145  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 17:25:33.145  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.250  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 9: {consumer-so67520619-1-bd22a252-64f2-4be3-a6eb-8371b8f95ff2=Assignment(partitions=[so67520619-0])}
2021-05-13 17:25:33.254  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 9
2021-05-13 17:25:33.254  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 17:25:33.255  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 17:25:33.256  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 17:25:33.258  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 17:25:33.258  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 17:25:33.261  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo, foo, foo]

Kafka's heartbeat thread complains after 60 seconds, and the commit fails 5 seconds later, when the listener returns and the consumer thread tries to commit the offsets.
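These gaps can be checked directly from the log timestamps of both runs. A small sketch using java.time; the timestamps are copied from the logs above (the batch being logged, the LeaveGroup line, and the failed OffsetCommit line), and the class name is illustrative:

```java
import java.time.Duration;
import java.time.LocalTime;

public class LogTimeline {
    // Milliseconds between two HH:mm:ss.SSS timestamps from the same day's log.
    static long millisBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
    }

    public static void main(String[] args) {
        // First run: max.poll.interval.ms=10000, Thread.sleep(12000)
        System.out.println(millisBetween("11:13:16.358", "11:13:26.416")); // batch -> LeaveGroup: 10058
        System.out.println(millisBetween("11:13:16.358", "11:13:28.365")); // batch -> commit fails: 12007
        // Second run: 60 s interval, 65 s sleep
        System.out.println(millisBetween("17:24:28.130", "17:25:28.147")); // batch -> LeaveGroup: 60017
        System.out.println(millisBetween("17:24:28.130", "17:25:33.135")); // batch -> commit fails: 65005
    }
}
```

In both runs the LeaveGroup arrives just after max.poll.interval.ms, while the commit failure and the "Revoked" callback only show up once the sleep ends.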