
I'm calculating a count (summing 1) over a time window as follows:

mappedUserTrackingEvent
            .keyBy("videoId", "userId")
            .timeWindow(Time.seconds(30))
            .sum("count")

I would like to also add the window start time as a key field, so the result would be something like:

key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50

So essentially I want to aggregate the count per window. The end goal is to draw a histogram of these windows.

How can I add the start of the window as a field in the key? And, following that, can I align the windows to :00 or :30 in this case? Is that possible?


2 Answers


The apply() method of a WindowFunction receives a Window object, which is a TimeWindow if you use keyBy().timeWindow(). The TimeWindow object has two methods, getStart() and getEnd(), which return the timestamps of the window's start and end, respectively.
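On the alignment part of the question: without an offset, Flink's tumbling time windows are aligned to the epoch, so 30-second windows start at :00 and :30 of every minute, and getStart() returns such a boundary. The following plain-Java sketch (no Flink dependency; WindowAlignment and windowStart are hypothetical names) reproduces that arithmetic for an element timestamp in milliseconds. Treat it as an illustration of the alignment, not as Flink's own source.

```java
import java.time.Instant;

// Plain-Java sketch (no Flink dependency) of how the start of an
// epoch-aligned tumbling window can be derived from an element timestamp.
public class WindowAlignment {

    // Start of the tumbling window containing `timestamp`; all values in ms.
    // With offset = 0 the windows are aligned to the Unix epoch, so a 30 s
    // window size yields starts at hh:mm:00 and hh:mm:30.
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30 seconds
        long ts = Instant.parse("2016-09-16T17:01:47Z").toEpochMilli();
        long start = windowStart(ts, 0L, size);
        long end = start + size; // the end boundary is exclusive
        System.out.println(Instant.ofEpochMilli(start)); // 2016-09-16T17:01:30Z
        System.out.println(Instant.ofEpochMilli(end));   // 2016-09-16T17:02:00Z
    }
}
```

An element stamped 17:01:47 therefore lands in the window [17:01:30, 17:02:00), which is exactly the value you would add to the key.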

At the moment it is not possible to use the sum() aggregation together with a WindowFunction. You need to do something like:

    mappedUserTrackingEvent
        .keyBy("videoId", "userId")
        .timeWindow(Time.seconds(30))
        .apply(new MySumReduceFunction(), new MyWindowFunction());

MySumReduceFunction implements the ReduceFunction interface and computes the sum by incrementally aggregating the elements as they arrive in the window. MyWindowFunction implements WindowFunction. It receives the aggregated value through its Iterable parameter and enriches that value with the timestamp obtained from the TimeWindow parameter.
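To make the division of labour between the two functions concrete, here is a plain-Java simulation with no Flink dependency. Event, sum, enrich and run are hypothetical names, the event fields are assumed from the question's keyBy call, and windows are assumed to be epoch-aligned 30-second tumbling windows. It sketches the pattern rather than Flink's implementation: the reduce step keeps a single running sum per (videoId, userId, window), and the enrichment step adds the window start only when a result is emitted.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceThenEnrich {

    // Hypothetical input element with the fields the question keys by.
    public static final class Event {
        final long videoId;
        final long userId;
        final long timestampMs;
        final int count;

        public Event(long videoId, long userId, long timestampMs, int count) {
            this.videoId = videoId;
            this.userId = userId;
            this.timestampMs = timestampMs;
            this.count = count;
        }
    }

    // Role of MySumReduceFunction: combine two partial sums into one.
    static Integer sum(Integer a, Integer b) {
        return a + b;
    }

    // Role of MyWindowFunction: take the single pre-aggregated value and
    // attach the window start to the key.
    static String enrich(List<Long> key, int total) {
        return "videoId=" + key.get(0) + ",userId=" + key.get(1)
                + ",time=" + Instant.ofEpochMilli(key.get(2)) + " value=" + total;
    }

    // Assigns each event to an epoch-aligned tumbling window, reduces
    // incrementally per (videoId, userId, windowStart), then enriches each result.
    public static List<String> run(List<Event> events, long windowSizeMs) {
        Map<List<Long>, Integer> partialSums = new LinkedHashMap<>();
        for (Event e : events) {
            long start = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            List<Long> key = Arrays.asList(e.videoId, e.userId, start);
            // Only one running value per key is kept, as with a ReduceFunction.
            partialSums.merge(key, e.count, ReduceThenEnrich::sum);
        }
        // All windows are emitted at the end of the input here; Flink instead
        // fires each window once time passes its end.
        List<String> out = new ArrayList<>();
        for (Map.Entry<List<Long>, Integer> entry : partialSums.entrySet()) {
            out.add(enrich(entry.getKey(), entry.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        long t = Instant.parse("2016-09-16T17:01:30Z").toEpochMilli();
        List<Event> events = Arrays.asList(
                new Event(123, 234, t + 1_000, 1),
                new Event(123, 234, t + 29_000, 1),
                new Event(123, 234, t + 31_000, 1)); // lands in the next window
        run(events, 30_000L).forEach(System.out::println);
    }
}
```

Running main prints two lines: a value of 2 for the window starting at 2016-09-16T17:01:30Z and a value of 1 for the window starting at 2016-09-16T17:02:00Z, which is the shape of output the question asks for.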


You can use the aggregate method instead of sum.
As the second parameter of aggregate, pass a function that implements WindowFunction or extends ProcessWindowFunction.
I am using Flink 1.4.0 and recommend ProcessWindowFunction, for example:

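import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

mappedUserTrackingEvent
    .keyBy("videoId", "userId")
    .timeWindow(Time.seconds(30))
    // Count is an AggregateFunction with an Integer result (not shown)
    .aggregate(new Count(), new MyProcessWindowFunction());

public static class MyProcessWindowFunction
        extends ProcessWindowFunction<Integer, Tuple2<Long, Integer>, Tuple, TimeWindow> {
    @Override
    public void process(Tuple key, Context context, Iterable<Integer> counts,
                        Collector<Tuple2<Long, Integer>> out) throws Exception {
        // The iterable holds exactly one element: the pre-aggregated count
        Integer count = counts.iterator().next();
        // Window start in epoch milliseconds
        long windowStart = context.window().getStart();
        out.collect(new Tuple2<>(windowStart, count));
    }
}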
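The answer does not show Count. In Flink it would be an AggregateFunction whose result type is Integer, matching the first type parameter of MyProcessWindowFunction. Without reproducing Flink's exact interface (its method signatures have changed between versions), the following plain-Java stand-in sketches the four roles a counting aggregate plays. CountSketch and countAll are hypothetical names; only the role names createAccumulator, add, getResult and merge mirror Flink's AggregateFunction.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in (no Flink dependency) for the Count aggregate:
// create an empty accumulator, add one element, read the result,
// and merge two partial accumulators.
public class CountSketch {

    public static long createAccumulator() {
        return 0L;
    }

    // Every element counts as 1, whatever its content.
    public static long add(Object element, long accumulator) {
        return accumulator + 1;
    }

    // Integer result, matching the ProcessWindowFunction's input type.
    public static int getResult(long accumulator) {
        return (int) accumulator;
    }

    // Combines partial counts, e.g. when session windows are merged.
    public static long merge(long a, long b) {
        return a + b;
    }

    // Folds a whole window's elements through the accumulator lifecycle.
    public static int countAll(List<?> elements) {
        long acc = createAccumulator();
        for (Object e : elements) {
            acc = add(e, acc);
        }
        return getResult(acc);
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("event-a", "event-b", "event-c");
        System.out.println(countAll(window));          // 3
        System.out.println(getResult(merge(2L, 3L)));  // 5
    }
}
```

Because the accumulator is a single number, the window keeps constant state per key however many events arrive, which is the reason to prefer aggregate over collecting all elements in the window function.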