
I have switched off auto-commit, and I am also not committing offsets from the consumer after reading.

I checked the consumer lag and it stays the same, which confirms that the offset is not being committed. The problem is that the consumer still moves on to the next message instead of reading the same message again.

How can I keep reading the same message again and again? I should be able to read the next message only after the previous offset has been committed. Please help me with this.

Are you using the custom kafka-file-consumer provided by Kafka? - Mehul Gupta
While your consumers are running, they track their position (the next offset to fetch) in memory. Committed offsets are only used when a consumer starts up again after a shutdown, so that it can resume from where it left off. Every poll() advances the in-memory position past the records it returned, whether or not you commit. - Afee
@MehulGupta: I am using org.apache.kafka.connect.sink.SinkTask and have overridden it. - Rahul
You can override this functionality by not checking the offset from the topic and partition. Just check the code. - Mehul Gupta
@AftabVirtual: Thanks for the answer. How can I keep polling the same record? My use case: if I couldn't process the data successfully, I don't want to read the next record; I want to keep reading the same one. - Rahul
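To make the distinction between the in-memory position and the committed offset concrete, here is a toy model in Scala. `PartitionModel` and `PositionVsCommit` are hypothetical names, not part of the Kafka client API; the model returns one record per poll for simplicity, and it assumes a restart with nothing committed starts from offset 0 (as with `auto.offset.reset=earliest`).

```scala
// Hypothetical toy model of ONE partition read by ONE consumer. It is not the
// Kafka client API; it only mimics how the position and the committed offset relate.
final class PartitionModel(log: Vector[String]) {
  private var position: Long = 0L            // in-memory fetch position
  private var committed: Option[Long] = None // committed offset, survives a restart

  // Like poll(), but returns at most one record: the one at the current
  // position, then advances the position whether or not anything was committed.
  def poll(): Option[(Long, String)] =
    if (position < log.size) {
      val offset = position
      position += 1
      Some((offset, log(offset.toInt)))
    } else None

  // Like commitSync(): records the next offset to read after a restart.
  def commit(nextOffset: Long): Unit = committed = Some(nextOffset)

  // Like a restart: the in-memory position is lost and consumption resumes from
  // the committed offset, or from 0 if nothing was committed (assumed here to
  // behave like auto.offset.reset=earliest).
  def restart(): Unit = position = committed.getOrElse(0L)
}

object PositionVsCommit {
  // Returns the offset of the record read at each step.
  def run(): List[Long] = {
    val p = new PartitionModel(Vector("a", "b", "c"))
    val first = p.poll().get._1        // offset 0
    val second = p.poll().get._1       // offset 1: advanced with no commit
    p.restart()
    val afterRestart = p.poll().get._1 // offset 0 again: nothing was committed
    p.commit(afterRestart + 1)         // commit the next offset to read
    p.restart()
    val afterCommit = p.poll().get._1  // offset 1: resumes from the commit
    List(first, second, afterRestart, afterCommit)
  }
}
```

`PositionVsCommit.run()` yields `List(0, 1, 0, 1)`: not committing only matters after a restart; it never makes a running consumer re-read a record.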

Answer


If you know which partition your Kafka consumer is currently reading, you can call KafkaConsumer.seek(partition, offset) to tell the consumer to read from a particular offset on its next poll(). Example:

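import java.time.Duration
import org.apache.kafka.common.TopicPartition

// partition and offset of the record that was just processed
val partition: Int = consumerRecord.partition()
val recordOffset: Long = consumerRecord.offset()

// processingFailed is a placeholder for your own failure condition
if (processingFailed) {
  // rewind this partition so the next poll() starts again at recordOffset
  consumer.seek(new TopicPartition(consumerRecord.topic(), partition), recordOffset)
}

// if processing failed, this returns records starting from recordOffset again
consumer.poll(Duration.ofMillis(100))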
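The seek-and-retry pattern above can be sketched end to end with a toy consumer, so that its behaviour can be checked without a broker. `ToyConsumer`, `SeekRetry` and the `process` callback are hypothetical names; `poll`, `seek` and `commit` only play the roles of `KafkaConsumer.poll`, `seek` and `commitSync`. On success it commits `offset + 1`, following Kafka's convention that the committed offset is the next one to consume.

```scala
// Hypothetical toy model: a single partition read by a single consumer.
// poll/seek/commit mirror the roles of KafkaConsumer.poll, seek and commitSync,
// but this is NOT the Kafka client API.
final class ToyConsumer(log: Vector[String]) {
  private var position: Long = 0L
  var committed: Long = 0L // next offset to read after a restart

  def poll(): Option[(Long, String)] =
    if (position < log.size) {
      val offset = position
      position += 1 // poll always advances the in-memory position
      Some((offset, log(offset.toInt)))
    } else None

  def seek(offset: Long): Unit = position = offset
  def commit(nextOffset: Long): Unit = committed = nextOffset
}

object SeekRetry {
  // Reads every record in order; a record whose processing fails is retried by
  // seeking back to its offset. Returns the offsets attempted and the final commit.
  def run(log: Vector[String], process: String => Boolean): (List[Long], Long) = {
    val consumer = new ToyConsumer(log)
    val attempts = List.newBuilder[Long]
    var next = consumer.poll()
    while (next.isDefined) {
      val (offset, value) = next.get
      attempts += offset
      if (process(value)) consumer.commit(offset + 1) // commit the NEXT offset to read
      else consumer.seek(offset)                      // next poll returns this record again
      next = consumer.poll()
    }
    (attempts.result(), consumer.committed)
  }
}
```

For example, with outcomes `true, false, false, true, true` over the records `a, b, c`, record `b` (offset 1) is attempted three times and the final committed offset is 3. With a real KafkaConsumer, poll() returns a batch, so after seeking you should skip the remaining records of that partition from the current batch; and since a record that always fails would be retried forever, a real loop usually caps retries or backs off.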