0
votes

I'm working on a module where it consumes messages from a Kafka topic and publish to a downstream system. In the event of downstream system is unavailable consumer do not acknowledge the Kakfa message. Because of this when my consumer receives messages when downstream system is unavailable offset of the kakfa will not be committed. But if I receive new message after downstream system comes up and when I acknowledge that message, latest offset will be committed and consumer never receive those messages which were in the topic without the offset commit.

i.e Let's say my consumer is consumed up to offset 4. Consumer receive two messages when downstream is unavailable and because of that my consumer didn't commit the offset. So number of messages in the toipc is now 6, but offset is still 4. Now downstream system is available and consumer receive a new message (7th message). Since there is no issue from downstream, consumer acknowledge the 7th message and offset of the topic will be set to 7.

Is there any method that my consumer can receive the 5th and 6th messages before it receives the 7th message? I use spring cloud stream in the implementation.

1

1 Answers

0
votes

See this answer.

You need a SeekToCurrentErrorHandler and throw an exception so that the offsets are reset.