
Using Kafka broker 1.0.1 and spring-kafka 2.1.6.RELEASE.

I'm using a batched consumer with the following settings:

// Other settings not shown...
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

I use spring listener in the following way:

@KafkaListener(topics = "${topics}", groupId = "${consumer.group.id}")
public void receive(final List<String> data,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) Set<String> topics,
        @Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
    // ...code...
}

I always find that a few messages remain in the batch and are never received by my listener. It appears that if the number of remaining messages is less than the batch size, they aren't consumed (perhaps they sit in memory and are never published to my listener). Is there a setting to auto-flush the batch after a time interval so that no messages are left behind? What's the best way to deal with this situation when using a batch consumer?

"I always find a few messages remain in the batch" - It's not clear what you mean. Turn on DEBUG logging to see the consumer activity. If it's still not doing what you expect, post the log and explain exactly what you mean. - Gary Russell
Ok - for example, if my total data size is 230 and my batch size is 100, I only see 200 messages consumed (i.e., arriving in my listener). The remaining 30 are not consumed. My question is: is it waiting for the batch to fill up (70 more messages) before flushing? Or is there a way to auto-flush the batch after n seconds? How is an application supposed to use the batch consumer in such cases? - user1189332
On the consumer side, if the messages are already in the topic, you should get the remaining 30 as long as you haven't increased fetch.max.wait.ms and fetch.min.bytes. On the producer side, the messages should go out straight away as long as you haven't increased linger.ms. - Gary Russell
I haven't added (or overridden) these properties in my application.properties file, and none of the properties you mention is set in the DefaultKafkaConsumerFactory, so they just use the defaults. But it still doesn't work. Also, the debug logs show, after a while, that it is polling and fetching 0 records (and this repeats over and over). I couldn't find anything obvious in the logging related to batching of the messages. - user1189332
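
For reference, a sketch of the stock defaults for the properties mentioned in the comments (the spring.kafka.*.properties passthrough keys are one way to set them in Spring Boot; the values shown are Kafka's defaults, included only to make the discussion concrete):

# Sketch only: Kafka's stock defaults for the properties discussed above;
# with these values a fetch returns as soon as any data is available.
spring.kafka.consumer.properties.fetch.min.bytes=1
spring.kafka.consumer.properties.fetch.max.wait.ms=500
spring.kafka.producer.properties.linger.ms=0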

1 Answer


I just ran a test without any problems...

import java.util.List;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So50370851Application {

    public static void main(String[] args) {
        SpringApplication.run(So50370851Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            // Send 230 records so the last poll returns a partial batch
            for (int i = 0; i < 230; i++) {
                template.send("so50370851", "foo" + i);
            }
        };
    }

    @KafkaListener(id = "foo", topics = "so50370851")
    public void listen(List<String> in) {
        System.out.println(in.size()); // prints the size of each received batch
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50370851", 1, (short) 1); // 1 partition, replication factor 1
    }

}

and

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.type=batch

and the output is

100
100
30

"Also, the debug logs show, after a while, that it is polling and fetching 0 records (and this repeats over and over)."

That implies the problem is on the sending side.
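
One way to rule that out (a sketch, not part of the original answer; it reuses the template and topic from the test above) is to block on each send future, so a send that never completes surfaces as an exception instead of lingering silently in the producer's buffer:

import java.util.concurrent.TimeUnit;

// Sketch: KafkaTemplate.send() returns a future; waiting on it makes a
// failed or stuck send visible immediately.
for (int i = 0; i < 230; i++) {
    template.send("so50370851", "foo" + i)
            .get(10, TimeUnit.SECONDS); // throws if the record isn't acked in time
}
template.flush(); // push out anything still batched in the producer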