In Structured Streaming, will the checkpoint keep track of which data has already been processed from a Delta table?
My streaming job looks like this:

```python
def fetch_data_streaming(source_table: str):
    print("Fetching now")
    streamingInputDF = (
        spark  # assumes an existing SparkSession named `spark`
        .readStream
        .format("delta")
        .option("maxBytesPerTrigger", 1024)  # cap the volume of each micro-batch
        .table(source_table)
        .where("measurementId IN (1351, 1350)")
        .where("year >= '2021'")
    )
    query = (
        streamingInputDF
        .writeStream
        .outputMode("append")
        .option("checkpointLocation", "/streaming_checkpoints/5")
        .foreachBatch(customWriter)
        .start()  # start() returns the StreamingQuery handle
    )
    query.awaitTermination()  # block until termination; awaitTermination() itself returns None
    return query

def customWriter(batchDF, batchId):
    # Inspect each micro-batch: show a sample of rows and report the batch size.
    batchDF.show(10)
    length = batchDF.count()
    print("batchId, batch size:", batchId, length)
```
If I change the where clause in streamingInputDF to include more measurementIds, the Structured Streaming job doesn't always acknowledge the change and fetch the rows matching the new values: sometimes it keeps running as if nothing had changed, while at other times it does start fetching the new values.
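For example, extending the filter like this and then restarting the query against the same checkpointLocation does not reliably pick up rows for the new id (1352 is just an illustrative addition):

```python
streamingInputDF = (
    spark
    .readStream
    .format("delta")
    .option("maxBytesPerTrigger", 1024)
    .table(source_table)
    .where("measurementId IN (1351, 1350, 1352)")  # 1352 newly added to the filter
    .where("year >= '2021'")
)
```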
Isn't the checkpoint supposed to identify the change?
Edit: schema of the Delta table:
| col_name | data_type |
|---|---|
| measurementId | int |
| year | int |
| time | timestamp |
| q | smallint |
| v | string |