I need to pause the Kafka consumer from consuming messages from the topic until the message reaches it's waiting time. For this one, I used pause/resume methods in Kafka. But when I resume, the first message that consumed before pausing, will not be received again. But still, the offset of the topic has not been updated since I do manual acknowledgment (lag is one).
@StreamListener(ChannelName.MESSAGE_INPUT_RETRY_CHANNEL)
public void onMessageRetryReceive(org.springframework.messaging.Message<Message> message, @Header(KafkaHeaders.CONSUMER)KafkaConsumer<?,?> consumer){
long waitTime = //Calculate the wait time of the message
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(waitTime > 0){
consumer.pause(Collections.singleton(new TopicPartition("message-retry-topic",0)));
}else{
messageProducer.sendMessage(message.getPayload());
acknowledgment.acknowledge();
}
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
boolean isReady = //Logi to check if ready to resume
if(isReady){
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
This relates to the question mentioned in KafkaConsumer resume partition cannot continue to receive uncommitted messages. But I'm not sure how the seeks method can be helpful to retrieve the 1st consumed message. I'm using the spring cloud stream. I need some suggestion on this