I have a single Kafka consumer connected to a topic with 3 partitions. As soon as I get a record from Kafka, I would like to capture its offset and partition. On restart, I would like to restore the position of the consumer from the last read offset.
From the Kafka documentation:
Each record comes with its own offset, so to manage your own offset you just need to do the following:
Configure enable.auto.commit=false
Use the offset provided with each ConsumerRecord to save your position.
On restart restore the position of the consumer using seek(TopicPartition, long).
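As a rough illustration of the second and third steps quoted above, the sketch below keeps a partition-to-offset map in a local properties file using only the Java standard library. The class name OffsetStore, its method names, and the file format are assumptions made for this example; none of them are part of the Kafka API. One detail it encodes: a consumer's position is the offset of the next record to read, so the value to pass to seek on restart is the last processed offset plus one.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): persists the last processed offset per partition.
final class OffsetStore {
    private final Path file;
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    // Loads previously saved offsets if the file already exists.
    OffsetStore(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            Properties props = new Properties();
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            }
            for (String key : props.stringPropertyNames()) {
                lastProcessed.put(Integer.valueOf(key), Long.valueOf(props.getProperty(key)));
            }
        }
    }

    // Call once a record is fully processed, e.g. with record.partition() and record.offset().
    public synchronized void record(int partition, long offset) throws IOException {
        lastProcessed.put(partition, offset);
        flush();
    }

    // Positions to seek to on restart: last processed offset + 1 for each partition.
    public synchronized Map<Integer, Long> resumePositions() {
        Map<Integer, Long> positions = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> positions.put(partition, offset + 1));
        return positions;
    }

    // Writes to a temporary file first, then replaces the real file with it.
    private void flush() throws IOException {
        Properties props = new Properties();
        lastProcessed.forEach((p, o) -> props.setProperty(String.valueOf(p), String.valueOf(o)));
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            props.store(out, "partition=lastProcessedOffset");
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

In a poll loop this would presumably be called as store.record(record.partition(), record.offset()) after each record is handled, and on startup each entry of resumePositions() would be turned into a TopicPartition and passed to consumer.seek. Writing the file on every record is simple but slow; flushing once per polled batch is a possible trade-off.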
Here is my sample code:
constructor {
    // load data into offsetMap<partition, offset>
    initFlag = true;
}

Main method
{
    ConsumerRecords<String, String> records = consumer.poll(100);
    if (initFlag) // is this the correct way to override the offset position?
    {
        seekToPositions(offsetMap);
        initFlag = false;
    }
    while (!shutdown)
    {
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            getOffsetPositions(); // dump offsets and partitions to db/disk
        }
    }
}

// Get the current offsets and write them to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception {
    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
    // code to put partition and offset into map
    // write to disk or db
    return offsetMap;
}

// Overrides the fetch offsets that the consumer will use on the next poll(timeout)
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
    // code to get partitions and offsets from offsetMap
    consumer.seek(partition, offset);
}
Is this the right way to do it? Is there a better way?