I have keyed events coming in on a stream that I would like to accumulate by key, up to a timeout (say, 5 minutes), and then process the events accumulated up to that point (and ignore everything after for that key, but first things first).
I am new to Flink, but conceptually I think I need something like the code below.
DataStream<Tuple2<String, String>> dataStream = see
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.window(GlobalWindows.create())
.trigger(ProcessingTimeTrigger.create()) // how do I set the timeout value?
.fold(new Tuple2<>("", ""), new FoldFunction<Tuple2<String, String>, Tuple2<String, String>>() {
public Tuple2<String, String> fold(Tuple2<String, String> agg, Tuple2<String, String> elem) {
if ( agg.f0.isEmpty()) {
agg.f0 = elem.f0;
}
if ( agg.f1.isEmpty()) {
agg.f1 = elem.f1;
} else {
agg.f1 = agg.f1 + "; " + elem.f1;
}
return agg;
}
});
This code does NOT compile because a ProcessingTimeTrigger needs a TimeWindow, and GlobalWindow is not a TimeWindow. So...
How can I accomplish keyed window timeouts in Flink?