
I have a Spring Boot project that runs several Kafka Consumers (@KafkaListener) on Confluent Kafka topics with 8 partitions. The concurrency of each consumer is set to 1. The topics are loaded with about a million messages (one per line) from a file, and the Consumers consume them in batches to validate, process, and update a database.

The Consumer Factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.

Update 06/04: Here is the Consumer Factory setup. It uses Spring-Kafka 1.3.1.RELEASE. The Confluent Kafka broker is version

@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    // Up to 10,000 records are handed to the batch listener per poll()
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    // The broker answers a fetch once ~100 KB is available or after at most 1 s
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);

    // The deserializer instances passed here are the ones the consumer actually uses
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(ListingMessage.class));
}

@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
    ConsumerFactory<String, ListingMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);  // use the injected factory bean
    factory.setConcurrency(1);                    // one consumer thread for all 8 partitions
    factory.setAutoStartup(false);                // container is started/stopped manually
    factory.setBatchListener(true);               // listener receives a List of records per poll
    return factory;
}

Note: The Container Factory has auto-startup set to false so that the consumer can be started and stopped manually when loading a large file.

After running for about an hour (the time varies), the Consumers stop consuming messages from their topics even though the topics still have many messages available. A log statement in the consume method stops appearing in the logs.

I track the consumer group's status with the "./kafka-consumer-groups" command, and after some time it shows that there are no consumers in this group.

$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group  group_name

There are no errors in the log indicating that this consumer failed. The body of the consumer method is wrapped in a try-catch block, so any exception thrown while processing the messages would be caught.

How can we design the Spring-Kafka consumer so that it is restarted if it stops consuming? Is there a listener that can log the exact point at which a consumer stops? Is this caused by setting the concurrency to 1? I set the concurrency to 1 because other Consumers slowed down when this Consumer had higher concurrency.
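One way to approach the "restart it if it stops" question is a watchdog that records when the listener last received a batch and treats the consumer as stalled if nothing arrives within a threshold. The sketch below is plain Java and entirely hypothetical: `ConsumerWatchdog`, `recordBatch`, `check`, and the `restartAction` callback are illustrative names, not Spring-Kafka API. In a real application the callback might call `stop()` and then `start()` on the listener container (for example, one looked up from `KafkaListenerEndpointRegistry`). Note that an idle topic also produces no batches, so a stall should only count when messages are known to be waiting.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical watchdog: flags a consumer as stalled when no batch has been
 * handled within stallThresholdMs, then runs a caller-supplied restart action.
 */
class ConsumerWatchdog {
    private final long stallThresholdMs;
    private final LongSupplier clock;      // current time in ms; injectable for testing
    private final Runnable restartAction;  // e.g. stop() then start() the listener container
    private final AtomicLong lastBatchAt;  // written by the listener thread, read by the checker

    ConsumerWatchdog(long stallThresholdMs, LongSupplier clock, Runnable restartAction) {
        this.stallThresholdMs = stallThresholdMs;
        this.clock = clock;
        this.restartAction = restartAction;
        this.lastBatchAt = new AtomicLong(clock.getAsLong());
    }

    /** Call at the start of every batch the listener receives. */
    void recordBatch() {
        lastBatchAt.set(clock.getAsLong());
    }

    /** Call periodically (e.g. from a ScheduledExecutorService); returns true if a restart was triggered. */
    boolean check() {
        long idleMs = clock.getAsLong() - lastBatchAt.get();
        if (idleMs > stallThresholdMs) {
            System.out.println("Consumer stalled for " + idleMs + " ms; restarting");
            restartAction.run();
            lastBatchAt.set(clock.getAsLong()); // give the restarted consumer a fresh window
            return true;
        }
        return false;
    }
}
```

Injecting the clock keeps the logic deterministic to test; in production `System::currentTimeMillis` would be passed instead.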

What version of spring-kafka? You also need to show your consumer configuration. You should take a thread dump to see what the listener threads are doing; often such hangs are caused by the thread being "stuck" in user code. - Gary Russell
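Besides running `jstack <pid>` against the process, a thread dump can be taken from inside the application. This sketch uses only the JDK's `Thread.getAllStackTraces()`; the class name `ListenerThreadDump` is hypothetical, and filtering on `-C-` assumes listener threads are named like the `foo-0-C-1` thread in the log in the answer below. A thread that is stuck in user code (a blocked database call, for instance) shows up with the same frames in repeated dumps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ListenerThreadDump {
    /**
     * Returns a printable stack trace for every live thread whose name contains
     * nameFragment (e.g. "-C-" to match listener threads named like "foo-0-C-1").
     */
    static List<String> dump(String nameFragment) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (!t.getName().contains(nameFragment)) {
                continue; // skip threads that are not listener threads
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            out.add(sb.toString());
        }
        return out;
    }
}
```

Logging `ListenerThreadDump.dump("-C-")` every few minutes while the consumer is stalled shows whether the listener thread is blocked, waiting, or no longer alive.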
@GaryRussell Thanks. I have added the consumer config and Kafka version. - Shankar P S
How long does it take you to consume 10,000 messages? MAX_POLL_INTERVAL_MS (max.poll.interval.ms) defaults to 5 minutes; if you take longer than that between polls, the broker thinks you are dead and does a rebalance. - Gary Russell
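To put numbers on this comment: with max.poll.records=10000 and the default max.poll.interval.ms of 300000 (5 minutes), a full batch must be validated, processed, and written to the database at an average of under 30 ms per record, or the consumer is dropped from the group. The hypothetical helper below just does that arithmetic. The usual remedies are raising `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG` or lowering `max.poll.records`; the safety factor in the second method is an illustrative assumption, not a Kafka setting.

```java
class PollBudget {
    /** Average time (ms) allowed per record before max.poll.interval.ms is exceeded. */
    static double maxMillisPerRecord(long maxPollIntervalMs, int maxPollRecords) {
        return (double) maxPollIntervalMs / maxPollRecords;
    }

    /**
     * Largest max.poll.records that fits in the poll interval, given a measured
     * per-record cost and a safety factor (e.g. 2.0 to allow for slow batches).
     */
    static int safeMaxPollRecords(long maxPollIntervalMs, double measuredMillisPerRecord, double safetyFactor) {
        return (int) Math.floor(maxPollIntervalMs / (measuredMillisPerRecord * safetyFactor));
    }
}
```

For example, `maxMillisPerRecord(300_000, 10_000)` is 30.0, and if each record actually takes 50 ms, `safeMaxPollRecords(300_000, 50.0, 2.0)` suggests a batch size of 3000.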
@ShankarPS Were you able to resolve the issue? If so, how? - Nobita

1 Answer


I just ran a test with max.poll.interval.ms=30000 (30 seconds): I suspended the listener, resumed it after more than 30 seconds, and saw this in the log:

2018-06-04 18:35:59.361  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
foo

2018-06-04 18:37:07.347 ERROR 4191 --- [      foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

2018-06-04 18:37:07.350  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
2018-06-04 18:37:10.400  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
2018-06-04 18:37:10.401  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
2018-06-04 18:37:10.445  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
foo

You can see that after the rebalance the consumer is re-added to the group and the same message is redelivered, which is what I would expect.
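The redelivery follows from how committed offsets work: the commit for the slow batch is rejected with CommitFailedException, so the group's committed offset still points at the start of that batch, and whichever member is assigned the partition after the rebalance resumes from there. The toy model below is plain Java, not the Kafka client; `SimulatedPartition` and its methods are hypothetical and only illustrate that sequence.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition and its group's committed offset; not the Kafka client API. */
class SimulatedPartition {
    private final List<String> log = new ArrayList<>();
    private long committedOffset = 0; // next offset the group reads after a (re)assignment

    void append(String record) {
        log.add(record);
    }

    /** Returns up to maxRecords starting at the committed offset, as a newly assigned consumer would see them. */
    List<String> pollFromCommitted(int maxRecords) {
        int from = (int) committedOffset;
        int to = Math.min(log.size(), from + maxRecords);
        return new ArrayList<>(log.subList(from, to));
    }

    /** Commits a processed batch; rejected (like CommitFailedException) if processing exceeded the poll interval. */
    boolean commit(int batchSize, long processingMs, long maxPollIntervalMs) {
        if (processingMs > maxPollIntervalMs) {
            return false; // group already rebalanced; the committed offset does not move
        }
        committedOffset += batchSize;
        return true;
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

Polling, failing the commit, and polling again returns the same record, mirroring the second "foo" in the log; only a successful commit advances the offset.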

I get the same results even with 1.3.1.