I have a custom Kafka consumer that I use to send requests to a REST API. Depending on the response from the API, I either commit the offset or skip the message without committing.
Minimal example:
while (true) {
    ConsumerRecords<String, Object> records = consumer.poll(200);
    for (ConsumerRecord<String, Object> record : records) {
        // Sending a POST request and retrieving the answer
        // ...
        if (responseCode.startsWith("2")) {
            try {
                consumer.commitSync();
            } catch (CommitFailedException ex) {
                ex.printStackTrace();
            }
        } else {
            // Do nothing
        }
    }
}
Now, when the response code from the REST API does not start with a 2, the offset is not committed, but the message is not re-consumed either. How can I force the consumer to re-consume messages with uncommitted offsets?
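A likely explanation for this behaviour is that the consumer keeps its read position in memory and advances it on every poll(), independently of what has been committed; committed offsets are only consulted when a consumer starts up or partitions are reassigned. The sketch below is a stdlib-only model of that behaviour, not the Kafka client itself: FakeConsumer and run are hypothetical names, and the "REST call" is simulated by making offset 2 fail on its first attempt. In the real client, the corresponding calls would be consumer.seek(TopicPartition, long) to rewind and consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>) to commit a specific offset.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of a single partition; this is NOT the Kafka client.
class SeekBackSketch {

    // Hypothetical stand-in for a consumer assigned to one partition.
    static class FakeConsumer {
        final long logEnd;   // number of records in the partition
        long position = 0;   // next offset poll() will return (in-memory only)
        long committed = 0;  // committed offset (only matters on restart/rebalance)

        FakeConsumer(long logEnd) { this.logEnd = logEnd; }

        // Like poll(): returns up to max offsets and advances the position,
        // regardless of what has been committed.
        List<Long> poll(int max) {
            List<Long> batch = new ArrayList<>();
            while (batch.size() < max && position < logEnd) {
                batch.add(position++);
            }
            return batch;
        }

        // Models consumer.seek(TopicPartition, long).
        void seek(long offset) { position = offset; }

        // Models committing one specific offset (next offset to read).
        void commit(long nextOffset) { committed = nextOffset; }
    }

    // Returns the offsets handed to the simulated REST call, in order.
    // Offset 2 is assumed to fail on its first attempt only.
    static List<Long> run(boolean seekOnFailure) {
        FakeConsumer consumer = new FakeConsumer(5);
        List<Long> attempts = new ArrayList<>();
        boolean alreadyFailedOnce = false;
        while (true) {
            List<Long> batch = consumer.poll(3);
            if (batch.isEmpty()) break;
            for (long offset : batch) {
                attempts.add(offset);
                boolean ok = offset != 2 || alreadyFailedOnce;
                if (ok) {
                    consumer.commit(offset + 1);
                } else {
                    alreadyFailedOnce = true;
                    if (seekOnFailure) {
                        consumer.seek(offset); // rewind to the failed record
                        break;                 // drop the rest of this batch
                    }
                    // Without seek: position has already moved past this offset.
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [0, 1, 2, 3, 4]
        System.out.println(run(true));  // [0, 1, 2, 2, 3, 4]
    }
}
```

In the model, skipping the commit alone never causes offset 2 to be delivered again, and a later commit silently moves the committed offset past it. Only rewinding the position and abandoning the rest of the batch leads to a retry. Whether this maps cleanly onto a multi-partition consumer would still need to be checked per partition.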