
I was trying to assign timestamps and watermarks to a stream by implementing AssignerWithPeriodicWatermarks. Inside the class, I implemented:

  override def getCurrentWatermark(): Watermark = {
    // this guarantees that the watermark never goes backwards.
    val potentialWM = currentMaxTimestamp - maxOutOfOrderness
    if (potentialWM >= lastEmittedWatermark) lastEmittedWatermark = potentialWM

    new Watermark(lastEmittedWatermark)
  }

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    val timestamp = element.streamTime // event-time field carried by each stream element
    if (timestamp > currentMaxTimestamp) currentMaxTimestamp = timestamp
    timestamp
  }

However, I still got watermarks with the default value -9223372036854775808 (Long.MinValue). When I added println calls to both functions, only the one inside extractTimestamp was printed, which suggests that getCurrentWatermark was never called.

The implementation seems to be correct, because the same code works in another script (code not written by me).

PS: This is not the first time I have encountered a negative watermark. I have noticed that after a certain period of time the watermark becomes positive, but I am still confused about what happens at the beginning.


1 Answer


The issue is that you are using AssignerWithPeriodicWatermarks, which does not generate watermarks per event but at intervals. Whenever you use AssignerWithPeriodicWatermarks, you should call setAutoWatermarkInterval on the execution config (env.getConfig). The value you provide there, in milliseconds, is the interval at which getCurrentWatermark will be called. If you haven't set it, the method will never be called, so the watermark will never change. For testing and learning, you can consider using AssignerWithPunctuatedWatermarks, as this simply emits a watermark for each event.
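A minimal sketch of the setup described above, assuming the legacy Flink 1.x DataStream Scala API (AssignerWithPeriodicWatermarks was deprecated in later releases in favor of WatermarkStrategy). The Event case class, its streamTime field, the 5000 ms out-of-orderness bound and the 1000 ms interval are all hypothetical values chosen for illustration. The assigner reuses the question's logic but starts currentMaxTimestamp at Long.MinValue + bound so the subtraction cannot underflow; until the first element arrives it keeps reporting Long.MinValue, which matches the large negative watermark seen at startup.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type standing in for the question's stream elements.
case class Event(id: String, streamTime: Long)

// Bounded-out-of-orderness assigner using the same logic as the question.
class BoundedAssigner(maxOutOfOrderness: Long) extends AssignerWithPeriodicWatermarks[Event] {
  // Start at Long.MinValue + bound so that currentMaxTimestamp - maxOutOfOrderness cannot underflow.
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrderness
  private var lastEmittedWatermark: Long = Long.MinValue

  // Called by Flink once per autoWatermarkInterval, not once per element.
  override def getCurrentWatermark(): Watermark = {
    val potentialWM = currentMaxTimestamp - maxOutOfOrderness
    // Never let the watermark go backwards.
    if (potentialWM >= lastEmittedWatermark) lastEmittedWatermark = potentialWM
    new Watermark(lastEmittedWatermark)
  }

  // Called once per element; tracks the highest timestamp seen so far.
  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    val timestamp = element.streamTime
    if (timestamp > currentMaxTimestamp) currentMaxTimestamp = timestamp
    timestamp
  }
}

object PeriodicWatermarkJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time for windows and timers.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Ask Flink to call getCurrentWatermark() every 1000 ms; an interval of 0 disables the periodic calls.
    env.getConfig.setAutoWatermarkInterval(1000L)

    env
      .fromElements(Event("a", 1000L), Event("b", 3000L), Event("c", 2000L))
      .assignTimestampsAndWatermarks(new BoundedAssigner(5000L))
      .print()

    env.execute("periodic watermark sketch")
  }
}
```

The interval trades latency for overhead: a shorter interval advances event-time windows sooner but calls getCurrentWatermark more often.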

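EDIT: As mentioned in the comments below this answer, the default value of autoWatermarkInterval is actually 200 ms (this is the interval Flink sets when the time characteristic is EventTime; under the ProcessingTime characteristic it is 0, and getCurrentWatermark is never called). Also, using AssignerWithPunctuatedWatermarks doesn't mean you need to emit a watermark for each event, but the method that emits them (checkAndGetNextWatermark) will be called for each event. If you don't want to emit a watermark, that method should simply return null.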
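A minimal sketch of a punctuated assigner that only sometimes emits a watermark, again assuming the legacy Flink 1.x API. The MarkedEvent type and its isMarker flag are hypothetical: here only "marker" events advance the watermark, and every other event makes checkAndGetNextWatermark return null.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type; isMarker flags the events that should advance the watermark.
case class MarkedEvent(id: String, streamTime: Long, isMarker: Boolean)

// Flink calls extractTimestamp and then checkAndGetNextWatermark for every element.
class MarkerAssigner extends AssignerWithPunctuatedWatermarks[MarkedEvent] {

  override def extractTimestamp(element: MarkedEvent, previousElementTimestamp: Long): Long =
    element.streamTime

  // Returning null means "no new watermark for this element".
  override def checkAndGetNextWatermark(lastElement: MarkedEvent, extractedTimestamp: Long): Watermark =
    if (lastElement.isMarker) new Watermark(extractedTimestamp) else null
}
```

It plugs into a stream the same way as the periodic assigner, via assignTimestampsAndWatermarks(new MarkerAssigner), and does not depend on autoWatermarkInterval.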