3 votes

I have trouble understanding how checkpoints work with Spark Structured Streaming.

I have a Spark process that generates some events, which I log in a Hive table. For each of those events, I receive a confirmation event in a Kafka stream.

I created a new spark process that

  • reads the events from the Hive log table into a DataFrame
  • joins those events with the stream of confirmation events using Spark Structured Streaming
  • writes the joined DataFrame to an HBase table.

I tested the code in spark-shell and it works fine; below is the pseudocode (I'm using Scala).

val tableA = spark.table("tableA")

val startingOffsets = "earliest"

val streamOfData = spark.readStream
  .format("kafka")
  .option("startingOffsets", startingOffsets)
  .option("otherOptions", otherOptions)
  .load()

val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")

joinTableAWithStreamOfData
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  )
  .start()
  .awaitTermination()

Now I would like to schedule this code to run periodically, e.g. every 15 minutes, and I'm struggling to understand how to use checkpoints here.

At every run of this code, I would like to read from the stream only the events I haven't read yet in the previous run, and inner join those new events with my log table, so that only new data is written to the final HBase table.

I created a directory in HDFS in which to store the checkpoint files. I provided that location to the spark-submit command I use to call the Spark code.

spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory \
  --all_the_other_settings_and_libraries

At the moment the code runs every 15 minutes without any errors, but it basically doesn't do anything, since it is not writing the new events to the HBase table. Also, the checkpoint directory is empty, while I assume some files should be written there?

And does the readStream call need to be adapted so that it starts reading from the latest checkpoint?

val streamOfData = spark.readStream
  .format("kafka")
  .option("startingOffsets", startingOffsets) // ??
  .option("otherOptions", otherOptions)
  .load()

I'm really struggling to understand the Spark documentation regarding this.

Thank you in advance!

Can you post the full code, or the actual code that works in the Scala shell? – Srinivas

My guess is that the DataFrame joinTableAWithStreamOfData is empty, and because of this the writeStream is never triggered or started. If it had started, the checkpoint location would have been created. – Srinivas

Try looking at this answer: stackoverflow.com/questions/44584476/… – Rayan Ral

Thanks for your comments! @Rayan I saw that answer already, and it is what prompted me to write my question :) So the checkpoint should be set when writing the stream somewhere? I was wondering how that actually works, because I'm not only writing the stream to a table: first I join the stream with other data and then I write to HBase, so I was wondering how the checkpoint on the writeStream works. – Gimmi

1 Answer

2 votes

Trigger

"Now I would like to schedule this code to run periodically, e.g. every 15 minutes, and I'm struggling understanding how to use checkpoints here.

If you want your job to be triggered every 15 minutes, you can make use of Triggers.

You do not need to "use" checkpointing explicitly; you just need to provide a reliable (e.g. HDFS) checkpoint location, as shown below.
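
If you prefer to keep submitting the job from a scheduler every 15 minutes instead of leaving a single query running, a one-shot trigger (Trigger.Once) is an alternative: each run processes whatever is new since the last checkpoint and then stops. A rough sketch, reusing the placeholder names from your question:

import org.apache.spark.sql.streaming.Trigger

joinTableAWithStreamOfData
  .writeStream
  .foreach(
    writeDataToHBaseTable()  // placeholder sink from the question
  )
  .option("checkpointLocation", "path_to_hdfs_checkpoint_directory")
  .trigger(Trigger.Once())   // process all data available since the last checkpoint, then stop
  .start()
  .awaitTermination()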

Checkpointing

"At every run of this code, I would like to read from the stream only the events I haven't read yet in the previous run [...]"

When reading data from Kafka in a Spark Structured Streaming application, it is best to set the checkpoint location directly on your StreamingQuery. Spark uses this location to create checkpoint files that keep track of your application's state and record the offsets already read from Kafka.

When the application is restarted, it checks these checkpoint files to understand where to continue reading from Kafka, so it does not skip or miss any messages. You do not need to set startingOffsets manually.

Keep in mind that only specific changes to your application's code are allowed if the checkpoint files are to remain usable for safe restarts. A good overview can be found in the Structured Streaming Programming Guide under Recovery Semantics after Changes in a Streaming Query.


Overall, for production Spark Structured Streaming applications reading from Kafka, I recommend the following structure:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().[...].getOrCreate()

val streamOfData = spark.readStream
  .format("kafka")
  // startingOffsets is only relevant the very first time this application runs; after that, the checkpoint files are used
  .option("startingOffsets", startingOffsets)
  .option("otherOptions", otherOptions)
  .load()

// perform any kind of transformations on streaming DataFrames
val processedStreamOfData = streamOfData.[...]


val streamingQuery = processedStreamOfData
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  )
  .option("checkpointLocation", "/path/to/checkpoint/dir/in/hdfs/")
  .trigger(Trigger.ProcessingTime("15 minutes"))
  .start()

streamingQuery.awaitTermination()
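
Since writeDataToHBaseTable() is not shown in your question, here is a rough sketch of the shape it could take: the foreach sink expects a ForeachWriter[Row]. The class name and the HBase-specific details below are placeholders; the actual connection handling and Put logic depend on your HBase client library.

import org.apache.spark.sql.{ForeachWriter, Row}

class HBaseEventWriter extends ForeachWriter[Row] {
  // called once per partition and epoch; open the HBase connection here
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // e.g. create a connection and get a handle on the target table
    true  // return false to skip processing this partition
  }

  // called for every row of the micro-batch
  override def process(row: Row): Unit = {
    // e.g. build a Put from the row's columns and write it to the table
  }

  // called when the partition is finished (or on error); release resources here
  override def close(errorOrNull: Throwable): Unit = {
    // e.g. flush and close the HBase connection
  }
}

def writeDataToHBaseTable(): ForeachWriter[Row] = new HBaseEventWriter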