
I have a custom Kafka consumer that I use to send requests to a REST API. Depending on the API's response, I either commit the offset or skip the message without committing.

Minimal example:

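import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

while (true) {
    // poll(long) is deprecated; newer clients use poll(Duration)
    ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(200));
    for (ConsumerRecord<String, Object> record : records) {

        // Sending a POST request and retrieving the answer
        // ...

        if (responseCode.startsWith("2")) {
            try {
                // Commit only this record; the committed offset is the next one to read.
                // A bare commitSync() would commit every record returned by poll().
                consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)));
            } catch (CommitFailedException ex) {
                ex.printStackTrace();
            }
        }
        // else: do nothing, so this record's offset is not committed
    }
}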

Now, when the response code from the REST API does not start with 2, the offset is not committed, but the message is not re-consumed either. How can I force the consumer to re-consume messages whose offsets were not committed?


3 Answers

1 vote

Make sure your processing is idempotent if you plan to use seek(). Because you commit offsets selectively, the records you skipped may come before records that were committed later (that is, processed successfully). If you seek() back, which moves this consumer's position to the uncommitted offset and replays from there, you will receive those successfully processed records again as well. It can also turn into an infinite loop if a record keeps failing.
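A minimal sketch of what "idempotent" can mean here, assuming each record is identified by its topic, partition and offset: remember which records already got a 2xx response and skip them when a seek() replays them. `RecordId`, `IdempotentProcessor` and the `callApi` predicate are hypothetical names, and the in-memory set is only for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical identity of a Kafka record: topic + partition + offset is unique.
record RecordId(String topic, int partition, long offset) {}

class IdempotentProcessor {
    // In-memory only for illustration: it is lost on restart, so a real consumer
    // would persist it (or use an idempotency key the REST API itself honors).
    private final Set<RecordId> succeeded = new HashSet<>();
    // Hypothetical REST call: returns true when the response code starts with "2".
    private final Predicate<String> callApi;

    IdempotentProcessor(Predicate<String> callApi) {
        this.callApi = callApi;
    }

    /** Returns true if the record was processed successfully, now or earlier. */
    boolean process(RecordId id, String payload) {
        if (succeeded.contains(id)) {
            return true; // already handled: a replay after seek() must not resend it
        }
        boolean ok = callApi.test(payload);
        if (ok) {
            succeeded.add(id);
        }
        return ok;
    }
}
```

In the poll loop you would build the key with `new RecordId(record.topic(), record.partition(), record.offset())`; replayed records that already succeeded then cost a set lookup instead of a second POST.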

Alternatively, you can save the failed records' metadata (for example topic, partition and offset) in memory or in a database, then replay the topic from the beginning (that is, everything still within its retention.ms) so that all records are read again, and add a filter that sends to the API only the records whose metadata matches what you saved. Run this as a batch job every hour or every few hours.
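A minimal sketch of the filter step, assuming failed records are remembered by topic, partition and offset. `FailedRef` and `ReplayFilter` are hypothetical names, and the `HashSet` stands in for the "memory or db" store. During the scheduled replay, each record read from the topic goes through `shouldRetry`, and only matches are sent to the API; a successful retry removes the entry so the next run skips it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical key for a record whose REST call failed.
record FailedRef(String topic, int partition, long offset) {}

// Hypothetical store of failed records; a real setup would back this with a
// database so it survives restarts.
class ReplayFilter {
    private final Set<FailedRef> failed = new HashSet<>();

    /** Called by the main consumer when the REST API returns a non-2xx code. */
    void markFailed(String topic, int partition, long offset) {
        failed.add(new FailedRef(topic, partition, offset));
    }

    /** Called for every record read during the scheduled replay. */
    boolean shouldRetry(String topic, int partition, long offset) {
        return failed.contains(new FailedRef(topic, partition, offset));
    }

    /** Called after a successful retry so later replays skip the record. */
    void markSucceeded(String topic, int partition, long offset) {
        failed.remove(new FailedRef(topic, partition, offset));
    }

    int pendingCount() {
        return failed.size();
    }
}
```

With the Java client, the replay consumer can rewind with `consumer.seekToBeginning(consumer.assignment())` before polling; records already deleted by retention are simply not seen.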

1 vote

Committing offsets is just a way to store the current offset, also known as the position, of the consumer, so that if it stops, it (or a new consumer instance taking over) can find its previous position and resume consuming from there.

So even if you don't commit, the consumer's position moves forward as soon as you receive records. If you want to re-consume some records, you have to change the consumer's current position.

With the Java client, you can set the position using seek().

In your scenario, you probably want to calculate the new position relative to the current position. If so, you can find the current position using position().
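A sketch of the seek() approach with the Java client, assuming kafka-clients is on the classpath and `enable.auto.commit=false`. On a non-2xx response it seeks the partition back to the failed record's own offset and stops processing that partition for this batch, so the next `poll()` returns the failed record again. Successful records are committed one at a time as `offset + 1` (the next record to read). `SeekOnFailure`, `processBatch` and `callApi` are hypothetical names.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class SeekOnFailure {
    /**
     * Polls once and processes records partition by partition. On the first
     * failure in a partition, rewinds that partition to the failed offset so the
     * next poll() delivers it again. Assumes enable.auto.commit=false.
     */
    static <K, V> void processBatch(Consumer<K, V> consumer, Predicate<V> callApi) {
        ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<K, V> record : records.records(partition)) {
                if (callApi.test(record.value())) {
                    // Commit the next offset to read, for this partition only.
                    consumer.commitSync(Collections.singletonMap(
                            partition, new OffsetAndMetadata(record.offset() + 1)));
                } else {
                    // Move the in-memory position back; nothing is committed.
                    consumer.seek(partition, record.offset());
                    break; // skip the rest of this partition until the retry succeeds
                }
            }
        }
    }
}
```

Calling `processBatch` in a `while (true)` loop gives the retry behavior; because a record that always fails is retried forever, a real consumer would add a retry limit or a backoff.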

0 votes

Below are some alternative approaches you can take (instead of seek()):

  1. When the REST call fails, move the message to a separate, ad hoc Kafka topic. You can write another program that reads the messages from this topic on a schedule.
  2. When the REST call fails, write the request to a flat file. Use a shell script (or any other script) to read each request and send it on a schedule.
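Both alternatives amount to handing the failed request to a durable "failure sink" instead of dropping it. A minimal sketch, assuming kafka-clients is on the classpath; the `FailureSink` interface, `RetryTopicSink`, `FlatFileSink` and any retry topic name you pass in are hypothetical. The topic variant republishes key and value with a `Producer`; the file variant appends one request per line, which a scheduled shell script can read back easily.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical abstraction: where a request goes when the REST call fails.
interface FailureSink {
    void store(String key, String request);
}

// Approach 1: publish the failed request to a separate retry topic that another
// program consumes on a schedule.
class RetryTopicSink implements FailureSink {
    private final Producer<String, String> producer;
    private final String retryTopic; // hypothetical name, e.g. "requests-retry"

    RetryTopicSink(Producer<String, String> producer, String retryTopic) {
        this.producer = producer;
        this.retryTopic = retryTopic;
    }

    @Override
    public void store(String key, String request) {
        try {
            // Wait for the acknowledgement so the caller can safely commit the
            // original record's offset once store() returns.
            producer.send(new ProducerRecord<>(retryTopic, key, request)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending to " + retryTopic, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("could not send to " + retryTopic, e.getCause());
        }
    }
}

// Approach 2: append the failed request to a flat file, one request per line,
// for a scheduled script to resend. Assumes a request never contains a newline.
class FlatFileSink implements FailureSink {
    private final Path file;

    FlatFileSink(Path file) {
        this.file = file;
    }

    @Override
    public void store(String key, String request) { // the key is not written to the file
        try {
            Files.writeString(file, request + "\n", StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the consumer loop, a non-2xx response would call `sink.store(...)` and then commit as usual, so the main topic keeps moving while the failures wait elsewhere.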