I have a ProcessWindowFunction for processing TumblingEventTimeWindows in which I use a state store to preserve some values across multiple tumbling windows.
My problem is that this state store is not being preserved across tumbling windows, i.e., if I first store something in window [0,999] and then access the store from window [1000,1999], the store is empty.
I am aware of the difference between global state and per-window state described here, and I want to use global state. I also tried creating a minimal working example to investigate this:
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
public class twStateStoreTest {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);
        final DataStream<Element> elements = env.fromElements(
                Element.from(1, 500),
                Element.from(1, 1000),
                Element.from(1, 1500),
                Element.from(1, 2000),
                Element.from(99, 9999)
        ).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Element>() {
            long w;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(w);
            }

            @Override
            public long extractTimestamp(Element element, long previousElementTimestamp) {
                w = element.getTimestamp();
                return w;
            }
        });
        elements
                .keyBy(new KeySelector<Element, Integer>() {
                    @Override
                    public Integer getKey(Element element) throws Exception {
                        return element.value;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
                .process(new MyProcessWindowFn())
                .print();
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
        MapState<Integer, Integer> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            stateStore = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class));
        }

        @Override
        public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
            if (stateStore.get(key) == null) {
                stateStore.put(key, 1);
            } else {
                int previous = stateStore.get(key);
                stateStore.put(key, previous + 1);
            }
            out.collect("State store for " + elements.toString() + " is " + stateStore.entries().toString()
                    + " for window : " + context.window());
        }
    }

    static class Element {
        private final long timestamp;
        private final int value;

        public Element(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public int getValue() {
            return value;
        }

        public static Element from(int value, long timestamp) {
            return new Element(timestamp, value);
        }
    }
}
Here I'm trying to count the number of times the process() function was called for a key. This example works, and the state is indeed preserved across tumbling windows. I've ensured that this example exactly mirrors the actual ProcessWindowFunction, with the unrelated code stripped out.
But state is not preserved across windows in the actual ProcessWindowFunction!
Is there a gotcha that I am missing? Is there any other reason why state would not be preserved across TumblingEventTimeWindows for a ProcessWindowFunction that uses a MapState defined as follows:
private MapState<UserDefinedEnum, Boolean> activeSessionStore;

@Override
public void open(Configuration parameters) throws Exception {
    activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<UserDefinedEnum, Boolean>(
            "name", UserDefinedEnum.class, Boolean.class));
}
Here's the actual class with the bloat removed, updated as per @David's and @ShemTov's suggestions:
public class IUFeatureStateCombiner extends ProcessWindowFunction<IUSessionMessage, IUSessionMessage, IUMonitorFeatureKey, TimeWindow> {
    private final static MapStateDescriptor<IUEventType, Boolean> desc = new MapStateDescriptor<IUEventType, Boolean>(
            "store", IUEventType.class, Boolean.class);
    private final Logger LOGGER = LoggerFactory.getLogger(IUFeatureStateCombiner.class);

    @Override
    public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
        ...
        MapState<IUEventType, Boolean> activeSessionStore = context.globalState().getMapState(desc);
        Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- This returns an empty iterable
        // even though I populated activeSessionStore with some values in the previous invocation of process()
        ... do something based on lastFeatureStates....
        activeSessionStore.put(...);
    }

    @Override
    public void clear(Context context) throws Exception {
        context.globalState().getMapState(desc).clear();
    }
}
And I invoke it using:
inputStream.keyBy(IUSessionMessage::getMonitorFeatureKey)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
        .process(new IUFeatureStateCombiner())
This still has the problem: I get an empty iterable in the second invocation of process(), even though I populated the state in the previous invocation.
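Edit: Problem solved. The clear() method must not clear the global state. Flink calls clear() when a window is purged, so clearing context.globalState() there wiped the store before the next window's process() call. clear() should only clean up per-window state (context.windowState()).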
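To illustrate why clearing global state in clear() empties the store, here is a minimal plain-Java sketch. It is not Flink code: GlobalStateClearDemo, CountingFn and run are hypothetical names, the two maps merely stand in for context.globalState() and context.windowState(), and the loop assumes a simplified lifecycle in which each window of a key is processed and then cleared before the next window fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (NOT Flink) of process()/clear() being called once per window for one key. */
public class GlobalStateClearDemo {

    /** Hypothetical stand-in for a ProcessWindowFunction that counts process() calls. */
    static class CountingFn {
        private final boolean clearGlobalStateInClear;

        CountingFn(boolean clearGlobalStateInClear) {
            this.clearGlobalStateInClear = clearGlobalStateInClear;
        }

        /** Increments the counter kept in the per-key (global) state and returns it. */
        int process(Map<String, Integer> globalState, Map<String, Integer> windowState) {
            windowState.put("seen", 1); // scratch data that only matters for this window
            return globalState.merge("count", 1, Integer::sum);
        }

        /** Called when the window expires. */
        void clear(Map<String, Integer> globalState, Map<String, Integer> windowState) {
            windowState.clear(); // per-window state belongs to the expiring window
            if (clearGlobalStateInClear) {
                globalState.clear(); // the bug: wipes what the next window expects to find
            }
        }
    }

    /** Simulates `windows` consecutive windows for one key; returns the count seen in each. */
    static List<Integer> run(boolean clearGlobalStateInClear, int windows) {
        Map<String, Integer> globalState = new HashMap<>(); // shared by all windows of the key
        CountingFn fn = new CountingFn(clearGlobalStateInClear);
        List<Integer> observed = new ArrayList<>();
        for (int w = 0; w < windows; w++) {
            Map<String, Integer> windowState = new HashMap<>(); // fresh for every window
            observed.add(fn.process(globalState, windowState));
            fn.clear(globalState, windowState); // assumed to run before the next window fires
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run(true, 3));  // prints [1, 1, 1]
        System.out.println(run(false, 3)); // prints [1, 2, 3]
    }
}
```

With the global map cleared in clear(), every window starts from an empty store, which matches the empty iterable described above. Leaving the global map alone lets the count carry over from one window to the next.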