0
votes

I have a simple Spring boot service that is called on-demand and consumes specified number of messages from the topic. Number of messages to consume is passed as a parameter. Service is being called every 30 minutes. Each message size is ~1.6 kb. I always get around 1100 or 1200 messages every-time. Have one topic with one partition only and REST service is the only consumer. Here is how the service is called http://example.com/messages?limit=2000

private OutputResponse getNewMessages(String limit) throws Exception {
    
        System.out.println("***** START *****");
        
        final long start = System.nanoTime();
        
        int loopCntr = 0;   
        int counter = 0;
        OutputResponse outputResponse = new OutputResponse();       
        Output output = new Output();
        List<Output> rspListObject = new ArrayList<>();
        Consumer<Object, Object> consumer = null;
        String result = null;

        try {
            Properties p = new Properties();
            p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000");
            p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);
            
            consumer = consumerFactory.createConsumer("my-group-id", null, null, p);            
            consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));

            while (loopCntr < 2) {
                loopCntr++;
                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(15));
                
                for (ConsumerRecord<Object, Object> record : consumerRecords)
                {
                    counter++; 
                    try
                    {
                        //get json string
                        result = mapper.writeValueAsString(record.value());
                        //to json
                        output = mapper.readValue(result, Output.class);                   
                        rspListObject.add(output);                       
                    } catch (Exception e) {
                        logger.error(e);
                        insertToDB(record.value(),record.offset());
                    }
                }
            }

            outputResponse.setObjects(rspListObject);
            
            final long end = System.nanoTime();
            System.out.println("Took: " + ((end - start) / 1000000) + "ms");
            System.out.println("Took: " + (end - start) / 1000000000 + " seconds");

            // commit the offset of records to broker
            if (counter > 0) {
                consumer.commitSync();
            }
        } finally {
            try {
                System.out.println(" >>>>> closing the  consumer");
                if (consumer != null)
                    consumer.close();
            }catch(Exception e){
                //log message
            }
        }

        return outputResponse;
    }

this is what I have in application.yml

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: '*'
        max.poll.interval.ms: 300000
      group-id: my-group-id

ConsumerConfig values:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my-group-id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 180000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100

This is an error I am getting at commitSync. Tried with consuming 5 messages when doing poll(), tried with setting p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000"); but same error

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

1
Are you closing the consumer at the end? If not, you will get multiple consumers, which will cause the rebalance. - Gary Russell
Yes consumer is getting closed in the finally block. finally { try { System.out.println(" >>>>> closing the consumer"); if (consumer != null) consumer.close(); }catch(Exception e){ //log message - Developer
How much time between the poll and commit? - Gary Russell
I see 15-17 seconds from poll to commit. - Developer
Then what you are saying makes no sense; especially because you are manually assigning the partitions and not using group management and only have one instance. If you can provide a minimal, reproducible, complete example I will take a look to see what you are doing wrong. - Gary Russell

1 Answers

0
votes

I believe that this application simulates your use case but it doesn't exhibit the behavior you describe (as I expected). You should never see a rebalance when manually assigning the topic/partition.

I suggest you run both and compare DEBUG logs to figure out what's wrong.

@SpringBootApplication
public class So63713473Application {

    public static void main(String[] args) {
        SpringApplication.run(So63713473Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63713473").partitions(1).replicas(1).build();
     }

    @Bean
    public ApplicationRunner runner(ConsumerFactory<String, String> factory, KafkaTemplate<String, String> template) {
        String msg = new String(new byte[1600]);
        return args -> {
            while (true) {
                System.out.println("Hit enter to run a consumer");
                System.in.read();
                int count = 0;
                try (Consumer<String, String> consumer = factory.createConsumer("so63713473", "")) {
                    IntStream.range(0, 1200).forEach(i -> template.send("so63713473", msg));
                    consumer.assign(Collections.singletonList(new TopicPartition("so63713473", 0)));
                    while (count < 1200) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                        count += records.count();
                        System.out.println("Count=" + count);
                    }
                    consumer.commitSync();
                    System.out.println("Success");
                }
            }
        };
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.fetch-min-size=1920000
spring.kafka.consumer.fetch-max-wait=1000

spring.kafka.producer.properties.linger.ms=50

EDIT

I can reproduce your issue by adding a second (auto-assigned) consumer in the same group.

@KafkaListener(id = "so63713473", topics = "so63713473")
public void listen(String in) {
    System.out.println(in);
}
2020-09-08 16:40:15.828 ERROR 88813 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:776) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.demo.So63713473Application.main(So63713473Application.java:25) [classes/:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

You can't mix manual and auto assignment in the same group.