I'm using the window API to divide the data into 1-hour windows. In each window, I use a `ValueState` to store a boolean flag for that window.
```java
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.days(1)) {
    @Override
    public long extractTimestamp(Event element) {
        return element.timestamp;
    }
})
// Partition by user
.keyBy(new KeySelector<Event, Tuple2<Long, String>>() {
    @Override
    public Tuple2<Long, String> getKey(Event value) {
        return Tuple2.of(value.userGroup, value.userName);
    }
})
.window(TumblingEventTimeWindows.of(Time.minutes(60), Time.minutes(0)))
.allowedLateness(Time.days(1))
.trigger(new WindowTrigger<>(EVENTS_THRESHOLD))
.aggregate(new WindowAggregator(), new WindowProcessor())
.print();
```
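To make the window lifecycle of the pipeline above concrete, here is a plain-Java sketch (no Flink dependency) of the arithmetic as I understand it from Flink's `TumblingEventTimeWindows` and window operator: which 1-hour window an event falls into, and the watermark at which that window expires given `allowedLateness(Time.days(1))`. The class and method names are hypothetical, and Flink's overflow guard on the cleanup time is left out.

```java
// Hypothetical plain-Java model of the window arithmetic (not Flink's API),
// for 1-hour tumbling event-time windows with offset 0 and 1 day of allowed lateness.
public class WindowLifecycle {
    static final long HOUR_MS = 60L * 60 * 1000; // Time.minutes(60)
    static final long DAY_MS = 24 * HOUR_MS;     // Time.days(1)

    // Start of the tumbling window containing the timestamp
    // (same formula as TimeWindow.getWindowStartWithOffset).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Last timestamp that belongs to [start, start + size), i.e. TimeWindow.maxTimestamp().
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Watermark at which the window expires and its state is cleaned up:
    // maxTimestamp + allowedLateness (assumption: mirrors the window operator's cleanup time).
    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        return maxTimestamp + allowedLateness;
    }

    public static void main(String[] args) {
        long ts = 90L * 60 * 1000; // hypothetical event at 01:30 (epoch millis)
        long start = windowStart(ts, 0L, HOUR_MS);
        long max = maxTimestamp(start, HOUR_MS);
        System.out.println("window  = [" + start + ", " + (start + HOUR_MS) + ")");
        System.out.println("expires = watermark >= " + cleanupTime(max, DAY_MS));
    }
}
```

For an event at 01:30 (5,400,000 ms) this gives the window [3,600,000, 7,200,000), which only expires once the watermark reaches 93,599,999 ms, roughly a day after the window closes, independent of when the custom trigger fires.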
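```java
import org.apache.commons.lang3.BooleanUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowProcessor extends ProcessWindowFunction<Long, String, Tuple2<Long, String>, TimeWindow> {
    private final ValueStateDescriptor<Boolean> windowAlertedDescriptor = new ValueStateDescriptor<>("windowAlerted", Boolean.class); // per-window "already alerted" flag
    @Override
    public void process(Tuple2<Long, String> key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
        long currentDownloadsCount = elements.iterator().next(); // single pre-aggregated value from WindowAggregator
        long windowStart = context.window().getStart();
        long windowEnd = context.window().getEnd();
        ValueState<Boolean> windowAlertedState = context.windowState().getState(windowAlertedDescriptor);
        if (BooleanUtils.isTrue(windowAlertedState.value())) {
            return; // already alerted for this key and window
        }
        // ... (rest of the method not shown)
    }
}
```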
Do I have to call `clear()` myself to clean up this per-window state? Because Flink creates and purges the windows, I assume it also cleans up their state when it purges a window.
According to the answer to "How to clear state immediately after a keyed window is processed?", windows clean up their state automatically once the window has fired.
But the Flink documentation explicitly mentions that you should call the `clear()` method to remove per-window state: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
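The documentation section linked above describes overriding `ProcessWindowFunction.clear(Context)` for this. Below is a minimal sketch of how that could look for the processor in this question, assuming Flink 1.x's DataStream API; `AlertingWindowProcessor`, the static `ALERTED` descriptor, and the alert message are hypothetical. As I understand the window operator, when a window expires (for event time, once the watermark passes `window.maxTimestamp() + allowedLateness`) it drops the window contents and trigger state itself and then calls `clear(Context)`. It has no way of knowing about state the function created through `context.windowState()`, so that state is only removed if `clear` removes it.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical variant of WindowProcessor that also releases its per-window state.
public class AlertingWindowProcessor
        extends ProcessWindowFunction<Long, String, Tuple2<Long, String>, TimeWindow> {

    // Same state name as in the question; the flag is scoped to (key, window) via windowState().
    static final ValueStateDescriptor<Boolean> ALERTED =
            new ValueStateDescriptor<>("windowAlerted", Boolean.class);

    @Override
    public void process(Tuple2<Long, String> key, Context context,
                        Iterable<Long> elements, Collector<String> out) throws Exception {
        ValueState<Boolean> alerted = context.windowState().getState(ALERTED);
        if (Boolean.TRUE.equals(alerted.value())) {
            return; // already alerted for this key and window
        }
        long count = elements.iterator().next(); // pre-aggregated count from the AggregateFunction
        // Hypothetical alert message.
        out.collect(key + " reached " + count + " events in window ending at " + context.window().getEnd());
        alerted.update(true);
    }

    // Invoked by the window operator when the window expires; removes the per-window flag,
    // which Flink does not clean up on its own.
    @Override
    public void clear(Context context) throws Exception {
        context.windowState().getState(ALERTED).clear();
    }
}
```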