
I'm using spring-kafka 2.1.7.RELEASE and I'm trying to understand how max.poll.interval.ms works with AckMode set to BATCH and enable.auto.commit set to 'false'. Here are my consumer settings.

    public Map<String, Object> setConsumerConfigs() {

        Map<String, Object> configs = new HashMap<>();

        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "400000");

        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

        configs.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, stringDeserializerClass);
        configs.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, kafkaAvroDeserializerClass.getName());

        configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(RoundRobinAssignor.class));

        // Set this to true so that the consumer record value comes back as your
        // pre-defined contract instead of a GenericRecord
        sapphireKafkaConsumerConfig.setSpecificAvroReader("true");

        return configs;
    }

and here are my factory settings

        @Bean
         public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(setConsumerConfigs()));
           factory.getContainerProperties().setMissingTopicsFatal(false);

           factory.getContainerProperties().setAckMode(AckMode.BATCH);

           factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
           factory.setRetryTemplate(retryTemplate());
           factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
           factory.setStatefulRetry(true);
           return factory;
         }

         public RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setListeners(new RetryListener[]{myCustomKafkaRetryListener});
            retryTemplate.setRetryPolicy(myCustomKafkaConsumerRetryPolicy);

            FixedBackOffPolicy backOff = new FixedBackOffPolicy();
            backOff.setBackOffPeriod(1000);
            retryTemplate.setBackOffPolicy(backOff);


            return retryTemplate;
          }

Here is my consumer, where I've added a delay of 2 minutes:

@KafkaListener(topics = TestConsumerConstants.CONSUMER_LONGRUNNING_RECORDS_PROCESSSING_TEST_TOPIC
      , clientIdPrefix = "CONSUMER_LONGRUNNING_RECORDS_PROCESSSING"
      , groupId = "kafka-lib-comp-test-consumers")
  public void consumeLongRunningRecord(ConsumerRecord message) throws InterruptedException {
    System.out.println(String.format("\n \n Received message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));

    TimeUnit.MINUTES.sleep(2);

    System.out.println(String.format("\n \n Processing done for the message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));
  }

Now, I published 5 messages and observed that all records were processed without any issues. But if I set the AckMode to RECORD, it throws the error below while committing the offset after processing the 4th message, and then processes the same message twice (which is expected).

As per the spring-kafka documentation, AckMode = BATCH commits the offsets once all the records returned by the poll() have been processed.

Now, my question is: how does changing the AckMode change the behavior, without causing a rebalance after max.poll.interval.ms is exceeded? Please help me understand.

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

2 Answers


Changing the ack mode has no bearing on rebalancing; max.poll.interval.ms is the time allowed between calls to poll(), which does not change with the ack mode.

Since your interval is 6.67 minutes, I would expect it to fail with BATCH mode too; perhaps the poll only returned 2 or 3 records for that test, whereas 4 or 5 records were returned when you tested RECORD mode.
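To make the arithmetic concrete, here is a sketch using the values from the question (400000 ms interval, 2-minute sleep per record). Only 3 complete records fit between two calls to poll(), which is consistent with a commit failing once a 4th record is processed in the same poll cycle:

```java
public class PollIntervalMath {

    public static void main(String[] args) {
        long maxPollIntervalMs = 400_000L;        // from the consumer config (~6.67 minutes)
        long perRecordMs = 2L * 60 * 1000;        // 2-minute sleep per record in the listener

        // Integer division: how many complete records can be processed
        // before the next poll() is overdue
        long recordsBeforeTimeout = maxPollIntervalMs / perRecordMs;

        System.out.println(recordsBeforeTimeout); // prints 3
    }
}
```

So whether the failure shows up depends entirely on how many records the poll happened to return, not on the ack mode.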

You can get the count of records returned by the poll by enabling DEBUG logging for org.springframework.kafka.
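For example, assuming a Spring Boot application, the logging level could be raised in application.properties (adjust to your logging framework if you are not using Boot):

```properties
logging.level.org.springframework.kafka=DEBUG
```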