
Here is what I want to do in Apache Flink:

Take an input DataStream<T>, key it by field x, and apply a 15-minute sliding window that slides every minute. For each window, aggregate the results for each key (x), and then collect all of those per-key aggregations into a single list.

Basically, if I have the input stream [(a, 1, Time 1), (b, 6, Time 14), (b, 1, Time 12)], then for the 15-minute sliding window that contains all three events I want the result to be [(a, 1), (b, 7)].
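To pin down the intended semantics, here is a plain-Java sketch (no Flink involved) of what should happen inside one window: sum the values per key, then gather the per-key sums into one list. The `Event` record, its field names, and the use of summing as "aggregate" are assumptions standing in for `T`; times are in minutes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OneWindowExample {
    // Hypothetical stand-in for the question's T: a key x, a value, and an event time in minutes.
    record Event(String x, int value, long minute) {}

    // Aggregates the events that fall into the window [start, end):
    // first sum per key, then gather all per-key sums into one list.
    static List<String> aggregate(List<Event> events, long start, long end) {
        Map<String, Integer> perKey = new TreeMap<>(); // TreeMap keeps the output sorted by key
        for (Event e : events) {
            if (e.minute() >= start && e.minute() < end) {
                perKey.merge(e.x(), e.value(), Integer::sum);
            }
        }
        List<String> result = new ArrayList<>();
        perKey.forEach((k, sum) -> result.add("(" + k + ", " + sum + ")"));
        return result;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("a", 1, 1),
                new Event("b", 6, 14),
                new Event("b", 1, 12));
        // The 15-minute window [0, 15) contains all three events.
        System.out.println(aggregate(input, 0, 15)); // prints [(a, 1), (b, 7)]
    }
}
```

In Flink, the per-key summing corresponds to the keyed window and the list-building step to the non-keyed window that follows it.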

Is this possible?


Yes indeed this is possible. Flink's window API allows you to follow a keyed window with a non-keyed window. This exercise from the Apache Flink training site covers this pattern. Also, look for the section entitled "Windows Can Follow Windows" in the list of "surprises" about windows on this page in the documentation.

Roughly, you'll be doing this:

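    stream
        // Key by field x, then compute one result per key for each 15-minute window sliding by 1 minute
        .keyBy(e -> e.x)
        .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.minutes(1)))
        .process(new ProduceKeyedResults())
        // Every per-key result is stamped with its window's end minus 1 ms, so a 1-minute
        // tumbling window gathers exactly the results of one sliding window into a list.
        // (A second 15-minute sliding window would mix results from 15 different windows.)
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
        .process(new ProduceOverallResults());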

You may prefer to use a ReduceFunction or an AggregateFunction instead of, or in addition to, the ProcessWindowFunction and ProcessAllWindowFunction used here.
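Flink's `AggregateFunction<IN, ACC, OUT>` is built around four methods: `createAccumulator`, `add`, `getResult`, and `merge`. Because the window only has to keep the accumulator for each key and window, it avoids buffering every event the way a bare ProcessWindowFunction does. The sketch below mirrors that contract with a local interface so it runs without Flink on the classpath; with Flink you would implement `org.apache.flink.api.common.functions.AggregateFunction` instead, and could combine it with a window function via `.aggregate(aggFunction, windowFunction)`. Summing integer values is an assumption about what "aggregate" means in the question.

```java
import java.util.List;

public class SumAggregateSketch {
    // Local stand-in mirroring the shape of Flink's AggregateFunction<IN, ACC, OUT>,
    // so this sketch compiles and runs without Flink on the classpath.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Sums the integer values seen for one key in one window (assumed aggregation).
    static class SumValues implements Aggregate<Integer, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Integer value, Long acc) { return acc + value; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // Folds values in one at a time, the way a window operator calls add() as each
    // event arrives; between events only `acc` would need to be stored.
    static <IN, ACC, OUT> OUT run(Aggregate<IN, ACC, OUT> f, List<IN> values) {
        ACC acc = f.createAccumulator();
        for (IN v : values) {
            acc = f.add(v, acc);
        }
        return f.getResult(acc);
    }

    public static void main(String[] args) {
        // Key "b" in the question's example: values 6 and 1.
        System.out.println(run(new SumValues(), List.of(6, 1))); // prints 7
    }
}
```

The `merge` method is what lets Flink combine partial accumulators, for example when session windows merge.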

You'll notice that the events produced by a time window operator carry timestamps that reflect the window itself (the end of the window minus one millisecond), rather than the timestamps of the events that fell into the window. But these events do have timestamps, and the stream is still watermarked, so it wouldn't make sense to assign timestamps again. (It's also worth noting that the stream produced by a keyed window is no longer keyed.)
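Those timestamps decide which second-stage window collects which results. Since every per-key result of one sliding window shares the same timestamp (window end minus 1 ms), a plain-Java check like the one below can count how many distinct first-stage windows land in a given second-stage window. It does not use Flink; it re-implements the window boundaries by hand, assuming the default zero offset (so 15-minute windows sliding by 1 minute end on every minute boundary) and times in milliseconds.

```java
import java.util.Set;
import java.util.TreeSet;

public class SecondStageWindows {
    static final long MINUTE = 60_000L;

    // Returns the end times of the first-stage sliding windows whose results,
    // stamped at end - 1 ms, fall into the second-stage window [start, start + size).
    // Assumes `start` is aligned to a minute boundary.
    static Set<Long> firstStageWindowEnds(long start, long size) {
        Set<Long> ends = new TreeSet<>();
        // First-stage windows end on every minute boundary; scan just past the range.
        for (long end = start; end <= start + size + MINUTE; end += MINUTE) {
            long resultTimestamp = end - 1; // what Flink assigns: the window's max timestamp
            if (resultTimestamp >= start && resultTimestamp < start + size) {
                ends.add(end);
            }
        }
        return ends;
    }

    public static void main(String[] args) {
        long start = 30 * MINUTE;
        // 1-minute tumbling second stage: results of exactly one sliding window.
        System.out.println(firstStageWindowEnds(start, MINUTE).size()); // prints 1
        // 15-minute second stage: results of 15 different sliding windows mixed together.
        System.out.println(firstStageWindowEnds(start, 15 * MINUTE).size()); // prints 15
    }
}
```

This is why a tumbling window whose size equals the slide of the first window is a natural choice for the non-keyed second stage.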