I am using @KafkaListener to consume from a Kafka topic. My application logic requires each record to be processed by multiple consumers in different consumer groups.
My problem is this: I have to send each consumed message to a third-party REST endpoint. If sending to the REST endpoint fails, I should not commit the offset and need to retry sending the message a configurable number of times. If the message still cannot be sent after the configured number of retries, I need to log the cause of the exception and commit the offset.
I am using Spring Kafka 2.2.0. Below is my configuration for manual acknowledgement:
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual-immediate
logging.level.org.springframework.kafka=debug
So far I am able to consume the messages published to a topic from different consumers, and I can also manually acknowledge the offset from each consumer. For example:
@KafkaListener(topics = "testTopic", groupId = "group-1")
public void consumerOne(ConsumerRecord<String, String> record, Acknowledgment ack) {
    logger.info("OF PayLoad:{}", record);
    logger.info("OF value:{}", record.value());
    logger.info("OF Offset:{}", record.offset());
    ack.acknowledge();
}

@KafkaListener(topics = "testTopic", groupId = "group-2")
public void consumerTwo(ConsumerRecord<String, String> record, Acknowledgment ack) {
    logger.info("MF PayLoad_2:{}", record);
    logger.info("MF value_2:{}", record.value());
    logger.info("MF Offset_2:{}", record.offset());
    //ack.acknowledge();
}
Here, consumerTwo does not acknowledge the message. I want the unacknowledged messages to be delivered to the consumer again, retrying a configurable number of times.
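To make the intended behaviour concrete, here is a minimal sketch in plain Java of the retry-then-commit flow I am after. It does not use any Spring Kafka API. The names `RetryThenAck`, `deliver`, `sender` and `ack` are hypothetical and exist only for illustration: `sender` stands in for the REST call, and `ack` stands in for `ack.acknowledge()` in the listener. It also retries immediately with no back-off, which is an assumption on my part and probably not what a real setup should do.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper, not part of Spring Kafka: bounded retry followed by a single ack.
class RetryThenAck {

    /**
     * Calls the sender up to maxAttempts times.
     * The ack callback runs exactly once: right after the first successful send,
     * or after the last failed attempt (once the failure cause has been logged).
     * Returns true if the payload was sent, false if every attempt failed.
     */
    static boolean deliver(String payload, Consumer<String> sender, int maxAttempts, Runnable ack) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.accept(payload);
            } catch (RuntimeException e) {
                // Send failed: remember the cause and try again without acknowledging.
                lastFailure = e;
                continue;
            }
            // Send succeeded: commit the offset and stop.
            ack.run();
            return true;
        }
        // Retries exhausted: log the cause, then commit so the record is skipped.
        System.err.println("Giving up after " + maxAttempts + " attempts, cause: " + lastFailure);
        ack.run();
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for the REST call: fails on the first two attempts, then succeeds.
        Consumer<String> flakySender = payload -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("endpoint unavailable");
            }
        };
        boolean sent = deliver("hello", flakySender, 5, () -> System.out.println("offset committed"));
        System.out.println("sent=" + sent + ", attempts=" + calls.get());
    }
}
```

In the listener this would be called as something like `deliver(record.value(), restCall, maxRetries, ack::acknowledge)`, where `restCall` and `maxRetries` are again placeholder names. I am unsure whether retrying inside the listener method like this is acceptable, since it blocks the consumer thread and long retries could run past `max.poll.interval.ms`, or whether the container should handle the retries instead.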
Any example or suggestion would be a great help.