
I am somewhat confused by how Flink deals with late elements when watermarking on event time.

My understanding is that as Flink reads a stream of data, the watermark advances whenever it sees an element with a larger event time than the current watermark. Any window that covers only times strictly earlier than the watermark is then triggered and evicted (assuming no allowed lateness).
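
For reference, here is a minimal, Flink-free sketch of the window arithmetic involved. It assumes tumbling windows with zero offset and non-negative millisecond timestamps, and `TumblingWindowMath` and its methods are hypothetical names. In Flink a tumbling window covers the half-open interval [start, end), and an event-time window fires once the watermark reaches its last timestamp, end - 1.

```scala
// Hypothetical helper, not part of the Flink API: models tumbling event-time
// windows with zero offset and non-negative millisecond timestamps.
object TumblingWindowMath {
  // Start of the window containing timestamp t: round t down to a multiple of size.
  def windowStart(t: Long, size: Long): Long = t - (t % size)

  // Windows are half-open intervals [start, start + size).
  def windowEnd(t: Long, size: Long): Long = windowStart(t, size) + size

  // An event-time window fires once the watermark reaches its last timestamp, end - 1.
  def fires(end: Long, watermark: Long): Boolean = watermark >= end - 1

  def main(args: Array[String]): Unit = {
    val size = 60000L // Time.seconds(60) expressed in milliseconds
    Seq(1525132800000L, 1525132920000L).foreach { t =>
      println(s"$t -> [${windowStart(t, size)}, ${windowEnd(t, size)})")
    }
  }
}
```

For the timestamps used in the example this gives [1525132800000, 1525132860000) and [1525132920000, 1525132980000), the two windows that appear in the output.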

However, take this minimal example:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}

object EventTimeExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]) {

    // Set up environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Small in-memory example stream; the last element is out of order
    val simple = env.fromCollection(Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2),
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    ))
      .assignAscendingTimestamps(_.time)

    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply { (window, iter, collector: Collector[(Long, Long, String)]) =>
        // Emit (window start, window end, values that landed in the window)
        collector.collect((window.getStart, window.getEnd, iter.map(_.value).toString()))
      }

    windows.print
    env.execute("TimeStampExample")
  }
}

The result of running this is:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))

However, if my understanding is correct, the record with value 4 should not be included in the first window here, because the watermark should have advanced when the record with value 3 was reached.

I recognise this is a trivial example, but not understanding it makes it hard to reason about more complicated flows.


2 Answers


Your understanding is basically correct, but there are a few more things going on here that need to be taken into account.

First of all, you've used assignAscendingTimestamps(), which can only be used when the event stream is perfectly in order (by timestamp), which isn't the case here. You should see this warning when you run this application:

WARN  org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor  - Timestamp monotony violated: 1525132800000 < 1525132920000

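The other factor at work here is that an AscendingTimestampExtractor does not update the current Watermark for every passing stream element. This is an example of a periodic watermark generator: it injects a Watermark into the stream every n milliseconds, where n is defined by ExecutionConfig.setAutoWatermarkInterval(...) and defaults to 200 ms. A four-element in-memory collection is almost certainly consumed within that first interval, so no watermark is emitted before event #4 is processed; the windows only fire when the bounded source finishes and a final watermark flushes them. This is how event #4 sneaks into the first window.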
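
To make the timing concrete, here is a small Flink-free simulation; `WatermarkTimingSim`, `Event` and `run` are hypothetical names, not Flink API. It assumes a tumbling-window operator with zero allowed lateness, watermarks that never move backwards, and a final `Long.MaxValue` watermark when the bounded input ends. The "periodic" run assumes the 200 ms tick never fires while the four elements are read.

```scala
import scala.collection.mutable

// Flink-free simulation (hypothetical, not Flink code) of an event-time
// tumbling-window operator with zero allowed lateness.
object WatermarkTimingSim {
  final case class Event(time: Long, value: Long)

  // Returns (windowStart, values) in firing order. `watermarkAfter` says which
  // watermark, if any, is emitted right after each element is processed.
  def run(events: Seq[Event], size: Long,
          watermarkAfter: Event => Option[Long]): Vector[(Long, Vector[Long])] = {
    var watermark = Long.MinValue
    val open = mutable.Map.empty[Long, Vector[Long]]
    var fired = Vector.empty[(Long, Vector[Long])]

    // Fire (and discard) every open window whose last timestamp is <= the watermark.
    def advance(wm: Long): Unit = if (wm > watermark) { // watermarks never go backwards
      watermark = wm
      val ready = open.keys.toList.sorted.filter(start => start + size - 1 <= watermark)
      for (start <- ready) { fired :+= (start -> open(start)); open -= start }
    }

    for (e <- events) {
      val start = e.time - (e.time % size)
      // Only add the element if its window has not fired yet; otherwise it is late and dropped.
      if (start + size - 1 > watermark)
        open(start) = open.getOrElse(start, Vector.empty[Long]) :+ e.value
      watermarkAfter(e).foreach(advance)
    }
    advance(Long.MaxValue) // end of a bounded input flushes every remaining window
    fired
  }

  val events = Seq(Event(1525132800000L, 1), Event(1525132800000L, 2),
                   Event(1525132920000L, 3), Event(1525132800000L, 4))

  def main(args: Array[String]): Unit = {
    // Periodic generator whose tick never occurs while the input is read.
    println(run(events, 60000L, _ => None))
    // A watermark after every element, as the question assumed.
    println(run(events, 60000L, e => Some(e.time)))
  }
}
```

The first run puts 1, 2 and 4 in the first window, matching the question's output; the second drops 4 as late, matching the per-event results further down.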

To get the results you expect, you could implement a punctuated watermark generator configured to generate a watermark for every event:

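import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {
  // Use the event's own time field as its timestamp
  override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long = {
    element.time
  }

  // Emit a watermark after every element, equal to that element's timestamp
  override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp)
  }
}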

which you would then use like this:

val simple = env.fromCollection(Seq(
  ExampleType(1525132800000L, 1),
  ExampleType(1525132800000L, 2),
  ExampleType(1525132920000L, 3),
  ExampleType(1525132800000L, 4)
))
  .assignTimestampsAndWatermarks(new PunctuatedAssigner)

Now your example produces these results:

(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))

Event #4 has been dropped because it is late. This could be adjusted by relaxing the watermark generator so as to accommodate some amount of out-of-orderness. E.g.,

override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
  // Let the watermark trail each element's timestamp by 200 seconds (200000 ms)
  new Watermark(extractedTimestamp - 200000)
}

which then produces these results:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
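
Checking the arithmetic under the same assumptions (60-second tumbling windows with zero offset; `SlackCheck` is a hypothetical name): after event 3 the relaxed watermark is 1525132920000 - 200000 = 1525132720000, which is still below the first window's last timestamp 1525132859999, so that window has not fired when event 4 arrives.

```scala
// Hypothetical arithmetic check, not Flink code.
object SlackCheck {
  val windowSize = 60000L
  val slack      = 200000L // 200 seconds, as subtracted in checkAndGetNextWatermark

  // Last timestamp of the tumbling window containing t (zero offset).
  def windowMaxTimestamp(t: Long): Long = t - (t % windowSize) + windowSize - 1

  // With no allowed lateness, an element is late once its window's last timestamp <= watermark.
  def isLate(t: Long, watermark: Long): Boolean = windowMaxTimestamp(t) <= watermark

  def main(args: Array[String]): Unit = {
    val wmAfterEvent3 = 1525132920000L - slack
    println(s"watermark after event 3: $wmAfterEvent3")
    println(s"event 4 late without slack: ${isLate(1525132800000L, 1525132920000L)}")
    println(s"event 4 late with slack:    ${isLate(1525132800000L, wmAfterEvent3)}")
  }
}
```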

Or you could configure the windows to allow late events:

val windows = simple
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
  .allowedLateness(Time.seconds(200))
  ...

which then causes the first window to fire twice:

(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
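
The rule behind this second firing can be sketched as follows. It is modelled on how, as I understand it, Flink's window operator treats allowed lateness in event time; `LatenessRule` and its outcomes are hypothetical names. An element is discarded only if its window's last timestamp plus the allowed lateness is at or below the current watermark; if the window has already fired but is still within its lateness, the element is added and the event-time trigger fires the window again.

```scala
// Hypothetical model of the allowed-lateness rule, not Flink code.
object LatenessRule {
  sealed trait Outcome
  case object OnTime   extends Outcome // window not fired yet: the element just joins it
  case object LateFire extends Outcome // window fired but still kept: it fires again
  case object Dropped  extends Outcome // window already cleaned up: the element is discarded

  def classify(windowMaxTs: Long, allowedLateness: Long, watermark: Long): Outcome =
    if (windowMaxTs + allowedLateness <= watermark) Dropped
    else if (windowMaxTs <= watermark) LateFire
    else OnTime

  def main(args: Array[String]): Unit = {
    val firstWindowMaxTs = 1525132860000L - 1 // last timestamp of the first window
    val watermark = 1525132920000L            // per-event watermark after event 3
    println(classify(firstWindowMaxTs, 0L, watermark))      // prints Dropped
    println(classify(firstWindowMaxTs, 200000L, watermark)) // prints LateFire
  }
}
```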

Note that since processing Watermarks imposes some overhead, you wouldn't normally want to use punctuated watermarks in this way (with a Watermark for every event). For most applications, periodic watermarks based on a BoundedOutOfOrdernessTimestampExtractor are a better choice.
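
The behaviour of such a periodic generator can be sketched without Flink. `BoundedLagWatermarks` is a hypothetical stand-in, not the Flink class, that mirrors the idea behind BoundedOutOfOrdernessTimestampExtractor: track the largest timestamp seen, and on each periodic tick emit that maximum minus a fixed bound, never moving backwards.

```scala
// Hypothetical stand-in (not the Flink class) mirroring how a bounded
// out-of-orderness periodic generator derives its watermark.
final class BoundedLagWatermarks(maxOutOfOrderness: Long) {
  private var maxSeen = Long.MinValue + maxOutOfOrderness // avoids underflow in currentWatermark
  private var lastEmitted = Long.MinValue

  // Called per element: only the largest timestamp seen so far matters.
  def onEvent(timestamp: Long): Unit =
    if (timestamp > maxSeen) maxSeen = timestamp

  // Called on every periodic tick (every autoWatermarkInterval ms in Flink).
  def currentWatermark(): Long = {
    val candidate = maxSeen - maxOutOfOrderness
    if (candidate > lastEmitted) lastEmitted = candidate // never move backwards
    lastEmitted
  }
}

object BoundedLagDemo {
  def main(args: Array[String]): Unit = {
    val gen = new BoundedLagWatermarks(200000L) // 200-second bound
    Seq(1525132800000L, 1525132800000L, 1525132920000L, 1525132800000L).foreach(gen.onEvent)
    println(gen.currentWatermark())
    println(gen.currentWatermark()) // a tick with no new events yields the same value
  }
}
```

Because the watermark depends only on the largest timestamp seen, it cannot advance while no new events arrive; with the question's events and a 200-second bound it stays at 1525132720000 on every tick.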


If BoundedOutOfOrdernessTimestampExtractor is used, the result for the last window is not emitted until a new event arrives. If we base the watermark on system time instead, it works, but when you re-run over messages with embedded timestamps (past events), those are not calculated.
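
One way to see why a system-time watermark fails on replays is a Flink-free sketch. It assumes the watermark is taken directly from the wall clock and there is no allowed lateness; `WallClockReplay` and its methods are hypothetical names. The watermark is then years ahead of the embedded timestamps, so every window those events belong to counts as already fired, and the events are dropped as late.

```scala
// Hypothetical illustration, not Flink code: a watermark taken from the wall
// clock is far ahead of the timestamps embedded in replayed historical events.
object WallClockReplay {
  val windowSize = 60000L

  // Last timestamp of the tumbling window containing t (zero offset).
  def windowMaxTimestamp(t: Long): Long = t - (t % windowSize) + windowSize - 1

  // With zero allowed lateness, an element whose window is already behind the watermark is dropped.
  def dropped(t: Long, watermark: Long): Boolean = windowMaxTimestamp(t) <= watermark

  def main(args: Array[String]): Unit = {
    val wallClockWatermark = System.currentTimeMillis() // watermark based on system time
    val replayed = Seq(1525132800000L, 1525132920000L)  // embedded timestamps from 1 May 2018 (UTC)
    replayed.foreach(t => println(s"$t dropped: ${dropped(t, wallClockWatermark)}"))
  }
}
```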