
Currently, I am using Flink to conduct research on stream processing engines. For my study, I work with historical streams, which consist of tuples of the following form:

event_time, attribute_1, ..., attribute_X

where event_time is used as the event time (TimeCharacteristic.EventTime) during processing. Furthermore, I push my datasets into the processing topology either by (i) creating in-memory structures, or (ii) reading the CSV files directly.
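In the CSV case, the ingestion is essentially a readTextFile followed by a map into the same tuple type. A rough sketch (the file name events.csv and the single integer attribute are placeholders, and env is the StreamExecutionEnvironment from the example further down):

DataStream<Tuple2<Long, Integer>> csvStream = env
    .readTextFile("events.csv")  // one String per CSV line
    .map(new MapFunction<String, Tuple2<Long, Integer>>() {
        @Override
        public Tuple2<Long, Integer> map(String line) {
            String[] fields = line.split(",");
            // fields[0] = event_time, fields[1] = attribute_1
            return new Tuple2<>(Long.parseLong(fields[0]),
                    Integer.parseInt(fields[1]));
        }
    });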

Unfortunately, I have noticed that even after enough tuples have arrived at a window operator to complete a full window, that window is not emitted downstream for processing. As a result, performance drops significantly, and with large historical streams I often end up with an OutOfMemoryError.

To illustrate a typical use-case, I present the following example:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);

// f0 is the event-time timestamp (in milliseconds), f1 the value to be summed.
List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));

DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
                return t.f0;
            }
        })
    .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2),
            Time.milliseconds(1)))
    .sum(1)
    .print();
env.execute();

Given l's contents, I expect the following windowed results:

  • [0, 2) Sum: 11
  • [1, 3) Sum: 33
  • [2, 4) Sum: 55
  • [3, 5) Sum: 77
  • [4, 6) Sum: 99
  • [5, 7) Sum: 55

Each list item can be read as [start-timestamp, end-timestamp), Sum: X.

I expect Flink to produce a windowed result every time a tuple whose timestamp lies beyond the end-timestamp of an open window appears. For instance, I expect the sum for window [1, 3) to be produced when the tuple with timestamp 4L reaches the window operator. Instead, processing only starts after all the tuples from l have been pushed into the topology. The same thing happens with larger historical streams, which degrades performance (or even exhausts memory).

Question: How can I force Flink to emit a window downstream for processing as soon as that window is complete?

I believe that for SlidingEventTimeWindows the firing of a window is triggered by watermarks. If that is true, how can I write my topologies so that windows are triggered as soon as a tuple with a later timestamp arrives?

Thank you

Is your source actually a stream, or are you always reading it into memory first? In your example, the full input is going to be read in fromCollection. – Joshua DeWald
Usually it's read from a CSV file. The example I posted is there to give an easy-to-follow illustration of what is happening inside Flink. – nick.katsip

1 Answer


AscendingTimestampExtractor uses the periodic watermarking strategy, in which Flink calls the getCurrentWatermark() method every n milliseconds, where n is the configured auto-watermark interval.

The default interval is 200 milliseconds, which is very long compared to the size of your windows. However, they aren't directly comparable -- the 200 msec is measured in processing time, not event time. Nevertheless, I suspect that if you haven't changed this configuration setting, then a lot of windows are created before the first watermark is emitted, which I think explains what you are seeing.
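For reference, this interval is configured on the ExecutionConfig; a one-line sketch, using the env from the question:

// How often (in processing time) Flink asks the assigner for a watermark.
env.getConfig().setAutoWatermarkInterval(1); // default is 200 ms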

You could reduce the auto-watermarking interval (perhaps to 1 millisecond). Or you could implement an AssignerWithPunctuatedWatermarks, which will give you more control.
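A minimal sketch of the punctuated variant, reusing the Tuple2<Long, Integer> stream from the question: it emits a watermark with every element, so, assuming strictly ascending timestamps, a window can fire as soon as a tuple past its end arrives.

DataStream<Tuple2<Long, Integer>> withTimestamps = stream
    .assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>>() {
            @Override
            public long extractTimestamp(Tuple2<Long, Integer> element,
                                         long previousElementTimestamp) {
                return element.f0; // the event_time field
            }

            @Override
            public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement,
                                                      long extractedTimestamp) {
                // Emit a watermark after every element; with strictly ascending
                // timestamps, no earlier elements can still be in flight.
                return new Watermark(extractedTimestamp);
            }
        });
// Then apply .windowAll(...).sum(1).print() to withTimestamps, as in the question.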