
I have keyed events coming in on a stream that I would like to accumulate by key, up to a timeout (say, 5 minutes), and then process the events accumulated up to that point (and ignore everything after for that key, but first things first).

I am new to Flink, but conceptually I think I need something like the code below.

    DataStream<Tuple2<String, String>> dataStream = see
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .window(GlobalWindows.create())
            .trigger(ProcessingTimeTrigger.create()) // how do I set the timeout value?
            .fold(new Tuple2<>("", ""), new FoldFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                public Tuple2<String, String> fold(Tuple2<String, String> agg, Tuple2<String, String> elem) {
                    if ( agg.f0.isEmpty()) {
                        agg.f0 = elem.f0;
                    }
                    if ( agg.f1.isEmpty()) {
                        agg.f1 = elem.f1;
                    } else {
                        agg.f1 = agg.f1 + "; " + elem.f1;
                    }
                    return agg;
                }
            });

This code does NOT compile because a ProcessingTimeTrigger needs a TimeWindow, and GlobalWindow is not a TimeWindow. So...

How can I accomplish keyed window timeouts in Flink?

Seems like you could just use TimeWindows. Is that not a good solution for some reason? – David Anderson
Hmm. My use case isn't a Tumbling, Sliding or Session Window. It's like a session window in that it starts with a keyed event, but the trigger fires a fixed time after the start instead of waiting for a gap in events. Sorry if this is a newbie question (I just started with Flink yesterday). Can I use a plain TimeWindow for this? And if so, can you give me an example? – kinder
Looking at ProcessingTimeSessionWindows, could I use a version of it that does not reset the trigger when a new event comes in? – kinder
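For comparison, here is a minimal sketch of the TimeWindows route from the first comment. It assumes a Flink 1.x release (where TumblingProcessingTimeWindows.of(Time) is available), replaces the deprecated fold with reduce, and uses a hypothetical "key,value" line format and class name TumblingWindowSketch in place of the question's Splitter. It compiles because a tumbling window assigner produces TimeWindows, but the windows are aligned to 5-minute clock boundaries rather than to each key's first event, and a key can produce a result in every window it appears in. That is the mismatch kinder describes.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)
                // Hypothetical "key,value" line format standing in for the question's Splitter.
                .map(line -> {
                    String[] parts = line.split(",", 2);
                    return Tuple2.of(parts[0], parts.length > 1 ? parts[1] : "");
                })
                // Lambdas lose Tuple2's generic types to erasure, so declare them explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.STRING))
                .keyBy(e -> e.f0)
                // Windows start on 5-minute clock boundaries, not at each key's first event.
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                // reduce instead of the deprecated fold: join the values with "; ".
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + "; " + b.f1))
                .print();

        env.execute("tumbling-window-sketch");
    }
}
```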

1 Answer

You will have a much easier time if you approach this with a KeyedProcessFunction.

I suggest establishing an item of keyed ListState in the open() method of a KeyedProcessFunction. In the processElement() method, if the list is empty, set a processing-time timer (a per-key timer, relative to the current time) to fire when you want the window to end. Then append the incoming event to the list.

When the timer fires, the onTimer() method will be called, and you can iterate over the list, produce a result, and clear the list.
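A minimal sketch of such a function follows. It assumes a Flink 1.x release (where open(Configuration) is the lifecycle hook; newer releases use open(OpenContext) instead), the question's Tuple2<String, String> events keyed by their first field, and a hypothetical class name AccumulateUntilTimeout. The 5-minute timeout is measured from each key's first buffered event.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical name. Buffers each key's values from its first event until a fixed
// processing-time timeout, then emits (key, "v1; v2; ...") and clears the buffer.
public class AccumulateUntilTimeout
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>> {

    // Assumed timeout from the question: 5 minutes.
    private static final long TIMEOUT_MS = 5 * 60 * 1000L;

    // Keyed ListState: Flink keeps a separate list per key.
    private transient ListState<String> buffered;

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event, Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        Iterable<String> current = buffered.get();
        boolean empty = current == null || !current.iterator().hasNext();
        if (empty) {
            // First event for this key since the last flush: start the per-key timer.
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(now + TIMEOUT_MS);
        }
        buffered.add(event.f1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
        Iterable<String> values = buffered.get();
        if (values != null) {
            out.collect(Tuple2.of(ctx.getCurrentKey(), String.join("; ", values)));
        }
        buffered.clear();
    }
}
```

It would replace the window/trigger/fold chain, e.g. applied after the flatMap as .keyBy(e -> e.f0).process(new AccumulateUntilTimeout()).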

To arrange for all of this to happen only once per key, add a ValueState<Boolean> to the KeyedProcessFunction that records whether the key has already been processed. (Note that if your key space is unbounded, you should think about a strategy for eventually expiring the state for stale keys.)
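To check the intended semantics (timer starts at a key's first event, one result per key, later events ignored) without a cluster, here is a framework-free model in plain Java. It is not Flink code: HashMaps stand in for the keyed ListState and the ValueState<Boolean> flag, and an explicit advanceTo(now) call stands in for the runtime firing processing-time timers. All names are hypothetical. In Flink itself, one option for expiring the flag for stale keys is state TTL (a StateTtlConfig enabled on the state descriptor).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, framework-free model of the once-per-key accumulate-until-timeout logic.
public class OncePerKeyModel {
    private final long timeoutMs;
    private final Map<String, List<String>> buffers = new HashMap<>(); // stands in for ListState
    private final Map<String, Long> timers = new HashMap<>();          // per-key timer deadline
    private final Set<String> done = new HashSet<>();                  // stands in for ValueState<Boolean>
    private final List<String> output = new ArrayList<>();

    public OncePerKeyModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Mirrors processElement(): ignore finished keys, start the timer on the first event, buffer.
    public void processElement(String key, String value, long now) {
        if (done.contains(key)) {
            return; // this key already produced its result
        }
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        if (buffer.isEmpty()) {
            timers.put(key, now + timeoutMs);
        }
        buffer.add(value);
    }

    // Mirrors the runtime calling onTimer() for every timer whose deadline has passed.
    public void advanceTo(long now) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> t : timers.entrySet()) {
            if (t.getValue() <= now) {
                due.add(t.getKey());
            }
        }
        due.sort(null); // deterministic order for the model only
        for (String key : due) {
            timers.remove(key);
            output.add(key + ": " + String.join("; ", buffers.remove(key)));
            done.add(key); // this set only grows; in Flink, expire it (e.g. via state TTL)
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        OncePerKeyModel m = new OncePerKeyModel(5 * 60 * 1000L);
        m.processElement("a", "x", 0L);
        m.processElement("b", "y", 60_000L);
        m.processElement("a", "z", 120_000L);
        m.advanceTo(300_000L);                   // a's timer (0 + 5 min) fires
        m.processElement("a", "late", 310_000L); // ignored: a is done
        m.advanceTo(360_000L);                   // b's timer (1 min + 5 min) fires
        System.out.println(m.output());          // prints [a: x; z, b: y]
    }
}
```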

The documentation describes how to use Process Functions and how to work with state. You can find additional examples on the Flink training site, such as this exercise.