I have a Spring Boot project that runs several Kafka consumers (@KafkaListener) against Confluent Kafka topics with 8 partitions. The concurrency of each consumer is set to 1. The topics are loaded with about a million messages from a file, and the consumers consume them in batches to validate, process, and update a database.
The Consumer Factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.
Update 06/04: Here is the Consumer Factory setup. It uses Spring-Kafka-1.3.1.RELEASE. The Confluent Kafka broker is version
@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(ListingMessage.class));
}
@Bean(KAFKA_LISTENER_CONTAINER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
        ConsumerFactory<String, ListingMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(1);
    factory.setAutoStartup(false);
    factory.setBatchListener(true);
    return factory;
}
Note: the container factory has auto-startup set to false so that the consumer can be started and stopped manually when loading a large file.
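For context, this is roughly how the manual start/stop is done. It is a minimal sketch using Spring's KafkaListenerEndpointRegistry; the listener id "listingListener" is a hypothetical name standing in for whatever id the @KafkaListener declares:

```java
// Sketch: manually starting/stopping a listener container by its @KafkaListener id.
// Assumes the listener is declared with id = "listingListener" (hypothetical name).
@Service
public class ListingConsumerControl {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Called before loading a large file into the topic.
    public void startConsumer() {
        MessageListenerContainer container = registry.getListenerContainer("listingListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }

    // Called after the load is complete.
    public void stopConsumer() {
        MessageListenerContainer container = registry.getListenerContainer("listingListener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}
```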
After running for about 1 hour (the time varies), the consumers stop consuming messages from their topics even when the topics have many messages available. A log statement in the consume method stops appearing in the logs.
I track the consumer group's status with the "./kafka-consumer-groups" command and see that after some time there are no consumers in this group:
$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name
There are no errors in the logs indicating why this consumer stopped. The consumer method is wrapped in a try-catch block, so any exception thrown while processing messages would have been caught.
How can we design the Spring-Kafka consumer so that it restarts itself if it stops consuming? Is there a listener that can log the exact point at which a consumer stops? Is this caused by setting the concurrency to 1? The reason I set the concurrency to 1 is that other consumers were slowed down when this consumer had higher concurrency.
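One way to at least detect the stall, sketched below under the assumption that the container factory is configured to publish idle events (e.g. factory.getContainerProperties().setIdleEventInterval(60000L), where the 60-second interval is just an example value). Spring Kafka then emits a ListenerContainerIdleEvent whenever no records arrive within that interval, which an ApplicationListener can log or act on:

```java
// Sketch: detect a stalled consumer by listening for container idle events.
// Requires setIdleEventInterval(...) on the container properties so that the
// container publishes ListenerContainerIdleEvent when no records are received.
@Component
public class ConsumerIdleMonitor implements ApplicationListener<ListenerContainerIdleEvent> {

    private static final Logger log = LoggerFactory.getLogger(ConsumerIdleMonitor.class);

    @Override
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        // Logs how long the container has been idle and which partitions it holds;
        // this pinpoints when consumption stopped even if no exception was thrown.
        log.warn("Consumer idle for {} ms; assigned partitions: {}",
                event.getIdleTime(), event.getTopicPartitions());
        // A restart policy could go here: check lag on the assigned partitions
        // and stop()/start() the container if it appears stuck.
    }
}
```

This does not fix the root cause, but it timestamps the exact point the consumer went quiet and gives a hook from which a restart could be triggered.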