I would like to know whether spring-kafka has an option to collect all new messages into a list.

For example, if I am listening for Message objects, I want to get a List<Message> containing everything received since the last poll. Something like:

@KafkaListener(poll-interval=1000, topics = "${kafka.topic}", containerFactory = "objectListListenerContainerFactory")
public void messageListener(List<Message> messages) {
    log.info("Count of new messages since last poll : {}", messages.size());
}

I already went through "Spring Kafka: Poll for new messages instead of being notified using `onMessage`", but it was not very useful for me.

1 Answer

With plain Spring Kafka you can use a ConsumerFactory to create a KafkaConsumer, and then you are on your own to poll records from it whenever it is convenient for you.
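
To illustrate the manual approach, here is a minimal sketch rather than a definitive implementation. The class name OnDemandPoller and the method pollNewMessages are hypothetical. It assumes the ConsumerFactory is already configured with deserializers and a group.id, and that kafka-clients 2.0 or later is on the classpath (for poll(Duration)).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.core.ConsumerFactory;

/** Hypothetical helper: polls on demand and returns the new record values as a list. */
public class OnDemandPoller<K, V> implements AutoCloseable {

    private final Consumer<K, V> consumer;

    public OnDemandPoller(ConsumerFactory<K, V> consumerFactory, String topic) {
        // Assumption: the factory configuration already contains a group.id.
        this.consumer = consumerFactory.createConsumer();
        this.consumer.subscribe(Collections.singletonList(topic));
    }

    /**
     * Returns the values fetched by a single poll. Not thread-safe:
     * a KafkaConsumer must only be used from one thread at a time.
     */
    public List<V> pollNewMessages(Duration timeout) {
        ConsumerRecords<K, V> records = this.consumer.poll(timeout);
        List<V> values = new ArrayList<>(records.count());
        for (ConsumerRecord<K, V> record : records) {
            values.add(record.value());
        }
        return values;
    }

    @Override
    public void close() {
        this.consumer.close();
    }
}
```

A caller would invoke pollNewMessages(Duration.ofSeconds(1)) on its own schedule. Keep in mind that if the gap between two polls exceeds max.poll.interval.ms, the broker treats the consumer as failed and rebalances the group. How offsets are committed depends on the factory's enable.auto.commit setting.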

Also note that a KafkaMessageListenerContainer can be paused at any moment to stop it from fetching records, while it still remains a member of the consumer group.
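
As a rough sketch of the pause option, assuming a spring-kafka version where containers support pause() and resume() (2.1.3 or later, as far as I know) and a listener declared with an explicit id on @KafkaListener. The class name ListenerPauser is hypothetical.

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

/** Hypothetical helper that pauses and resumes a @KafkaListener container by its id. */
@Component
public class ListenerPauser {

    private final KafkaListenerEndpointRegistry registry;

    public ListenerPauser(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void pause(String listenerId) {
        // Returns null when no container is registered under this id.
        MessageListenerContainer container = this.registry.getListenerContainer(listenerId);
        if (container != null) {
            // The pause is requested here and applied by the container's consumer thread.
            container.pause();
        }
    }

    public void resume(String listenerId) {
        MessageListenerContainer container = this.registry.getListenerContainer(listenerId);
        if (container != null) {
            container.resume();
        }
    }
}
```

While paused, the container keeps polling with its partitions paused, so no records are delivered but the consumer is not removed from the group.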

In the Spring Integration Kafka extension there is a new KafkaMessageSource for this kind of task:

/**
 * Polled message source for kafka. Only one thread can poll for data (or
 * acknowledge a message) at a time.
 * <p>
 * NOTE: If the application acknowledges messages out of order, the acks
 * will be deferred until all messages prior to the offset are ack'd.
 * If multiple records are retrieved and an earlier offset is requeued, records
 * from the subsequent offsets will be redelivered - even if they were
 * processed successfully. Applications should therefore implement
 * idempotency.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @author Mark Norkin
 * @author Artem Bilan
 *
 * @since 3.0.1
 *
 */
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
implements DisposableBean, Lifecycle {
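
One possible way to wire this source is as a polled inbound channel adapter. This is a sketch under assumptions: it uses the ConsumerProperties-based constructor found in later Spring Integration Kafka versions (3.0.1 took the topic names directly), and the topic name, group id, channel name and class name are placeholders.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class PolledKafkaConfig {

    // Placeholder channel that receives whatever the source fetches.
    @Bean
    public MessageChannel fromKafka() {
        return new DirectChannel();
    }

    // The poller calls receive() on the source after every fixed delay (in milliseconds).
    @Bean
    @InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "1000"))
    public KafkaMessageSource<String, String> kafkaSource(ConsumerFactory<String, String> cf) {
        ConsumerProperties props = new ConsumerProperties("myTopic"); // placeholder topic
        props.setGroupId("myGroupId");                                // placeholder group id
        return new KafkaMessageSource<>(cf, props);
    }

    // Handles each message produced by the polled source.
    @ServiceActivator(inputChannel = "fromKafka")
    public void handle(Message<?> message) {
        System.out.println("Received: " + message.getPayload());
    }
}
```

The fixedDelay on the poller plays the role of the poll interval the question asks about. Each receive() call yields at most one message, so this gives control over when polling happens rather than a List per poll.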

I think we should consider documenting this in the Spring for Apache Kafka reference manual: https://docs.spring.io/spring-kafka/docs/current/reference/html/_spring_integration.html#si-kafka. So feel free to raise an issue in the spring-kafka project to address that gap.