Currently, I am using Flink to conduct research on stream processing engines. For my study, I work with historical streams, which consist of tuples of the following form:
event_time, attribute_1, ..., attribute_X
where event_time is used as the event-time timestamp (TimeCharacteristic.EventTime) during processing. Furthermore, I push my datasets into the processing topology either by (i) creating in-memory structures, or (ii) reading the CSV files directly.
Unfortunately, I have noticed that even when enough tuples have arrived at a window operator to complete a full window, that window is not pushed downstream for processing. As a result, performance drops significantly, and I frequently run into an OutOfMemoryError exception (with large historical streams).
To illustrate a typical use-case, I present the following example:
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);

// Five tuples of the form (event_time, value)
List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));

DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);

stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
                return t.f0;  // event_time is the first field
            }
        })
    .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2),
            Time.milliseconds(1)))
    .sum(1)
    .print();

env.execute();
According to l's contents, I expect the following windowed results:
- [0, 2) Sum: 11
- [1, 3) Sum: 33
- [2, 4) Sum: 55
- [3, 5) Sum: 77
- [4, 6) Sum: 99
- [5, 7) Sum: 55
Each list item can be read as [start-timestamp, end-timestamp), Sum: X.
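The sums listed above can be double-checked with a few lines of plain Java (no Flink involved); each window [start, start + 2) simply accumulates the values of the tuples whose timestamps fall into that range. The class and method names here are mine, for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingSums {
    // Sum of the values whose timestamps fall in [start, start + size),
    // for every window start in [firstStart, lastStart].
    static List<Integer> slidingSums(long[][] tuples, long firstStart,
                                     long lastStart, long size) {
        List<Integer> sums = new ArrayList<>();
        for (long start = firstStart; start <= lastStart; start++) {
            int sum = 0;
            for (long[] t : tuples) {
                if (t[0] >= start && t[0] < start + size) {
                    sum += (int) t[1];
                }
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] tuples = {{1, 11}, {2, 22}, {3, 33}, {4, 44}, {5, 55}};
        // Windows [0,2), [1,3), [2,4), [3,5), [4,6), [5,7)
        System.out.println(slidingSums(tuples, 0, 5, 2));
        // prints [11, 33, 55, 77, 99, 55]
    }
}
```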
I expect Flink to produce a windowed result every time a tuple with a timestamp beyond the end-timestamp of an open window appears. For instance, I expect the summation for window [1, 3) to be produced when the tuple with timestamp 4L is fed into the window operator. However, processing only starts once all the tuples from l have been pushed into the stream's topology. The same happens with larger historical streams, which degrades performance (or even depletes memory).
Question: How can I force Flink to push a window downstream for processing as soon as that window is complete?
I believe that for SlidingEventTimeWindows, the firing of a window is triggered by watermarks. If that is true, how can I write my topologies so that windows are triggered as soon as a tuple with a later timestamp arrives?
Thank you