2
votes

Watermarking enables automatic dropping of old state data in Apache Spark Structured Streaming. In structured-streaming-programming-guide.md, word count example demonstrates how watermarking can easily drop the records or events that arrive late in the system. ( https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md )

words.withWatermark("timestamp", "10 minutes")

Is there a way to save the records that are dropped or discarded by watermarking on a disk or in a table?

2
Looking in the manuals states no such possibility.thebluephantom
There should have been a method or function to store the dropped records on disk for review, analysis or audit purposes. We can not just blindly ignore the late events.roy bapy
Not sure I agree, maybe Flink has that, it has some extensions.thebluephantom
no is also an answer.thebluephantom

2 Answers

1
votes

Yes,spark doesn't have the function to trace these records.But flink does it !

0
votes

With paying some perf overhead, should be doable using query listener + broadcast variable and filter func. Some thing like:

class WaterMark extends Serializable {

 var ws: Long = 0L;
 def set(value: Long) : Unit = {
   ws = value
 }
def get(): Long = { ws}
}

var currentWs = spark.sparkContext.broadcast[WaterMark](new WaterMark) 
 
 df.filter(row => {
    if(row.get("timestamp") < currentWs.value.ws){
   //this will be filtered by watermark. we can persist it using custom method}
   .........................
  
  //Not filtering the row as that would be done by watermarking 
  true
  })   
..............................

class QueryListener (currentWs: Broadcast[WaterMark]) extends StreamingQueryListener {

import java.util.Locale

val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
format.setTimeZone(TimeZone.getTimeZone("UTC"))
...........................
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {

//un-persist the broadcast var so that it can be updated with next batch watermark
currentWs.unpersist(true)
currentWs.value.set(format.parse(event.progress.eventTime.get("watermark")).getTime)
  println("Listener: " + currentWs.value.ws)

 }
......................
}

NOTE: I have not tried it end-to-end myself and it does not handle the case when we restart the query due to failure OR code change (checkpoint_dir/commit folder to rescue??)