0
votes

The Java Kafka Consumer seek() method requires that we pass in a TopicPartition and an offset. However, I thought this seek method would take a collection of the TopicPartitions my consumer is subscribed to.

Here is the scenario I am trying to handle.

Consumer A is subscribed to partitions 1 and 2 of topic "test-topic". I read messages from each partition when I call poll(). I process some of the messages, but then my application hits an exception, so I do not call commitSync(). Now I want to rewind to the offsets I retrieved on my last poll() and try to reprocess them. How should I do this? Do I need to go through the last committed offsets for each topic partition and call seek() for each partition? Or will calling seek() multiple times honor only the last call? As I said, I want to make sure my consumer goes back on all partitions so I do not lose any data on any assigned partition.
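To make the question concrete, here is the bookkeeping I have in mind, sketched with plain maps standing in for the real TopicPartition/KafkaConsumer types (the names rewindOffsets and the sample offsets are mine): remember the first offset returned per partition in a poll(), and on failure call seek() once per partition with that offset.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RewindSketch {
    // Given the offsets returned by one poll(), grouped per partition,
    // compute the offset to seek back to for each partition
    // (the first offset we received from it).
    static Map<String, Long> rewindOffsets(Map<String, List<Long>> polled) {
        Map<String, Long> rewindTo = new HashMap<>();
        polled.forEach((tp, offsets) -> rewindTo.put(tp, offsets.get(0)));
        return rewindTo;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> polled = new HashMap<>();
        polled.put("test-topic-1", List.of(42L, 43L, 44L));
        polled.put("test-topic-2", List.of(7L, 8L));
        // In real code: for each entry, consumer.seek(tp, offset);
        // each seek() call affects only that one partition.
        rewindOffsets(polled).forEach((tp, offset) ->
            System.out.println("seek(" + tp + ", " + offset + ")"));
    }
}
```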


1 Answer

1
votes

I process some messages but my application gets an exception. I do not call commitSync()

If you do not call commitSync(), the messages will not be committed. If the exception kills your program, then after a restart the consumer resumes from the last committed offset.

You may also want to check auto.offset.reset and set it to earliest.

Check whether your messages are being auto committed. Since you are calling commitSync() you don't need auto commit, i.e. enable.auto.commit can be set to false (it is true by default).
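For example, a consumer configuration along these lines (the bootstrap server and group id are illustrative; the property names are the standard consumer config keys):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "my-group");                // illustrative
        // Disable auto commit since we commit manually with commitSync().
        props.put("enable.auto.commit", "false");
        // With no committed offset, start from the beginning of the partition.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```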

If your program is not terminated by the exception, you still have the consumed records. You can retry processing each record and then commit.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
    tryProcess(record, 3);
}
// Commit only after the whole batch has been attempted.
consumer.commitSync();

void tryProcess(ConsumerRecord<String, String> record, int maxRetries) {
    if (maxRetries < 1) {
        log.warn("max retries exhausted for record");
        return;
    }
    try {
        process(record);
    } catch (Exception ex) {
        // Retry with one fewer attempt remaining.
        tryProcess(record, maxRetries - 1);
    }
}

You can also retry processing a batch of records instead of each record, e.g. tryProcess(records, 3), where records is the ConsumerRecords from poll() and the whole batch is retried 3 times. I don't think there is a need for seeking.
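A sketch of that batch-level variant, written generically over plain lists so it is self-contained (the name tryProcessBatch and the retry shape are mine, not from any Kafka API):

```java
import java.util.List;
import java.util.function.Consumer;

public class BatchRetrySketch {
    // Retry the whole batch up to maxRetries times; return true on success.
    static <T> boolean tryProcessBatch(List<T> batch, Consumer<T> process, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                for (T record : batch) {
                    process.accept(record);
                }
                return true; // whole batch processed; caller can commitSync()
            } catch (Exception ex) {
                // log and retry the entire batch
            }
        }
        return false; // retries exhausted; caller decides whether to commit
    }

    public static void main(String[] args) {
        int[] failures = {2}; // fail the first two attempts, then succeed
        boolean ok = tryProcessBatch(List.of("a", "b"), r -> {
            if (failures[0] > 0 && r.equals("b")) {
                failures[0]--;
                throw new RuntimeException("transient failure");
            }
        }, 3);
        System.out.println(ok); // true
    }
}
```

Note that retrying the whole batch reprocesses records that already succeeded, so the processing should be idempotent (the usual at-least-once trade-off).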


I'm still curious about the usage of the seek() api though

seek() may be used, for example, when we are not subscribing (consumer.subscribe()) but instead assigning partitions directly (consumer.assign()), which we usually do when we just want to peek at (view) the messages in a topic, as the console consumer does. Sometimes we may need to see the messages after a certain offset, or the last n messages, etc., without actually doing anything with them other than displaying them.
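For the "last n messages" case, the real code would call consumer.assign(partitions), fetch consumer.beginningOffsets(partitions) and consumer.endOffsets(partitions), and then consumer.seek(tp, target) per partition. The target computation itself is just end - n clamped to the beginning offset, sketched here with plain maps so it stands alone (the helper name lastNTargets is mine):

```java
import java.util.HashMap;
import java.util.Map;

public class SeekLastN {
    // Per partition, the offset to seek to in order to view the last n
    // messages: end - n, clamped to the partition's beginning offset.
    // In real code the maps come from consumer.beginningOffsets(...) and
    // consumer.endOffsets(...), and each target is passed to consumer.seek().
    static Map<String, Long> lastNTargets(Map<String, Long> begin,
                                          Map<String, Long> end, long n) {
        Map<String, Long> targets = new HashMap<>();
        end.forEach((tp, endOff) ->
            targets.put(tp, Math.max(begin.getOrDefault(tp, 0L), endOff - n)));
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Long> begin = Map.of("test-topic-0", 10L);
        Map<String, Long> end = Map.of("test-topic-0", 100L);
        System.out.println(lastNTargets(begin, end, 5)); // {test-topic-0=95}
    }
}
```

The clamp matters because old segments may have been deleted by retention, so end - n can fall before the earliest available offset.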