0
votes

I need to pause the Kafka consumer from consuming messages from the topic until the message reaches it's waiting time. For this one, I used pause/resume methods in Kafka. But when I resume, the first message that consumed before pausing, will not be received again. But still, the offset of the topic has not been updated since I do manual acknowledgment (lag is one).

@StreamListener(ChannelName.MESSAGE_INPUT_RETRY_CHANNEL)
public void onMessageRetryReceive(org.springframework.messaging.Message<Message> message, @Header(KafkaHeaders.CONSUMER)KafkaConsumer<?,?> consumer){

    long waitTime = //Calculate the wait time of the message
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    if(waitTime > 0){
        consumer.pause(Collections.singleton(new TopicPartition("message-retry-topic",0)));
    }else{
        messageProducer.sendMessage(message.getPayload());
        acknowledgment.acknowledge();
    }
}


@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
    return event -> {
        boolean isReady = //Logi to check if ready to resume
        if(isReady){
            event.getConsumer().resume(event.getConsumer().paused());
        }
    };
}

This relates to the question mentioned in KafkaConsumer resume partition cannot continue to receive uncommitted messages. But I'm not sure how the seeks method can be helpful to retrieve the 1st consumed message. I'm using the spring cloud stream. I need some suggestion on this

1

1 Answers

1
votes

The fact that you don't call acknowledgment.acknowledge();, doesn't mean that your KafkaConsumer instance doesn't keep the last consumed position in the memory.

We definitely need to commit offsets for subsequent consumers on the partition. Currently ran consumer doesn't need such an information to be committed, because it has it in its own in-memory state.

To be able to reconsume the same record you need to perform seek() operation. See Docs for more info: https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek