
I have Kafka messages with the following pattern:

{ user: 'someUser', value: 'SomeValue', timestamp: 000000000 }

A Flink streaming job performs a count over these items.

Now I want to declare a session window that collects items with the same user + value within a range of X seconds into a single item carrying the latest timestamp, so that it is forwarded to the next stream only once.

So I wrote something like this:

data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
        .....
    })
    .keyBy(new KeySelector<Data, String>(){

        .......
    })
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .aggregate(new AggregateFunction<Data, Data, Data>() {

        @Override
        public Data createAccumulator() {
            return null;
        }

        @Override
        public Data add(Data value, Data accumulator) {
            if (accumulator == null) {
                accumulator = value;
            }
            return accumulator;
        }

        @Override
        public Data getResult(Data accumulator) {
            return accumulator;
        }

        @Override
        public Data merge(Data a, Data b) {
            return a;
        }
    });

But the problem is that getResult is called for each element, not just at the end of the window.

My problem is how to avoid forwarding the aggregation result to the next stream until the end of the window. As far as I know, the result of a process stream is also forwarded when there are no more elements, even though the window hasn't ended yet.

Any advice?

Thanks


1 Answer


Flink provides two distinct approaches for evaluating windows, and in this case you want the second one.

One approach evaluates each window's contents incrementally. This is what you get with reduce and aggregate. As elements are assigned to the window, the ReduceFunction or AggregateFunction is called and that element immediately makes its contribution to the final result.
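For instance, with the incremental approach the add() and merge() callbacks could both delegate to a single comparison that keeps whichever element carries the later timestamp. Here is a minimal plain-Java sketch of that comparison (the Data class and the latest helper are hypothetical stand-ins for the question's types, shown without the Flink interfaces so the snippet is self-contained):

```java
// Hypothetical stand-in for the message type from the question.
class Data {
    final String user;
    final String value;
    final long timestamp;

    Data(String user, String value, long timestamp) {
        this.user = user;
        this.value = value;
        this.timestamp = timestamp;
    }
}

class LatestPicker {
    // What an AggregateFunction's add() and merge() could both delegate
    // to: keep whichever element has the later timestamp, treating null
    // as "no accumulator yet".
    static Data latest(Data a, Data b) {
        if (a == null) return b;
        if (b == null) return a;
        return a.timestamp >= b.timestamp ? a : b;
    }
}
```

Inside an AggregateFunction&lt;Data, Data, Data&gt;, add(value, accumulator) and merge(a, b) could each return latest(...), so the accumulator always holds the most recent element seen so far.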

The alternative is to use process with a ProcessWindowFunction. With this approach, the window isn't evaluated until it is complete, at which point the ProcessWindowFunction is called once with an Iterable containing all of the elements that were assigned to the window. This has the disadvantage of needing to store all of the elements until the window is triggered, and if the ProcessWindowFunction has to do a lot of work to compute its result, that can temporarily disrupt the pipeline. But some calculations need to be done this way -- like counting distinct elements.
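To sketch the once-per-window evaluation: the loop below is what the body of such a function could do with the window's Iterable -- scan the buffered elements and keep the one with the latest timestamp. It is written as a plain-Java helper (the Data type is again a hypothetical stand-in for the question's message type) so it runs without the Flink runtime:

```java
// Hypothetical stand-in for the message type from the question.
class Data {
    final String user;
    final String value;
    final long timestamp;

    Data(String user, String value, long timestamp) {
        this.user = user;
        this.value = value;
        this.timestamp = timestamp;
    }
}

class WindowResult {
    // What a ProcessWindowFunction's process() body could do once the
    // session window fires: scan all buffered elements and return the
    // one with the latest timestamp, to be emitted exactly once.
    static Data latestOf(Iterable<Data> elements) {
        Data result = null;
        for (Data d : elements) {
            if (result == null || d.timestamp > result.timestamp) {
                result = d;
            }
        }
        return result;
    }
}
```

In the actual Flink job this loop would live in ProcessWindowFunction's process(key, context, elements, out) method, with the single result handed to the Collector.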

See the documentation for more info.