At the cost of some performance overhead, this should be doable using a query listener plus a broadcast variable and a filter function — something like:
// Mutable watermark holder shipped to executors via a broadcast variable.
// NOTE(review): mutating a broadcast value only propagates in local mode; on a
// real cluster each executor keeps its own deserialized copy — verify this.
class WaterMark extends Serializable {
  // @volatile: the listener thread writes this while filter tasks read it;
  // without it the readers may never see the update.
  @volatile var ws: Long = 0L

  /** Updates the current watermark (epoch millis). */
  def set(value: Long): Unit = {
    ws = value
  }

  /** Returns the current watermark (epoch millis). */
  def get(): Long = ws
}
// Broadcast the (initially zero) watermark holder so filter tasks can read it.
// NOTE(review): declared `var` presumably so the handle can be re-broadcast on
// restart — confirm intent; nothing in this snippet reassigns it.
var currentWs = spark.sparkContext.broadcast[WaterMark](new WaterMark)
// Observe each row's event time against the broadcast watermark. The lambda
// always returns true: dropping late rows is left to Spark's own watermarking;
// this hook only gives us a chance to persist late rows ourselves first.
df.filter(row => {
  // getAs[Long] yields a typed value; Row.get returns Any and cannot be
  // compared with `<`. Assumes "timestamp" is an epoch-millis Long column —
  // TODO confirm; for a TimestampType column use getAs[java.sql.Timestamp].getTime.
  if (row.getAs[Long]("timestamp") < currentWs.value.ws) {
    // This row will be dropped by the watermark — persist it here via a custom method.
  }
  // Do not filter the row here; watermarking will do that.
  true
})
..............................
// Listener that copies each batch's event-time watermark into the broadcast holder.
class QueryListener(currentWs: Broadcast[WaterMark]) extends StreamingQueryListener {
  import java.util.Locale

  // Spark reports the watermark in this ISO-8601 UTC form.
  val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.ROOT)
  format.setTimeZone(TimeZone.getTimeZone("UTC"))

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    // eventTime is a java.util.Map; the "watermark" key is absent (get returns
    // null) until the first watermark is produced — guard to avoid an NPE in parse().
    val wm = event.progress.eventTime.get("watermark")
    if (wm != null) {
      // Un-persist the broadcast var so it can be updated with the next batch watermark.
      currentWs.unpersist(true)
      currentWs.value.set(format.parse(wm).getTime)
      println("Listener: " + currentWs.value.ws)
    }
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
NOTE: I have not tried this end-to-end myself, and it does not handle restarting the query after a failure or a code change (perhaps the checkpoint directory's commits folder could help here?).