1
votes

I have a Spark Structured Streaming job that reads data from a Kafka topic and does some aggregation. The job needs to restart daily, but when it restarts with startingOffsets="latest", I lose the data that arrives while it is restarting. With startingOffsets="earliest", the job reads all the data in the topic instead of continuing from where the last streaming job left off. How can I configure the job to resume from the offset where the last streaming job stopped?

I'm using Spark 2.4.0 and Kafka 2.1.1. I have tried setting a checkpoint location for the writing job, but it seems that Spark doesn't check the offsets of the Kafka messages, so it keeps starting from the last or the first offset, depending on startingOffsets.

Here is how my Spark job is configured to read from Kafka:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", host)
  .option("subscribe", topic)
  .option("startingOffsets", offset)
  .option("enable.auto.commit", "false")
  .load()

For example, suppose the Kafka topic has 10 messages with offsets 1 to 10, and Spark has just finished processing message 5 when it restarts. How can I make Spark continue reading from message 5 instead of from 1 or 11?

1
Checkpointing is designed to handle these scenarios. Remove .option("startingOffsets", offset) and instead use checkpointLocation with an HDFS-compatible location. - Gowtham
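To make the checkpointing suggestion concrete, here is a minimal sketch, not a definitive setup. The broker address, topic name, console sink, and checkpoint path are all placeholders assumed for illustration, and the job needs the spark-sql-kafka-0-10 package on its classpath. When a query restarts with the same checkpointLocation, it resumes from the offsets recorded there; startingOffsets is only consulted when a query starts without an existing checkpoint.

```scala
import org.apache.spark.sql.SparkSession

object ResumeFromCheckpoint {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("resume-from-checkpoint")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "my-topic")                     // assumed topic name
      // Only used on the first run, when no checkpoint exists yet.
      .option("startingOffsets", "earliest")
      .load()

    val query = df
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("console") // placeholder sink for illustration
      // Assumed path; it must stay the same across restarts.
      .option("checkpointLocation", "hdfs:///checkpoints/my-query")
      .start()

    query.awaitTermination()
  }
}
```

If the checkpoint directory is deleted or changed between runs, the query is treated as new and falls back to startingOffsets again.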

1 Answer

2
votes

It seems that with some code I can capture the offsets I need and save them to reliable storage such as Cassandra. Then, when the streaming job starts, I just need to read the saved offsets and pass them to startingOffsets. This is the code that helps me get the offsets I need:

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id)
  }

  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }

  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress")
    // sources(0) is the first (here, the only) source of the query
    println("Starting offset: " + queryProgress.progress.sources(0).startOffset)
    println("Ending offset: " + queryProgress.progress.sources(0).endOffset)
    // The logic to save these offsets goes here
  }
})
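
The save-and-restore step left open above could look roughly like this. It is only a sketch: OffsetStore is a hypothetical helper (not part of Spark), and it writes to a local file purely for illustration, where a real job would use reliable storage such as Cassandra. It assumes the endOffset string reported for the Kafka source is JSON of the form {"topic":{"partition":offset}}, which is the same shape startingOffsets accepts.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical helper, not part of Spark: keeps the latest offsets JSON in a file.
object OffsetStore {
  // Overwrite the file with the latest offsets JSON, e.g. {"my-topic":{"0":5}}
  def save(path: Path, offsetsJson: String): Unit = {
    Files.write(path, offsetsJson.getBytes(StandardCharsets.UTF_8))
    ()
  }

  // Return the saved offsets, or the fallback when nothing usable was saved.
  def loadOrElse(path: Path, fallback: String): String =
    if (Files.exists(path)) {
      val saved = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim
      if (saved.nonEmpty) saved else fallback
    } else {
      fallback
    }
}
```

Inside onQueryProgress, the call would be OffsetStore.save(path, queryProgress.progress.sources(0).endOffset), and on startup the reader would use .option("startingOffsets", OffsetStore.loadOrElse(path, "earliest")). Since the listener is called after a batch has been reported, a crash between processing and saving can replay some messages, so the downstream logic should tolerate duplicates.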