0
votes

I have a use case where I am writing a batch job.

I need to read a Kafka topic and journal the data to HDFS. My code looks like this:

val df: DataFrame = spark.read
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load

df.write.parquet(buildPathWithCurrentBatchTime())

Every time the job runs, it reads the Kafka topic from the earliest offset, so the same messages get journaled across multiple batches. How do I make the job read messages starting from the offset after the last offset read by the previous job instance?

I tried setting the checkpoint location and the group id, but it did not help.

I don't want to use a streaming query. I have a plain use case of journaling the data from a Kafka topic, and I don't have any latency requirement; this is a low-priority job. The only requirement is to not have any duplicates in the journals. If I use a streaming query it will hold the executors all the time, which is a waste of resources, so I want to do it in batch.

1
Add option("startingOffsets", "latest"). – sandeep rawat
Please clarify what you really want, so others don't have to guess your intention. I think you're saying "I have a use case where I am writing a batch job with Spark Structured Streaming APIs." That is confusing: you are not using Structured Streaming, since you don't initialize your source and sink with readStream / writeStream. – Jungtaek Lim
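
For reference, startingOffsets alone won't deduplicate batch runs: for batch queries the Kafka source accepts only "earliest" or explicit per-partition offsets as startingOffsets (not "latest"), and Spark does not remember where the previous batch run ended. A minimal sketch of manual offset tracking, where loadLastOffsets and saveOffsets are hypothetical helpers you would implement yourself (e.g. over a small file on HDFS):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.max

// Offsets where the previous run stopped, in the Kafka source's JSON format,
// e.g. {"test-topic":{"0":1234}}; fall back to "earliest" on the first run.
// loadLastOffsets/saveOffsets are hypothetical helpers, not Spark APIs.
val startFrom: String = loadLastOffsets().getOrElse("earliest")

val df: DataFrame = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .option("startingOffsets", startFrom)
  .option("endingOffsets", "latest")
  .load()
  .cache() // avoid re-reading Kafka when computing the max offsets below

df.write.parquet(buildPathWithCurrentBatchTime())

// Persist the highest offset per partition only after a successful write;
// the next run should start at maxOffset + 1 for each partition.
val lastOffsets = df.groupBy("partition").agg(max("offset").as("maxOffset"))
saveOffsets(lastOffsets.collect())
df.unpersist()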

1 Answer

1
vote

What you are using is a batch query, not a streaming query. (Maybe a missed spot?) Simply replacing read with readStream and write with writeStream would work for you.

EDIT: As the OP clarified that a one-time trigger is OK, I've updated the code to use Structured Streaming with a one-time trigger. (DISCLAIMER: I didn't compile/run the code, but the change follows the Structured Streaming guide doc.)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val df: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load()

// checkpointLocation is a query (sink-side) option: Spark records the
// committed offsets there, so the next run resumes right after them.
// The group.id option is dropped, since Spark's Kafka source manages its
// own consumer groups and does not honor it.
val query = df.writeStream
  .format("parquet")
  .option("path", buildPathWithCurrentBatchTime())
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()
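
Because Trigger.Once processes whatever data is available and then stops, you can schedule this query like a regular batch job (e.g. from cron or a workflow scheduler): executors are held only while a run is active, which addresses the resource concern, and the checkpoint directory on the sink is what lets each run resume right after the offsets committed by the previous one, avoiding duplicates.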