I have a use case where I am writing a batch job.
I need to read a Kafka topic and journal the data to HDFS. My code looks like this:
val df: DataFrame = spark.read
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load()

df.write.parquet(buildPathWithCurrentBatchTime())
Every time the job reads the Kafka topic, it starts from the earliest offset, so the same messages get journaled in multiple batches. How do I make my job read messages starting from the offsets after those read by the previous job instance?
I tried setting the checkpoint location and the group id, but it did not help.
I don't want to use a streaming query. I have a plain use case of journaling the data from a Kafka topic, and I don't have any latency requirement; the only requirement is to not have any duplicates in the journals. This is a low-priority job, and a streaming query would occupy the executors all the time, which is a waste of resources. Hence I want to do it in batch.
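For what it's worth, the direction I'm currently considering is to manage the offsets myself: at the end of each run, persist the highest offset read per partition (plus one, since the JSON form of startingOffsets is inclusive) and pass that JSON back in via startingOffsets on the next run. A rough sketch of the idea is below; loadLastOffsetsOrElse and saveOffsetsAsJson are hypothetical helpers I would still have to write, and this ignores partitions added between runs. Is this the intended way, or is there something built-in (like the streaming checkpoint location) that batch reads can use?

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.max

// Hypothetical helper: returns the JSON saved by the previous run,
// e.g. """{"test-topic":{"0":42,"1":17}}""", or "earliest" on the first run.
val startingOffsets: String = loadLastOffsetsOrElse("earliest")

val df: DataFrame = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .option("startingOffsets", startingOffsets)  // resume where the previous batch stopped
  .option("endingOffsets", "latest")           // default for batch reads
  .load()

df.persist()  // so the write and the offset aggregation see the same data

df.write.parquet(buildPathWithCurrentBatchTime())

// Highest offset read per partition, plus one because the JSON form of
// startingOffsets is inclusive.
val nextOffsets: Map[Int, Long] = df
  .groupBy("partition")
  .agg(max("offset").as("maxOffset"))
  .collect()
  .map(row => row.getInt(0) -> (row.getLong(1) + 1L))
  .toMap

saveOffsetsAsJson("test-topic", nextOffsets)  // hypothetical: persist the JSON for the next run

df.unpersist()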