I'm trying to apply a very simple window function to a finite data stream in Apache Flink (locally, no cluster). Here's the example:
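```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      // Emit the sorted contents of the whole window as one string
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()
env.execute()
```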
Here, I try to group all the elements that arrive within the same one-second window and then just print these groups.
I assumed that all the elements would be produced in much less than one second and therefore land in a single window, so `print()` would receive exactly one element. However, nothing is printed at all when I run this.
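One detail that matters for this expectation: Flink's tumbling windows are aligned to the epoch (offset 0 by default), not to the moment the job starts, so a burst much shorter than one second can still straddle a window boundary. The plain-Scala sketch below has no Flink dependency; it reproduces the window-start formula used by Flink's `TimeWindow.getWindowStartWithOffset`, and the object name `WindowAssignment` and the sample timestamps are hypothetical.

```scala
// Plain-Scala sketch of how a tumbling window is chosen for a timestamp.
// It mirrors the formula in Flink's TimeWindow.getWindowStartWithOffset;
// it does not call Flink itself.
object WindowAssignment {

  /** Start (inclusive) of the tumbling window that contains `timestamp`. */
  def windowStart(timestamp: Long, offset: Long, size: Long): Long =
    timestamp - (timestamp - offset + size) % size

  /** The window as a half-open interval [start, start + size), offset 0. */
  def window(timestamp: Long, size: Long): (Long, Long) = {
    val start = windowStart(timestamp, 0L, size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val size = 1000L // Time.seconds(1) in milliseconds
    // Hypothetical processing times of the first and last element:
    // only 50 ms apart, but on either side of a second boundary.
    val first = 1537700000980L
    val last = 1537700001030L
    println(window(first, size)) // (1537700000000,1537700001000)
    println(window(last, size))  // (1537700001000,1537700002000)
  }
}
```

On its own this would yield two groups rather than none, so it does not explain the empty output; it only shows that "one window" is not guaranteed even for a very short run.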
If I remove all the windowing logic, like this:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()
env.execute()
```

the elements are printed after the run. I also tried a file source instead of the collection; it made no difference.
The default parallelism on my machine is 6. If I experiment with the parallelism and with artificial delays, like this:

```scala
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  // Delay each element by 1.5 seconds
  .map { x => Thread.sleep(1500); x }
  // ...followed by the same windowAll/trigger/process/print chain as above
```

I am able to get some (but not all) elements into groups, which are then printed.
My first assumption is that the source finishes much faster than one second and the task is shut down before the window's timer fires. Debugging showed that the line in `ProcessingTimeTrigger` that registers the timer is reached. Shouldn't all registered timers fire before a task shuts down (at least, that is the impression I got from the code)?
Could you please help me understand this behavior and make it more deterministic?
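To make the first assumption concrete: `ProcessingTimeTrigger` registers its timer at the window's `maxTimestamp()`, which is the window end minus 1 ms, so the window cannot fire before the wall clock reaches that point. The plain-Scala sketch below makes no Flink calls; it compares that timer time with an assumed moment at which the bounded job finishes. The object name `TimerVsShutdown`, the "job end" notion, and all timestamps are hypothetical.

```scala
// Plain-Scala sketch (no Flink calls) of the timing in the first assumption.
// ProcessingTimeTrigger registers a timer at window.maxTimestamp(), i.e. the
// window end minus 1 ms; the window can only fire once that time is reached.
object TimerVsShutdown {

  /** Exclusive end of the 1-window-size tumbling window (offset 0). */
  def windowEnd(timestamp: Long, size: Long): Long =
    timestamp - (timestamp + size) % size + size

  /** Timer time registered for an element arriving at `elementTime`. */
  def timerTime(elementTime: Long, size: Long): Long =
    windowEnd(elementTime, size) - 1

  /** True if the (assumed) job end comes before the timer is due. */
  def timerAfterShutdown(elementTime: Long, jobEnd: Long, size: Long): Boolean =
    timerTime(elementTime, size) > jobEnd

  def main(args: Array[String]): Unit = {
    val size = 1000L // Time.seconds(1) in milliseconds
    // Hypothetical: the element is windowed at ...123 ms and the bounded
    // job is assumed to finish 60 ms later.
    val elementTime = 1537700000123L
    val jobEnd = elementTime + 60L
    println(timerTime(elementTime, size))                  // 1537700000999
    println(timerAfterShutdown(elementTime, jobEnd, size)) // true
  }
}
```

With a job end 1500 ms after the element instead, `timerAfterShutdown` returns `false`. This only restates the timing behind the assumption; it does not show what Flink actually does with timers that are still pending when a task finishes.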
Update #1, 23.09.2018:
I also experimented with event-time windows instead of processing-time windows. If I do this:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
    override def extractAscendingTimestamp(element: String): Long = {
      // Character code as the timestamp: 'a' -> 97 ms, ..., 'e' -> 101 ms
      element.charAt(0).toInt
    }
  })
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .trigger(EventTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.toString())
    }
  })
  .print()
env.execute()
```
Then, again, nothing is printed. The debugger shows that the trigger's `onElement` is called for every element, but `onEventTime` is never called.
Also, if I modify the timestamp extractor to make bigger steps:

```scala
element.charAt(0).toInt * 1000
```

all the elements except the last one are printed (one element per group, as expected).
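The "one element per group" expectation follows from the window arithmetic alone. The plain-Scala sketch below (no Flink calls; the object name `EventTimeGrouping` is hypothetical) applies the same window-start formula as Flink's `TimeWindow.getWindowStartWithOffset`, with offset 0, to the timestamps produced by both extractors. It only shows which window each element belongs to; it does not model when watermarks actually fire those windows, which is the open question of Update #1.

```scala
// Plain-Scala sketch (no Flink calls): which 1-second event-time window each
// element falls into, for the original and the modified timestamp extractor.
object EventTimeGrouping {

  // Start of the tumbling window containing `timestamp` (offset 0).
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp + size) % size

  // Groups elements by the start of their window, sorted by window start.
  def groupByWindow(elems: Seq[String], ts: String => Long, size: Long): List[(Long, Seq[String])] =
    elems.groupBy(e => windowStart(ts(e), size)).toList.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val elems = List("a", "b", "c", "d", "e")
    val size = 1000L // Time.seconds(1) in milliseconds
    // Original extractor: 'a' has code 97, so timestamps are 97..101 ms,
    // all inside the single window [0, 1000).
    println(groupByWindow(elems, e => e.charAt(0).toLong, size))
    // Modified extractor: 97000..101000 ms, one element per window.
    println(groupByWindow(elems, e => e.charAt(0).toLong * 1000, size))
  }
}
```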
Update #2, 23.09.2018:
Update #1 is answered in this comment.