
In the following simple Flink code I have 3 events with event-time timestamps, 1 second apart. They are fed to Flink out of order: 2, 1, 3. I noticed that when I change the argument to timeWindowAll, sometimes all 3 events are printed and sometimes only 2 and 3.

Some examples:

.timeWindowAll(Time.seconds(3)) --> 2, 3
.timeWindowAll(Time.seconds(4)) --> 2, 1, 3
.timeWindowAll(Time.seconds(5)) --> 2, 3
.timeWindowAll(Time.seconds(6)) --> 2, 3
.timeWindowAll(Time.seconds(7)) --> 2, 1, 3
.timeWindowAll(Time.seconds(8)) --> 2, 1, 3
.timeWindowAll(Time.seconds(9)) --> 2, 1, 3
.timeWindowAll(Time.seconds(10)) --> 2, 3
...

Could someone explain why this happens? I guess it is related to the start time of the window and to the fact that event 1 is late. In that case, given the size passed to timeWindowAll, how do I know what the start time of each window will be?

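import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object UnorederedTimeEvents {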
  case class MyEvent(timestamp: Long, str: String)
  class MyAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = new Watermark(extractedTimestamp)

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = element.timestamp
  }

  class MyProcessAllWindowFunction extends ProcessAllWindowFunction[MyEvent, MyEvent, TimeWindow] {
    override def process(context: Context, elements: Iterable[MyEvent], out: Collector[MyEvent]): Unit = {
      elements.foreach(out.collect)
    }
  }

  def main(args: Array[String]): Unit = {

    val events = List(MyEvent(1526056650167L, "2"), MyEvent(1526056649167L, "1"), MyEvent(1526056651167L, "3"))

    println(events.sortBy(_.timestamp))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    env
      .fromCollection(events)
      .assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks)
      .timeWindowAll(Time.seconds(10))
      .process(new MyProcessAllWindowFunction)
      .print()

    env.execute()

  }

}

1 Answer


Converting your timestamps to human-readable time, we see they occur at these times (UTC):

2: Friday, May 11, 2018 4:37:30.167 PM
1: Friday, May 11, 2018 4:37:29.167 PM
3: Friday, May 11, 2018 4:37:31.167 PM
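The conversion can be reproduced with java.time. This is a minimal sketch (the object and method names are hypothetical) that prints each event's timestamp as an ISO-8601 instant in UTC:

```scala
import java.time.Instant

object HumanTime {
  // The event timestamps from the question, in milliseconds since the epoch, in arrival order
  val timestamps: Seq[(String, Long)] =
    Seq("2" -> 1526056650167L, "1" -> 1526056649167L, "3" -> 1526056651167L)

  // Instant.toString renders an ISO-8601 instant in UTC
  def toUtc(millis: Long): String = Instant.ofEpochMilli(millis).toString

  def main(args: Array[String]): Unit =
    timestamps.foreach { case (name, ts) => println(s"$name: ${toUtc(ts)}") }
}
```

For event #2 this prints 2018-05-11T16:37:30.167Z, matching the list above.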

Now let's consider this case:

.timeWindowAll(Time.seconds(10)) --> 2, 3

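Time windows are aligned to the clock, not to the events: with the default offset of zero, a window of a given size starts at timestamp - (timestamp % size), i.e. at a multiple of the size counted from the epoch. In this case there is one ten-second window from 4:37:20 to 4:37:30 and another from 4:37:30 to 4:37:40, so event #1 (4:37:29.167) and event #2 (4:37:30.167) land in different windows. Event #2, which was processed first, moved the watermark to 4:37:30.167, so by the time event #1 arrives, the window it belongs to has already ended according to the watermark. With no allowed lateness, event #1 has nowhere to go, and the window operator simply drops it.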
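To make the alignment concrete, here is a small plain-Scala model, not Flink code (the object and method names are hypothetical). Flink's TimeWindow.getWindowStartWithOffset computes the start as timestamp - (timestamp - offset + windowSize) % windowSize, which reduces to timestamp - (timestamp % size) for offset 0 and non-negative timestamps. The model drops an event when its window's last millisecond is already at or behind the watermark, with the watermark set to the largest timestamp seen so far, as the punctuated assigner in the question effectively does. It ignores everything else Flink does, such as triggers and parallelism.

```scala
object WindowAlignment {
  // Start of the epoch-aligned tumbling window containing ts (offset 0, ts >= 0)
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Replays events in arrival order and returns the ones that are not dropped as late.
  // Each element is checked before the watermark that follows it is applied.
  // Watermarks never move backwards, so the watermark is the max timestamp seen so far.
  def keptEvents(events: Seq[(Long, String)], size: Long): Seq[String] = {
    var watermark = Long.MinValue
    val kept = Seq.newBuilder[String]
    for ((ts, name) <- events) {
      val windowEnd = windowStart(ts, size) + size // exclusive end
      // Late if the window's last millisecond (end - 1) is <= the current watermark
      if (windowEnd - 1 > watermark) kept += name
      watermark = math.max(watermark, ts)
    }
    kept.result()
  }

  def main(args: Array[String]): Unit = {
    val events = Seq((1526056650167L, "2"), (1526056649167L, "1"), (1526056651167L, "3"))
    for (seconds <- Seq(3, 4, 10))
      println(s"$seconds s -> ${keptEvents(events, seconds * 1000L).mkString(", ")}")
  }
}
```

For the 3-, 4- and 10-second sizes this model gives 2, 3 / 2, 1, 3 / 2, 3, the same results the question reports for those sizes.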

On the other hand, in this case

.timeWindowAll(Time.seconds(4)) --> 2, 1, 3

with 4-second windows, all three events fall into the window from 4:37:28 to 4:37:32. Event #1 is still late, but the window it belongs to hasn't fired or been purged yet (and won't be until the watermark reaches 4:37:32), so event #1 is included in the window.
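A small sketch (hypothetical names, same epoch-aligned formula with offset 0) that computes the 4-second window containing event #1 and prints its bounds in UTC. Windows are half-open, [start, end), and an event-time window fires when the watermark reaches its last millisecond, end - 1.

```scala
import java.time.Instant

object FourSecondWindow {
  // Epoch-aligned tumbling window [start, end) containing ts (offset 0, ts >= 0)
  def bounds(ts: Long, size: Long): (Long, Long) = {
    val start = ts - (ts % size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val (start, end) = bounds(1526056649167L, 4000L) // event #1
    println(s"window: [${Instant.ofEpochMilli(start)}, ${Instant.ofEpochMilli(end)})")
    // The window's last millisecond; the event-time trigger fires once the watermark reaches it
    println(s"fires when watermark >= ${Instant.ofEpochMilli(end - 1)}")
  }
}
```

Events #2 and #3 map to the same bounds, which is why all three end up in one window.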

For what it's worth, a typical strategy when working with unordered events is to adjust the watermarking to at least accommodate whatever amount of out-of-orderness is expected for your application (rather than none at all, as you've done here). And Flink's windowing API also has support for late events (meaning events so late that the watermarking delay wasn't sufficient).
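To see how the two remedies change the outcome for the 10-second case, here is a plain-Scala sketch extending the simplified model above; the object and function names are hypothetical. The watermark trails the largest timestamp seen by a fixed delay, which is roughly what Flink's BoundedOutOfOrdernessTimestampExtractor does (though Flink emits those watermarks periodically, not after every element), and the allowedLateness parameter mirrors calling .allowedLateness(...) on the windowed stream. The model only decides which events survive; it does not model when or in which firing Flink would print them.

```scala
object BoundedOutOfOrderness {
  // Start of the epoch-aligned tumbling window containing ts (offset 0, ts >= 0)
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Returns the events that are NOT dropped as late, in arrival order
  // (not necessarily the order in which Flink would print them).
  // The watermark trails the largest timestamp seen so far by maxDelay; an element is
  // dropped only if its window's last millisecond plus allowedLateness is <= the watermark.
  def keptEvents(events: Seq[(Long, String)], size: Long,
                 maxDelay: Long, allowedLateness: Long = 0L): Seq[String] = {
    var watermark = Long.MinValue
    val kept = Seq.newBuilder[String]
    for ((ts, name) <- events) {
      val windowEnd = windowStart(ts, size) + size // exclusive end
      if (windowEnd - 1 + allowedLateness > watermark) kept += name
      // Watermarks never move backwards
      watermark = math.max(watermark, ts - maxDelay)
    }
    kept.result()
  }

  def main(args: Array[String]): Unit = {
    val events = Seq((1526056650167L, "2"), (1526056649167L, "1"), (1526056651167L, "3"))
    println(keptEvents(events, 10000L, maxDelay = 0L))    // no delay, as in the question
    println(keptEvents(events, 10000L, maxDelay = 1000L)) // tolerate 1 s of out-of-orderness
    println(keptEvents(events, 10000L, maxDelay = 0L, allowedLateness = 1000L))
  }
}
```

Either a 1-second watermark delay or 1 second of allowed lateness is enough for event #1 to survive here, because it arrives only 1 second behind event #2.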

Docs: Event Time and Watermarks, Lateness.