I was trying to assign timestamp and watermarks to a stream by implementing the AssignerWithPeriodicWatermarks
, inside the function, it implements:
override def getCurrentWatermark: Watermark = {
// this guarantees that the watermark never goes backwards.
val potentialWM = currentMaxTimestamp - maxOutOfOrderness
if (potentialWM >= lastEmittedWatermark) lastEmittedWatermark = potentialWM
new Watermark(lastEmittedWatermark)
}
override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
val timestamp = element.streamTime // something exists in the stream
if (timestamp > currentMaxTimestamp) currentMaxTimestamp = timestamp
timestamp
}
However, I still got watermarks of the default value -9223372036854775808
, and when I tried to add printing inside both functions, I found only println
inside extractTimestamp
was printed, which is saying function of getCurrentWatermark
was never called.
The implementations seem to be right, because the same code was able to run on another script(some code not written by me).
PS: It's not the first time that I encountered negative watermark, what I found is that after a certain period of time, the watermark will go positive, however I am still quite confused what happened at the beginning.