How do I store message offsets in Kafka when I am using KafkaUtils.createDirectStream to read the messages? Kafka loses the offset every time the application goes down. On restart it falls back to the value of auto.offset.reset (which is latest) and so never reads the messages that arrived while the application was stopped.
1 Answer
You can avoid that by committing the offsets manually. Set enable.auto.commit to false, then use the code below to commit the offsets to Kafka after each batch has been processed successfully.
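import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// Holds the offset ranges of the batch currently being processed (driver side).
var offsetRanges = Array[OffsetRange]()
val valueStream = stream.transform { rdd =>
  // Capture the offsets on the first transformation, while the RDD still implements HasOffsetRanges.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(_.value())

valueStream.foreachRDD { rdd =>
  // operation
  // Commit only after the operation on this batch has finished.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}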
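For reference, the consumer settings this approach relies on could look roughly like the sketch below. The broker address and the group id are placeholders, not values from the question. Note that auto.offset.reset is only consulted when the group has no committed offset, so the group.id should stay the same across restarts for the committed offsets to be picked up.

```scala
// Sketch of the Kafka consumer parameters passed to the direct stream.
// "localhost:9092" and "example-group" are assumed placeholder values.
val kafkaParams: Map[String, Object] = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  // Keep this stable between runs; committed offsets are stored per group.
  "group.id" -> "example-group",
  // Used only when no committed offset exists for the group.
  "auto.offset.reset" -> "latest",
  // Disable automatic commits so offsets are committed only by the application.
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```

This map would be handed to ConsumerStrategies.Subscribe when the stream is created with KafkaUtils.createDirectStream.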
You can also read this post, which gives a good overview of offset management: https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/