I have Kafka messages that follow a pattern like this:
{ user: 'someUser', value: 'SomeValue', timestamp: 000000000 }
A Flink streaming job performs some count operations on these items.
Now I want to declare a session that collects all messages with the same user + value arriving within a range of X seconds into a single item carrying the latest timestamp, which is then forwarded to the next stream exactly once.
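To make the intended behaviour concrete, here is a minimal plain-Java sketch (no Flink involved) of the result I expect. The Event class, the collapse method and the 10-second gap are hypothetical names and values chosen only for illustration, and the handling of a gap exactly equal to the limit is my own assumption, not necessarily what Flink does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names come from Flink.
class SessionSketch {

    // Hypothetical event type mirroring the fields of the Kafka message.
    static final class Event {
        final String user;
        final String value;
        final long timestamp; // assumed to be epoch milliseconds

        Event(String user, String value, long timestamp) {
            this.user = user;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // For each user + value key, splits the events into sessions wherever two
    // consecutive timestamps are more than gapMillis apart, and keeps only the
    // latest event of every session.
    static List<Event> collapse(List<Event> events, long gapMillis) {
        // Group by user + value, preserving the order in which keys first appear.
        Map<String, List<Event>> byKey = new LinkedHashMap<>();
        for (Event e : events) {
            byKey.computeIfAbsent(e.user + "|" + e.value, k -> new ArrayList<>()).add(e);
        }

        List<Event> out = new ArrayList<>();
        for (List<Event> group : byKey.values()) {
            group.sort(Comparator.comparingLong((Event e) -> e.timestamp));
            Event latest = null;
            for (Event e : group) {
                // A gap larger than gapMillis closes the current session.
                if (latest != null && e.timestamp - latest.timestamp > gapMillis) {
                    out.add(latest);
                }
                latest = e;
            }
            if (latest != null) {
                out.add(latest); // the last session of this key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> in = Arrays.asList(
                new Event("u1", "a", 0L),
                new Event("u1", "a", 5_000L),
                new Event("u1", "a", 30_000L));
        for (Event e : collapse(in, 10_000L)) {
            System.out.println(e.user + " " + e.value + " " + e.timestamp);
        }
    }
}
```

With a 10-second gap, the events at 0 s and 5 s for the same user and value should collapse into one item carrying the 5 s timestamp, and the event at 30 s should form its own item.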
So I wrote something like this:
data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
        .....
    })
    .keyBy(new KeySelector<Data, String>() {
        .......
    })
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .aggregate(new AggregateFunction<Data, Data, Data>() {
        @Override
        public Data createAccumulator() {
            return null;
        }

        @Override
        public Data add(Data value, Data accumulator) {
            if (accumulator == null) {
                accumulator = value;
            }
            return accumulator;
        }

        @Override
        public Data getResult(Data accumulator) {
            return accumulator;
        }

        @Override
        public Data merge(Data a, Data b) {
            return a;
        }
    });
But the problem is that the getResult function is called on each element, not just at the end of the window.
My question is how to hold back the aggregation result until the end of the window, instead of forwarding it to the next stream earlier. As far as I know, the result of a process stream also moves forward once there are no more elements, even though the window has not ended yet.
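Separately, since the forwarded item should carry the latest timestamp, the accumulator logic I am aiming for looks roughly like the sketch below. It is plain Java that only mirrors the method names of AggregateFunction and deliberately does not implement the Flink interface, so it can be run on its own. The Data fields shown here are a hypothetical stand-in for my real class.

```java
// Plain-Java sketch; it mirrors the method names of Flink's AggregateFunction
// but does not implement the interface, so it runs without Flink.
class LatestAggregateSketch {

    // Hypothetical stand-in for my Data class.
    static final class Data {
        final String user;
        final String value;
        final long timestamp;

        Data(String user, String value, long timestamp) {
            this.user = user;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // No element seen yet.
    Data createAccumulator() {
        return null;
    }

    // Keep whichever of the new element and the accumulator is newer.
    Data add(Data value, Data accumulator) {
        return later(value, accumulator);
    }

    // The accumulator already is the result.
    Data getResult(Data accumulator) {
        return accumulator;
    }

    // When two partial sessions are merged, keep the newer of the two.
    Data merge(Data a, Data b) {
        return later(a, b);
    }

    // Returns the argument with the larger timestamp, tolerating nulls.
    private static Data later(Data a, Data b) {
        if (a == null) {
            return b;
        }
        if (b == null) {
            return a;
        }
        return a.timestamp >= b.timestamp ? a : b;
    }
}
```

Unlike the code in my job above, which keeps the first element it sees, this version keeps the newest one in both add and merge.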
Any advice?
Thanks.