3 votes

I have a single Kafka consumer connected to a topic with 3 partitions. As soon as I get a record from Kafka, I would like to capture its offset and partition. On restart I would like to restore the consumer's position from the last read offset.

From kafka documentation:

Each record comes with its own offset, so to manage your own offset you just need to do the following:

Configure enable.auto.commit=false

Use the offset provided with each ConsumerRecord to save your position.

On restart restore the position of the consumer using seek(TopicPartition, long).
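The three steps above can be sketched as a minimal standalone consumer. This is only an illustration under assumptions: the topic, group id, broker address, and the `loadSavedOffsets()` helper are all placeholders, not from the docs. It uses `assign()` rather than `subscribe()` so the partitions are known immediately and `seek()` can run before the first `poll()`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RestorePosition {

    // Hypothetical helper: load partition -> next-offset-to-read from your db/disk.
    static Map<TopicPartition, Long> loadSavedOffsets(String topic) {
        Map<TopicPartition, Long> saved = new HashMap<>();
        for (int p = 0; p < 3; p++) {                    // the topic has 3 partitions
            saved.put(new TopicPartition(topic, p), 0L); // illustrative values
        }
        return saved;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "my-group");                 // placeholder
        props.put("enable.auto.commit", "false");          // step 1
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> saved = loadSavedOffsets("my-topic");
            // assign() (rather than subscribe()) hands over the partitions immediately,
            // so seek() can be called before the first poll()
            consumer.assign(saved.keySet());
            saved.forEach(consumer::seek);                 // step 3
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // process, then persist record.offset() + 1 per step 2
                }
            }
        }
    }
}
```

With `assign()` there is no consumer group rebalancing, which also sidesteps the initFlag problem below.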

Here is my sample code:

constructor {
    // load saved data into offsetMap<partition, offset>
    initFlag = true;
}

Main method
{
    while (!shutdown) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (initFlag) // seek only once, after the first poll has assigned partitions
        {
            seekToPositions(offsetMap);
            initFlag = false;
            continue; // discard the records fetched before the seek
        }
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            getOffsetPositions(); // dump offsets and partitions to db/disk
        }
    }
}

// get current offsets and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception {

    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
    // code to put each partition and its offset into the map
    // write to disk or db
    return offsetMap;
}

// overrides the fetch offsets the consumer would otherwise use
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
    // for each entry, get the partition and offset from offsetMap
    consumer.seek(partition, offset);
}

Is this the right way to do it? Is there a better way?


3 Answers

1 vote

If you commit your offsets, Kafka will store them for you (for up to 24 hours by default).

That way, if your consumer dies, you can start the same code on another machine and continue right where you left off. No external storage needed.

See "Offsets and Consumer Position" in https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

I also recommend you consider using commitSync.
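A sketch of that approach: after processing a batch, commit the new positions back to Kafka with commitSync. The method below is illustrative (the `Consumer` is passed in; how it was created and subscribed is assumed). With the same group.id, a restarted consumer resumes from these offsets:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitAfterBatch {
    // Poll one batch, process it, then commit the resulting positions to Kafka.
    static void processAndCommit(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // ... process the record ...
            // commit the *next* offset to read, hence the +1
            toCommit.put(new TopicPartition(record.topic(), record.partition()),
                         new OffsetAndMetadata(record.offset() + 1));
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit); // blocks until the broker acknowledges
        }
    }
}
```

commitSync retries on transient failures and throws if the commit ultimately fails, which is why it is the safer default here.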

0 votes

That looks OK to me; just be careful about how your consumer is built (manual partition assignment or automatic).

If the partition assignment is done automatically special care is needed to handle the case where partition assignments change. This can be done by providing a ConsumerRebalanceListener instance in the call to subscribe(Collection, ConsumerRebalanceListener) and subscribe(Pattern, ConsumerRebalanceListener). For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by implementing ConsumerRebalanceListener.onPartitionsRevoked(Collection). When partitions are assigned to a consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer to that position by implementing ConsumerRebalanceListener.onPartitionsAssigned(Collection).

https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
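The pattern that javadoc passage describes could be sketched as the listener below. The storage helpers `saveOffset()`/`readSavedOffset()` are hypothetical stand-ins for your db/disk code, not Kafka API:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Save positions when partitions are taken away; restore them when assigned.
class SaveOnRebalance implements ConsumerRebalanceListener {
    private final Consumer<String, String> consumer;

    SaveOnRebalance(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // about to lose these partitions: persist the current positions
        for (TopicPartition tp : partitions) {
            saveOffset(tp, consumer.position(tp));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // just received these partitions: resume from the stored positions
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, readSavedOffset(tp));
        }
    }

    // Hypothetical storage helpers; replace with your db/disk code.
    void saveOffset(TopicPartition tp, long offset) { /* write to db/disk */ }
    long readSavedOffset(TopicPartition tp) { return 0L; /* read from db/disk */ }
}

// wired in with:
// consumer.subscribe(Collections.singletonList("my-topic"), new SaveOnRebalance(consumer));
```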

0 votes

This can be solved by taking control over when we commit offsets.

The first thing to do is set 'enable.auto.commit' to 'false' in the consumer application, so you control when offsets are committed.

We use a Map to track offsets manually, as shown below:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    consumer.subscribe(Collections.singletonList(topic), new CommitCurrentOffset());

    try {
        while (!shutdown) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // process the record (e.g. save in DB / call an external service)

                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                   new OffsetAndMetadata(record.offset() + 1, null));  // 1
            }
            consumer.commitAsync(currentOffsets, null);  // 2
        }
    }
    finally {
        try {
            consumer.commitSync(currentOffsets);  // 3
        } finally {
            consumer.close();
        }
    }

    class CommitCurrentOffset implements ConsumerRebalanceListener {  // 4
        public void onPartitionsRevoked(Collection<TopicPartition> topicPartitions) {
            consumer.commitSync(currentOffsets);
        }
        public void onPartitionsAssigned(Collection<TopicPartition> topicPartitions) {
            // nothing to do; the committed offsets will be used
        }
    }
  1. As we process each message, we add the offset of the processed message to our map:

       currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                          new OffsetAndMetadata(record.offset() + 1, null));

     Note the +1: the committed offset should be the offset of the next message to read, not the one just processed.

  2. We commit the offsets of the processed messages asynchronously to the broker.

  3. In case of an error/exception while processing a message, we commit the offset of the latest message that was successfully processed for each partition.

  4. When we are about to lose a partition due to rebalancing, we need to commit offsets. Here we commit the latest offsets we have actually processed (in the for-each loop), not the latest offsets in the batch we are still processing. We achieve this by implementing the ConsumerRebalanceListener interface: whenever a rebalance is triggered, onPartitionsRevoked() is invoked before the rebalance starts and after the consumer stops processing messages.
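One caveat on point 2: commitAsync does not retry failed commits, so it is worth passing an OffsetCommitCallback instead of null to at least log failures. A sketch (the wrapper class and method names are illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class LoggingCommit {
    // Commit asynchronously, logging any failure instead of silently dropping it.
    static void commitWithLogging(Consumer<String, String> consumer,
                                  Map<TopicPartition, OffsetAndMetadata> offsets) {
        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed,
                                   Exception exception) {
                if (exception != null) {
                    System.err.println("Commit failed for " + committed + ": " + exception);
                }
            }
        });
    }
}
```

The final commitSync in the finally block still gives the retry-until-success guarantee on shutdown; the callback only adds visibility for the in-flight async commits.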