In the following simple Flink program I have three events whose event-time timestamps are 1 second apart. They are fed to Flink out of order: 2, 1, 3. I noticed that, depending on the size I pass to timeWindowAll, sometimes all three events are printed and sometimes only 2 and 3.
Some examples:
.timeWindowAll(Time.seconds(3)) --> 2, 3
.timeWindowAll(Time.seconds(4)) --> 2, 1, 3
.timeWindowAll(Time.seconds(5)) --> 2, 3
.timeWindowAll(Time.seconds(6)) --> 2, 3
.timeWindowAll(Time.seconds(7)) --> 2, 1, 3
.timeWindowAll(Time.seconds(8)) --> 2, 1, 3
.timeWindowAll(Time.seconds(9)) --> 2, 1, 3
.timeWindowAll(Time.seconds(10)) --> 2, 3
...
Could someone explain why this happens? I guess it is related to the start time of the window and to the fact that event 1 is late. In that case, given only a size for timeWindowAll, how do I know what the start time of each window will be?
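As far as I understand Flink's defaults, the window start does not depend on when the first event arrives: in event time, timeWindowAll(size) creates tumbling windows aligned to the Unix epoch, so every window starts at a multiple of the size. Below is a minimal plain-Scala sketch (no Flink dependency) of that assignment. `WindowMath`, `windowStart` and `windowOf` are hypothetical names, and the formula is assumed to mirror Flink's `TimeWindow.getWindowStartWithOffset` with an offset of 0.

```scala
object WindowMath {
  // Start of the tumbling window containing `timestamp` (all values in milliseconds).
  // Assumption: same arithmetic as Flink's TimeWindow.getWindowStartWithOffset, and
  // timeWindowAll(size) uses offset 0, i.e. windows aligned to the Unix epoch.
  def windowStart(timestamp: Long, sizeMs: Long, offsetMs: Long = 0L): Long =
    timestamp - (timestamp - offsetMs + sizeMs) % sizeMs

  // Tumbling windows are half-open intervals [start, start + size).
  def windowOf(timestamp: Long, sizeMs: Long): (Long, Long) = {
    val start = windowStart(timestamp, sizeMs)
    (start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    // The three events from the question, in arrival order.
    val events = Seq("2" -> 1526056650167L, "1" -> 1526056649167L, "3" -> 1526056651167L)
    for (seconds <- Seq(3L, 4L); (name, ts) <- events)
      println(s"size ${seconds}s: event $name -> [start, end) = ${windowOf(ts, seconds * 1000)}")
  }
}
```

With a 3-second window, event 1 falls into [1526056647000, 1526056650000) while events 2 and 3 fall into [1526056650000, 1526056653000); with a 4-second window all three share [1526056648000, 1526056652000).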
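    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    object UnorderedTimeEvents {

      case class MyEvent(timestamp: Long, str: String)

      // Emits a watermark equal to each element's timestamp right after that element.
      class MyAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[MyEvent] {
        override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark =
          new Watermark(extractedTimestamp)

        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long =
          element.timestamp
      }

      // Forwards every element of a window unchanged.
      class MyProcessAllWindowFunction extends ProcessAllWindowFunction[MyEvent, MyEvent, TimeWindow] {
        override def process(context: Context, elements: Iterable[MyEvent], out: Collector[MyEvent]): Unit =
          elements.foreach(out.collect)
      }

      def main(args: Array[String]): Unit = {
        // Events arrive out of order: 2, 1, 3 (timestamps 1 second apart).
        val events = List(MyEvent(1526056650167L, "2"), MyEvent(1526056649167L, "1"), MyEvent(1526056651167L, "3"))
        println(events.sortBy(_.timestamp))

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        env
          .fromCollection(events)
          .assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks)
          .timeWindowAll(Time.seconds(10))
          .process(new MyProcessAllWindowFunction)
          .print()

        env.execute()
      }
    }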
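The behaviour can be modelled without running Flink. With the punctuated assigner above, the watermark is 1526056650167 right after event 2, so when event 1 arrives it is dropped if the window it belongs to has already been passed by that watermark (window end - 1 <= watermark). That happens exactly when a window boundary falls at 1526056650000, between the two timestamps. The sketch below is plain Scala and a simplified model, not Flink's actual WindowOperator; `LatenessSimulation` and `simulate` are hypothetical names, and it assumes an allowed lateness of 0, epoch-aligned windows, and that windows fire in order of their end time.

```scala
import scala.collection.mutable.ArrayBuffer

object LatenessSimulation {
  final case class MyEvent(timestamp: Long, str: String)

  // Epoch-aligned tumbling window start (offset 0) for non-negative timestamps.
  def windowStart(t: Long, sizeMs: Long): Long = t - t % sizeMs

  // Returns the strings that would be printed, in order, for one window size (ms).
  def simulate(events: Seq[MyEvent], sizeMs: Long): Seq[String] = {
    var watermark = Long.MinValue
    val kept = ArrayBuffer.empty[MyEvent]
    for (e <- events) {
      val maxTimestamp = windowStart(e.timestamp, sizeMs) + sizeMs - 1
      // Allowed lateness 0: if the window's max timestamp <= watermark, the element is dropped.
      if (maxTimestamp > watermark) kept += e
      // Punctuated assigner: a watermark equal to the element's timestamp follows it (never decreasing).
      watermark = math.max(watermark, e.timestamp)
    }
    // Windows fire in order of their end; inside a window, arrival order is preserved.
    kept.groupBy(e => windowStart(e.timestamp, sizeMs)).toSeq.sortBy(_._1)
      .flatMap { case (_, inWindow) => inWindow.map(_.str) }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(MyEvent(1526056650167L, "2"), MyEvent(1526056649167L, "1"), MyEvent(1526056651167L, "3"))
    for (seconds <- 3 to 10)
      println(s"timeWindowAll(Time.seconds($seconds)) --> ${simulate(events, seconds * 1000L).mkString(", ")}")
  }
}
```

For example, this model yields 2, 3 for window sizes of 3 s and 10 s, and 2, 1, 3 for 4 s and 7 s.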