
I am implementing an AggregateFunction to measure the duration between two events after .window(EventTimeSessionWindows.withGap(gap)). After the second event is processed, the window is closed.

Will Flink automatically checkpoint the state of the AggregateFunction, so that the data already in the accumulator is not lost when the job restarts?

Since I am not sure about that, I tried to use AggregatingState in a RichAggregateFunction:

            class MyAgg extends RichAggregateFunction<IN, ACC, OUT>

AggregatingState requires an AggregatingStateDescriptor. Its constructor has this signature:

            public AggregatingStateDescriptor(
                    String name,
                    AggregateFunction<IN, ACC, OUT> aggFunction,
                    Class<ACC> stateType)

I am confused by the aggFunction parameter. What should go there? Isn't it the MyAgg class that I am trying to define in the first place?

In what context are you using this AggregateFunction? Is this on a streaming window, or somewhere else? - David Anderson
@davidanderson it is a streaming window. - GeauxEric

1 Answer


An AggregateFunction doesn't have any state of its own. But the aggregating state used in a streaming window (and manipulated by an AggregateFunction) is checkpointed as part of the window's state, so the accumulator contents are restored when the job recovers from a checkpoint.

A RichAggregateFunction cannot be used in a window context, and an AggregateFunction cannot have its own state. This is by design: if an AggregateFunction were allowed to use a state descriptor to define, say, ValueState, that state wouldn't be mergeable. To keep the Window API reasonably clean, all window state needs to be mergeable, for the sake of session windows (two sessions are merged into one when an event arrives that bridges the gap between them).
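To make the mergeability requirement concrete, here is a minimal sketch of the use case from the question (not the asker's actual code). It assumes, as a hypothetical simplification, that each input element is just its event timestamp in epoch milliseconds, and it reports the span between the earliest and latest timestamp in the session. The class name SessionDuration is illustrative. The accumulator is a plain POJO that the window operator stores and checkpoints; the function itself holds nothing, and merge() is what lets two session windows be combined.

```java
import org.apache.flink.api.common.functions.AggregateFunction;

/**
 * Sketch: duration (ms) between the earliest and latest event in a session window.
 * Assumption (hypothetical): each input element is an event timestamp in epoch millis.
 */
public class SessionDuration
        implements AggregateFunction<Long, SessionDuration.Acc, Long> {

    /** Accumulator POJO; it lives in (and is checkpointed with) the window state. */
    public static class Acc {
        public long min = Long.MAX_VALUE;
        public long max = Long.MIN_VALUE;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Acc add(Long timestamp, Acc acc) {
        acc.min = Math.min(acc.min, timestamp);
        acc.max = Math.max(acc.max, timestamp);
        return acc;
    }

    @Override
    public Long getResult(Acc acc) {
        // An empty accumulator yields 0 instead of an overflowing difference.
        return acc.max < acc.min ? 0L : acc.max - acc.min;
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        // Invoked when two session windows are merged into one.
        Acc merged = new Acc();
        merged.min = Math.min(a.min, b.min);
        merged.max = Math.max(a.max, b.max);
        return merged;
    }
}
```

Assuming a keyed DataStream<Long> of timestamps, it would be applied with .window(EventTimeSessionWindows.withGap(gap)).aggregate(new SessionDuration()). Tracking min and max, rather than a "first" and "second" event, keeps merge() correct regardless of the order in which events or partial windows are combined.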

AggregatingState is something you might use in a KeyedProcessFunction, for example. In that context, you need to define how elements are to be aggregated into the accumulator (i.e., the AggregatingState), and you do that with an ordinary AggregateFunction. That is what the aggFunction argument of AggregatingStateDescriptor expects.
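A minimal sketch of AggregatingState inside a KeyedProcessFunction, written against the Flink 1.x API (newer releases replace open(Configuration) with open(OpenContext)). The names RunningSum and SumAgg, the String key type, and the running-sum logic are hypothetical. Note that the descriptor receives a separate, plain AggregateFunction that only describes how inputs are folded into the accumulator; the state itself is owned by the process function.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Sketch (hypothetical): keeps a per-key running sum in AggregatingState. */
public class RunningSum extends KeyedProcessFunction<String, Long, Long> {

    /** Plain (non-rich) AggregateFunction passed to the state descriptor. */
    public static class SumAgg implements AggregateFunction<Long, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long value, Long acc) { return acc + value; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // Keyed state handle; Flink checkpoints its contents per key.
    private transient AggregatingState<Long, Long> sum;

    @Override
    public void open(Configuration parameters) {
        AggregatingStateDescriptor<Long, Long, Long> descriptor =
                new AggregatingStateDescriptor<>("running-sum", new SumAgg(), Long.class);
        sum = getRuntimeContext().getAggregatingState(descriptor);
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        sum.add(value);          // folded in via SumAgg.add
        out.collect(sum.get());  // SumAgg.getResult on the current accumulator
    }
}
```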