
In Structured Streaming, will the checkpoints keep track of which data has already been processed from a Delta Table?

def fetch_data_streaming(source_table: str):
  print("Fetching now")

  # Read the Delta table as a stream, limiting the amount of data per micro-batch
  streamingInputDF = (
    spark
      .readStream
      .format("delta")
      .option("maxBytesPerTrigger", 1024)
      .table(source_table)
      .where("measurementId IN (1351, 1350)")
      .where("year >= '2021'")
  )

  query = (
    streamingInputDF
      .writeStream
      .outputMode("append")
      .option("checkpointLocation", "/streaming_checkpoints/5")
      .foreachBatch(customWriter)
      .start()
  )
  query.awaitTermination()

  return query

def customWriter(batchDF,batchId):
  print(batchId)
  print(batchDF.count())
  batchDF.show(10)
  length = batchDF.count()
  print("batchId,batch size:",batchId,length)

If I change the where clause in streamingInputDF to add more measurementIds, the Structured Streaming job doesn't always acknowledge the change and fetch the new data values. Most of the time it continues to run as if nothing has changed, although occasionally it does start fetching the new values.

Isn't the checkpoint supposed to identify the change?

Edit: Schema of delta table:

col_name        data_type
measurementId   int
year            int
time            timestamp
q               smallint
v               string
What kind of changes do you mean by "acknowledge the change"? - Alex Ott
I would expect it to start fetching the data corresponding to the new IDs. - rpr

1 Answer


"In structured streaming, will the checkpoints will keep track of which data has already been processed?"

Yes, the Structured Streaming job will store the read version of the Delta table in its checkpoint files to avoid producing duplicates.

Within the "offsets" folder of the checkpoint directory, you will see that Spark stores the progress per batchId. For example, it will look like this:

v1
{"batchWatermarkMs":0,"batchTimestampMs":1619695775288,"conf":[...]}
{"sourceVersion":1,"reservoirId":"d910a260-6aa2-4a7c-9f5c-1be3164127c0","reservoirVersion":2,"index":2,"isStartingVersion":true}

In this offset file, the important part is "reservoirVersion":2, which tells you that the streaming job has consumed all data from the Delta table up to and including version 2.
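
To relate that number to the table itself, you can compare it with the Delta table's commit history, for example via the DeltaTable API. This is only a sketch, assuming the Delta Lake library is on the classpath and reusing the path from the example below:

import io.delta.tables.DeltaTable

// The "version" column of the history corresponds to the reservoirVersion
// values recorded in the checkpoint's offset files.
val deltaTable = DeltaTable.forPath(spark, "file:///tmp/delta/table")
deltaTable.history(5).select("version", "timestamp", "operation").show(false)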

If you restart your Structured Streaming query with an additional filter condition, that filter will therefore not be applied to historic records, but only to records added to the Delta table after version 2.


To see this behavior in action, you can use the code below and analyse the contents of the checkpoint files.

import org.apache.spark.sql.functions.col
import spark.implicits._ // needed for toDF outside of spark-shell/notebooks

val deltaPath = "file:///tmp/delta/table"
val checkpointLocation = "file:///tmp/checkpoint/"


// run the following two lines once
val deltaDf = Seq(("1", "foo1"), ("2", "foo2"), ("3", "foo2")).toDF("id", "value")
deltaDf.write.format("delta").mode("append").save(deltaPath)


// run this code for the first time, then add filter condition, then run again
val query = spark.readStream
  .format("delta")
  .load(deltaPath)
  .filter(col("id").isin("1")) // in the second run add "2"
  .writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", checkpointLocation)
  .start()

query.awaitTermination()

Now, if you append some more data to the Delta table while the streaming query is shut down and then restart it with the new filter condition, the filter will only be applied to the newly appended data.
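
As a sketch of that restart scenario (the extra rows and values are made up, not part of the original answer):

// While the streaming query is stopped, append more rows to the table.
val moreRows = Seq(("4", "foo4"), ("2", "foo5")).toDF("id", "value")
moreRows.write.format("delta").mode("append").save(deltaPath)

// Restarting the query above with .filter(col("id").isin("1", "2")) will only
// emit matching rows from this new append; the original three rows are already
// covered by the reservoirVersion stored in the checkpoint.

If you also need the new filter to be evaluated against the historic data, one option is to start the query with a new, empty checkpointLocation (or delete the old one), so that the stream reads the whole table from the beginning again.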