
I am using Spring Kafka in my project, as it seemed the natural choice for consuming Kafka messages in a Spring-based project. To consume messages, I can use the MessageListener interface, and Spring Kafka internally takes care of invoking my onMessage method for each new message.

However, in my setting I prefer to explicitly poll for new messages and work on them sequentially (which will take a few seconds). As a workaround, I might just block inside my onMessage implementation, or buffer the messages internally. However, this seems to go against the core idea of Spring Kafka.

Kafka is designed so that consumers have to poll for new messages, which matches my requirements. Is there a way to make use of this "natural" workflow with Spring Kafka?

Should I refrain from using Spring Kafka for this use case?

The KafkaConsumer documentation states:

For use cases where message processing time varies unpredictably, neither of these options may be sufficient. The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll while the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.
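The pattern the Javadoc describes (keep polling, hand the batch to another thread, pause the partitions, commit only finished offsets, then resume) can be sketched with a plain Kafka `Consumer`. This is a minimal illustration under stated assumptions, not Spring Kafka's API: the class `PauseWhileProcessingLoop`, its `pollOnce`/`close` methods and the single-thread executor are hypothetical, auto-commit is assumed to be disabled, and rebalances are ignored (a real implementation would also need a `ConsumerRebalanceListener`).

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper illustrating the KafkaConsumer Javadoc advice.
// Assumes enable.auto.commit=false and ignores rebalances for brevity.
public class PauseWhileProcessingLoop implements AutoCloseable {

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private Future<Map<TopicPartition, OffsetAndMetadata>> inFlight;

    /** One iteration of the poll loop; must be called repeatedly from the consumer's own thread. */
    public void pollOnce(Consumer<String, String> consumer,
                         java.util.function.Consumer<ConsumerRecord<String, String>> handler) throws Exception {
        // Keep calling poll() even while a batch is being processed, so the consumer stays in the group.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()) {
            // Pause all assigned partitions so poll() returns no new records until this batch is done.
            consumer.pause(consumer.assignment());
            inFlight = worker.submit(() -> {
                Map<TopicPartition, OffsetAndMetadata> done = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    handler.accept(record); // slow, sequential work
                    // The committed offset is the position of the next record to read.
                    done.put(new TopicPartition(record.topic(), record.partition()),
                             new OffsetAndMetadata(record.offset() + 1));
                }
                return done;
            });
        }
        if (inFlight != null && inFlight.isDone()) {
            // Commit only what the worker has finished, then let poll() deliver new records again.
            consumer.commitSync(inFlight.get());
            consumer.resume(consumer.paused());
            inFlight = null;
        }
    }

    @Override
    public void close() {
        worker.shutdown();
    }
}
```

Because the committed offsets come from the worker's result, they can never get ahead of what has actually been processed, which is the concern the Javadoc raises.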

Related issue: https://github.com/spring-projects/spring-kafka/issues/195

I might use an AcknowledgingConsumerAwareMessageListener and pause the consumer whenever too many messages pile up. - C-Otto
If I understand correctly, the standard framework here does not process messages sequentially, even with a single partition? Or is that resolved as stated below? - thebluephantom

Answer


The issue of having to keep polling the consumer was resolved in 0.10.1.x by KIP-62, which moved heartbeating to a background thread. Slow processing is therefore no longer a problem, as long as you don't exceed max.poll.interval.ms, which is 5 minutes by default but can be increased.
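The limit applies to the time between two `poll()` calls, i.e. to processing a whole batch, so with a few seconds per message it is usually `max.poll.records` (default 500) that needs lowering rather than only the interval being raised. A small sketch of that arithmetic, assuming a known worst-case time per record and an arbitrary safety factor of 2; `PollBudget` and its methods are hypothetical names, while `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG` and `ConsumerConfig.MAX_POLL_RECORDS_CONFIG` are the real Kafka property keys.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Hypothetical helper: sizes max.poll.records so that one batch of slow records
// fits comfortably inside max.poll.interval.ms.
public class PollBudget {

    // Assumed safety factor: spend at most half of the poll interval on processing.
    private static final int HEADROOM_FACTOR = 2;

    /** Largest batch that fits in maxPollInterval / HEADROOM_FACTOR (never less than 1). */
    static int maxRecordsFor(Duration maxPollInterval, Duration worstCasePerRecord) {
        long budgetMs = maxPollInterval.toMillis() / HEADROOM_FACTOR;
        long perRecordMs = Math.max(1L, worstCasePerRecord.toMillis()); // avoid division by zero
        return (int) Math.max(1L, budgetMs / perRecordMs);
    }

    /** Property overrides for a DefaultKafkaConsumerFactory or a plain KafkaConsumer. */
    static Map<String, Object> slowConsumerOverrides(Duration maxPollInterval, Duration worstCasePerRecord) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) maxPollInterval.toMillis());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecordsFor(maxPollInterval, worstCasePerRecord));
        return props;
    }

    public static void main(String[] args) {
        // Default 5-minute interval and roughly 5 seconds per message (an assumed figure).
        System.out.println(slowConsumerOverrides(Duration.ofMinutes(5), Duration.ofSeconds(5)));
    }
}
```

With the default 5-minute interval and about 5 seconds per message this yields 30 records per poll; the default batch of 500 records would take over 40 minutes and get the consumer evicted from the group.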

However, if you want to poll yourself, you can still use spring-kafka (e.g. to get the Spring Boot auto-configuration goodness if you are using Boot) and simply get a Consumer from the DefaultKafkaConsumerFactory and poll() it directly.
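A minimal sketch of polling directly, assuming a broker at `localhost:9092`, a topic `my-topic` and a group id `sequential-worker` (all placeholders). `DefaultKafkaConsumerFactory` and `createConsumer()` are Spring Kafka API; the `pollOnce` helper is hypothetical and processes each batch sequentially before committing synchronously, which gives at-least-once delivery.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class ManualPollingExample {

    /** Hypothetical helper: polls once, handles the records one by one, then commits. */
    static int pollOnce(Consumer<String, String> consumer,
                        java.util.function.Consumer<ConsumerRecord<String, String>> handler) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            handler.accept(record); // sequential; may take a few seconds per record
        }
        if (!records.isEmpty()) {
            // Commits the positions reached by the last poll, i.e. only after every record was handled.
            consumer.commitSync();
        }
        return records.count();
    }

    public static void main(String[] args) {
        // Placeholder settings; under Spring Boot the auto-configured factory could be injected instead.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sequential-worker");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(
                props, new StringDeserializer(), new StringDeserializer());

        try (Consumer<String, String> consumer = factory.createConsumer()) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                pollOnce(consumer, record -> System.out.println("processing " + record.value()));
            }
        }
    }
}
```

The consumer returned by the factory is not thread-safe, so in this design all polling, processing and committing stay on the one thread that owns it.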