10
votes

I'm having some trouble understanding the semantics around event-time windowing. The following program generates some tuples with timestamps that are used as event time and does a simple window aggregation. I would expect the output to be in the same order as the input, but the output is ordered differently. Why is the output out of order with respect to event time?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time, i.e., the timestamps carried by the elements.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    // Ten (timestamp, value) tuples, one second apart.
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)             // extract the event-time timestamp
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS)) // tumbling 1-second windows
      .sum(1)                                      // sum the values per window
      .print()

    env.execute()
}

The input:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

Result:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)

1 Answer

12
votes

The reason for this behavior is that Flink does not take the order of elements (with respect to their timestamps) into account. For operations that consider time, only the watermarks and their relation to the element timestamps matter, because it is the watermarks that trigger computation in time-based operations.

In your example, the window operator buffers all the elements from the source in its internal window state. The source then emits a watermark saying that no elements with a smaller timestamp will arrive in the future. This, in turn, tells the window operator to evaluate all windows whose end timestamp is below the watermark, which here is true for every window. It therefore emits all the windows (in arbitrary order) and afterwards emits a watermark itself. Downstream operations receive these elements and can in turn do their processing once they receive watermarks.
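The firing mechanics can be sketched in plain Scala. This is a toy simulation, not Flink's actual implementation; `WindowFiring`, `windowSize`, and the `Either`-based event stream are made-up names for illustration:

```scala
import scala.collection.mutable

// Toy model of a window operator: (timestamp, value) elements are buffered
// per 1-second window, and a window only fires once a watermark at or past
// its end timestamp arrives.
object WindowFiring {
  val windowSize = 1000L

  // Events are either elements (Left) or watermarks (Right).
  // Returns the window results (windowStart, sum) in emission order.
  def run(events: Seq[Either[(Long, Int), Long]]): Seq[(Long, Int)] = {
    val buffers = mutable.Map.empty[Long, Int] // window start -> running sum
    val out = mutable.ArrayBuffer.empty[(Long, Int)]
    events.foreach {
      case Left((ts, v)) =>
        val windowStart = ts - ts % windowSize
        buffers(windowStart) = buffers.getOrElse(windowStart, 0) + v
      case Right(wm) =>
        // Fire every window the watermark covers. Like Flink, we make no
        // ordering promise here: hash-map iteration order is arbitrary.
        val ready = buffers.keys.filter(_ + windowSize - 1 <= wm).toSeq
        ready.foreach(s => out += ((s, buffers.remove(s).get)))
    }
    out.toSeq
  }
}

object WindowFiringDemo extends App {
  val start = 1449597577379L
  val elems: Seq[Either[(Long, Int), Long]] =
    (1 to 10).map(t => Left((start + t * 1000L, t)))

  // One watermark at the very end (as in your program): all windows become
  // ready at once and may fire in any order.
  println(WindowFiring.run(elems :+ Right(Long.MaxValue)))

  // A watermark after every element: at most one window is complete per
  // watermark, so the output comes out in timestamp order.
  val interleaved = elems.flatMap {
    case e @ Left((ts, _)) => Seq[Either[(Long, Int), Long]](e, Right(ts))
    case wm                => Seq(wm)
  } :+ Right(Long.MaxValue)
  println(WindowFiring.run(interleaved))
}
```

The second run shows why more frequent watermarks recover timestamp order: at most one window is complete when each watermark arrives, so windows cannot be emitted out of order.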

By default, the interval at which sources emit watermarks is 200 ms. With the small number of elements your source produces, all of them are emitted before the first watermark. In a real-world use case, where the watermark emission interval is much smaller than the window size, you would get the expected behavior of windows being emitted in the order of their timestamps, for example with 1-hour windows and watermarks every 500 ms.
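If you want to see this effect in your small example, you can lower that interval via `ExecutionConfig.setAutoWatermarkInterval` (a config sketch, assuming `env` is your `StreamExecutionEnvironment`; set it before building the topology):

```scala
// Emit watermarks more often than the default 200 ms, so that windows are
// triggered, and thus emitted, closer to timestamp order.
env.getConfig.setAutoWatermarkInterval(50)
```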