1
votes

We are using Spring Kafka 2.2.2 Release to retrieve records from Kafka using @KafkaListener and with the ConcurrentKafkaListenerContainerFactory. We have configured the max-poll-records to 5, however it always gives only 1 record in the list to the consumer instead of 5 records.

Whilst with the same configuration, it works in Spring Kafka 2.1.4.Release.

Here is our application.yml configuration:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      max-poll-records: 5
      bootstrap-servers: localhost:9092
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.gap.cascade.li.data.xx.xx.CustomDeserialiser

Here is our ConcurrentKafkaListenerContainerFactory:

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);

    return factory;
  }

Are we missing any configuration which needs to be done for Spring Kafka 2.2.2 Release?

1
Show your @KafkaListener method. - Gary Russell

1 Answers

3
votes

Assuming you have a listener

@KafkaListener(...)
public void listen(List<...> data) {
    ...
}

Setting factory.setBatchListener(true); should work for you (as long as there is more than one record ready).

You can also use the boot property

spring:
  kafka:
    listener:
      type: batch

to do the same thing; avoiding the need to declare your own factory.

If you turn on DEBUG logging, the container will log how many records are returned by the poll. You can also set fetch.min.bytes and fetch.max.wait.ms to influence how many records are returned if only one is immediately ready...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        fetch.min.bytes: 10000
        fetch.max.wait.ms: 2000
    listener:
      type: batch

BTW, the current 2.2.x release is 2.2.7 (boot 2.1.6).