I'm using the window API to divide the data into one-hour windows. For each window, I use a ValueState to store a boolean.

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.days(1)) {
    @Override
    public long extractTimestamp(Event element) {
        return element.timestamp;
    }
})

// Partition by user
.keyBy(new KeySelector<Event, Tuple2<Long, String>>() {
    @Override
    public Tuple2<Long, String> getKey(Event value) {
        return Tuple2.of(value.userGroup, value.userName);
    }
})

.window(TumblingEventTimeWindows.of(Time.minutes(60), Time.minutes(0)))
.allowedLateness(Time.days(1))
.trigger(new WindowTrigger<>(EVENTS_THRESHOLD))
.aggregate(new WindowAggregator(), new WindowProcessor())

.print();

public class WindowProcessor extends ProcessWindowFunction<Long, String, Tuple2<Long, String>, TimeWindow> {

    private final ValueStateDescriptor<Boolean> windowAlertedDescriptor = new ValueStateDescriptor<>("windowAlerted", Boolean.class);

    @Override
    public void process(Tuple2<Long, String> key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
        long currentDownloadsCount = elements.iterator().next();
        long windowStart = context.window().getStart();
        long windowEnd = context.window().getEnd();

        ValueState<Boolean> windowAlertedState = context.windowState().getState(windowAlertedDescriptor);
        if (BooleanUtils.isTrue(windowAlertedState.value())) {
            return;
        }

Do I have to call the clear() method to clean up the window state data? I assumed that, because Flink handles window creation and purging, it would also handle the state cleanup when it purges the window.

According to the answer to "How to clear state immediately after a keyed window is processed?", windows clear their state automatically once the window has fired.

But the Flink documentation explicitly mentions that you should call the clear method to remove window state: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction

1 Answer

The various classes involved in the window API keep state in a number of places:

  • the list of stream records assigned to each Window
  • a Trigger can be stateful (e.g., a CountTrigger)
  • per-window state (in a ProcessWindowFunction.Context)
  • global state (also in a ProcessWindowFunction.Context)

The first two (the Window contents and Trigger state) are cleaned up automatically by Flink when the Window is purged. When purging a window, Flink also calls the clear method on your ProcessWindowFunction, and you should clear whatever per-window state you may have created in the KeyedStateStore windowState() at that time.
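
As a rough sketch of what that could look like for the WindowProcessor in the question: the clear override below drops the per-window flag when Flink purges the window. The body of process after the early return is not shown in the question, so the alert message and the point at which the flag is set are assumptions made only for illustration.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowProcessor
        extends ProcessWindowFunction<Long, String, Tuple2<Long, String>, TimeWindow> {

    private final ValueStateDescriptor<Boolean> windowAlertedDescriptor =
            new ValueStateDescriptor<>("windowAlerted", Boolean.class);

    @Override
    public void process(Tuple2<Long, String> key, Context context,
                        Iterable<Long> elements, Collector<String> out) throws Exception {
        // Per-window state: scoped to this key AND this window.
        ValueState<Boolean> windowAlerted =
                context.windowState().getState(windowAlertedDescriptor);
        if (Boolean.TRUE.equals(windowAlerted.value())) {
            return; // this window has already produced its alert
        }

        // Assumption: emit one alert per window and remember that we did so.
        // The real alert logic is not part of the question.
        long count = elements.iterator().next();
        out.collect("Alert for " + key + " in " + context.window() + ": " + count);
        windowAlerted.update(true);
    }

    @Override
    public void clear(Context context) throws Exception {
        // Called by Flink when the window is purged (after allowed lateness expires).
        // Remove the per-window state created in process().
        context.windowState().getState(windowAlertedDescriptor).clear();
    }
}
```

With allowedLateness(Time.days(1)) as in the question, the window is purged, and clear is called, only once the watermark passes the window end plus one day, so the flag survives across late firings of the same window until then.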

On the other hand, the purpose of KeyedStateStore globalState() is to remember things from one window to another, so you won't be clearing that. However, if you have an unbounded key space, you should take care to clean up the global window state for stale keys. The only way to do this is by specifying state TTL on the state descriptor(s) for the global state.
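
A minimal sketch of attaching a TTL to a descriptor meant for globalState() follows. The class name, the state name "lastAlertTimestamp", and the 7-day retention are hypothetical, chosen only for illustration. It also assumes a Flink version in which StateTtlConfig.newBuilder accepts org.apache.flink.api.common.time.Time; newer releases take a java.time.Duration instead.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public final class GlobalStateDescriptors {

    private GlobalStateDescriptors() {}

    // Hypothetical descriptor for state kept across windows via context.globalState().
    public static ValueStateDescriptor<Long> lastAlertTimestamp() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(7)) // assumed retention period
                // Restart the TTL clock each time the state is created or written.
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Never hand back a value that has already expired.
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("lastAlertTimestamp", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```

Inside process, the state would then be obtained with context.globalState().getState(GlobalStateDescriptors.lastAlertTimestamp()). Note that state TTL is measured in processing time, not event time, so the retention period should be chosen with that in mind.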