I have a fairly slow consumer that can take more than 5 minutes to process a record. What I want to avoid is Kafka rebalancing the group. From my understanding, to do this I have to set the following properties on the Kafka broker:

  group.max.session.timeout.ms = 3600001 
  group.min.session.timeout.ms = 3600000

On the application side I have the following configuration:

    @Bean
    public Map<String, Object> consumerConfigs() {
        final Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            environment.getProperty("app.kafkaBrokers"));
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
            Integer.valueOf(environment.getProperty("app.session.timeout.ms")));
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
            Integer.valueOf(environment.getProperty("app.session.timeout.ms")) + 1);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(9); // was 3
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

And also in my listener I have:

    @KafkaListener(id = "baz", topics = "tipJobsForExecution", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<?, ?> record)

My listener takes about 5 minutes to process a message. As soon as it finishes, I see the following on the Kafka broker side:

    [2018-05-03 10:29:11,210] INFO [GroupCoordinator 0]: Preparing to rebalance group baz with old generation 22 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)

From my understanding, Kafka considers the consumer dead and rebalances the group. My question is: why is this happening? One idea I have is that the heartbeat may not be sent every 3000 ms as it should be, but I do not know how to troubleshoot this.

Thanks in advance, Giannis

1 Answer

You have to be aware of three timeout configuration parameters for the Kafka consumer.

heartbeat.interval.ms - The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Typically it should be no more than 1/3 of the session.timeout.ms value. Default value: 3000 ms.

session.timeout.ms - If no heartbeats are received by the broker before this session timeout expires, the broker removes the consumer from the group and initiates a rebalance. Default value: 10000 ms.

max.poll.interval.ms - If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances. Default value: 300000 ms.

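In your case it looks like the poll interval is set too low. The default max.poll.interval.ms of 300000 ms is exactly 5 minutes, and your listener needs about that long (or longer) per record, so poll() is not called again in time and the group rebalances. The session timeout is probably not the cause: assuming your client is 0.10.1 or later, heartbeats are sent from a background thread and keep going while the listener is busy. Raise max.poll.interval.ms above your worst-case processing time, and consider lowering max.poll.records so that less work has to be done between two polls.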
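As a rough sketch of what that could look like, the helper below builds the two consumer overrides from a worst-case processing time. The class name, the method name, and the factor of two for headroom are my own assumptions, not anything Kafka prescribes; only the property names max.poll.interval.ms and max.poll.records come from the consumer configuration. Plain string keys are used so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class SlowConsumerConfig {

    /**
     * Hypothetical helper: returns consumer property overrides for a listener
     * whose slowest record takes worstCaseProcessingMs to process.
     */
    public static Map<String, Object> slowConsumerOverrides(long worstCaseProcessingMs) {
        final Map<String, Object> props = new HashMap<>();
        // Allow twice the worst-case processing time between two poll() calls.
        // The factor of 2 is an assumed safety margin; the value is capped
        // because max.poll.interval.ms is an int property.
        props.put("max.poll.interval.ms",
            (int) Math.min(Integer.MAX_VALUE, worstCaseProcessingMs * 2));
        // Hand the listener a single record per poll, so the interval only
        // has to cover one record's processing time.
        props.put("max.poll.records", 1);
        return props;
    }

    public static void main(String[] args) {
        // 5 minutes worst case, as described in the question.
        System.out.println(slowConsumerOverrides(5 * 60 * 1000L));
    }
}
```

In your consumerConfigs() bean this would amount to something like propsMap.putAll(SlowConsumerConfig.slowConsumerOverrides(300000L)) before returning the map.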

Reference - https://kafka.apache.org/documentation/#newconsumerconfigs