
I'm using spring-kafka 2.1.7.RELEASE, and here are my consumer settings:

    public Map<String, Object> setConsumerConfigs() {
        Map<String, Object> configs = new HashMap<>();

        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

        configs.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, stringDeserializerClass);
        configs.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, kafkaAvroDeserializerClass.getName());

        configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            Collections.singletonList(RoundRobinAssignor.class));

        // Set this to true so that the consumer record value arrives as your
        // pre-defined contract instead of a GenericRecord
        sapphireKafkaConsumerConfig.setSpecificAvroReader("true");

        return configs;
    }

And here are my factory settings:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(setConsumerConfigs()));
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);

        factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
        factory.setStatefulRetry(true);
        return factory;
    }

    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setListeners(new RetryListener[] { myCustomKafkaRetryListener });
        retryTemplate.setRetryPolicy(myCustomKafkaConsumerRetryPolicy);

        FixedBackOffPolicy backOff = new FixedBackOffPolicy();
        backOff.setBackOffPeriod(1000L);
        retryTemplate.setBackOffPolicy(backOff);

        return retryTemplate;
    }

Here is my consumer, where I've added a delay of 6 minutes, which is greater than the default max.poll.interval.ms of 5 minutes (300000 ms):

    @KafkaListener(topics = TestConsumerConstants.CONSUMER_LONGRUNNING_RECORDS_PROCESSSING_TEST_TOPIC
        , clientIdPrefix = "CONSUMER_LONGRUNNING_RECORDS_PROCESSSING"
        , groupId = "kafka-lib-comp-test-consumers")
    public void consumeLongRunningRecord(ConsumerRecord<String, Object> message) throws InterruptedException {
        System.out.println(String.format("\n\nReceived message at %s offset %s of partition %s of topic %s with key %s\n\n", DateTime.now(),
            message.offset(), message.partition(), message.topic(), message.key()));

        TimeUnit.MINUTES.sleep(6);

        System.out.println(String.format("\n\nProcessing done for the message at %s offset %s of partition %s of topic %s with key %s\n\n", DateTime.now(),
            message.offset(), message.partition(), message.topic(), message.key()));
    }

Now I'm getting the error below, and the consumer keeps processing the same record again and again because it couldn't commit the offset (which is expected).

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I then tried setting session.timeout.ms = 420000. Now I'm getting the error below, even though I didn't set any values for group.min.session.timeout.ms or group.max.session.timeout.ms, and their documented default values are 6000 and 1800000 respectively. So 420000 should be within range. Can someone help me understand why I'm getting this error?

Caused by: org.apache.kafka.common.errors.InvalidSessionTimeoutException: The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms). 
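
For reference, the range the broker actually enforces can be read back with the AdminClient rather than trusting the docs. Here is a minimal sketch, where the bootstrap address localhost:9092 and broker id "0" are placeholders, not my real setup:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class BrokerSessionTimeoutCheck {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

            try (AdminClient admin = AdminClient.create(props)) {
                // "0" is a placeholder broker id; use one of your actual broker ids
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Config config = admin.describeConfigs(Collections.singleton(broker))
                        .all().get().get(broker);

                // Print the range the broker will accept for session.timeout.ms
                System.out.println("group.min.session.timeout.ms = "
                        + config.get("group.min.session.timeout.ms").value());
                System.out.println("group.max.session.timeout.ms = "
                        + config.get("group.max.session.timeout.ms").value());
            }
        }
    }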

1 Answer


I don't know why you are getting that error, but the session timeout is no longer the relevant setting for slow processing; since KIP-62, heartbeats are sent from a background thread, so session.timeout.ms no longer has to cover record-processing time. Maybe the broker defaults have been changed and the docs not updated.

You need to increase max.poll.interval.ms to avoid the rebalance.
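
For example, in the setConsumerConfigs() method from the question, you could set the interval above your worst-case processing time, and optionally shrink the batch size so one slow record holds up fewer others. The values below are illustrative, not prescriptive:

    // Allow up to 7 minutes between poll() calls before the broker considers
    // the consumer failed and rebalances its partitions; 420000 is just an
    // example sized to the 6-minute sleep plus some headroom.
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 420000);

    // Optional: fetch fewer records per poll() (the default is 500) so that
    // the time spent processing one batch stays under the interval above.
    configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);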