I have a stream with some keys and I want to store some state for each key. My stream looks something like this:
```java
inputStream
    .keyBy(new MyKeySelector())
    .process(new MyKeyedProcessFunction());
```
Within the KeyedProcessFunction I have a state variable:
```java
public class MyKeyedProcessFunction extends KeyedProcessFunction<...> {
    // Map whose keys are the same keys used in keyBy()
    private MapState<String, ...> state;

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, ...> descriptor = new MapStateDescriptor<>(
                "keyed",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<...>() {}));
        state = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(... event, Context context, Collector<...> out) throws Exception {
        String key = context.getCurrentKey();
        // Look up the state stored for this key, or create it on first use
        ... keyedState;
        if (state.contains(key)) {
            keyedState = state.get(key);
        } else {
            keyedState = new ...();
        }
        ...
    }

    ...
}
```
As you can see above, I've created the state variable as a `MapState` whose keys match the keys used in `keyBy()`, so that I can store different state for each key. This has got to be wrong, but I can't work out how I should store state per key. I suspect my state would not work correctly if Flink decided to send different keys to different subtasks (rebalancing?).
I'm still very new to this. What am I doing wrong?
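For reference, in Flink the state handles obtained from `getRuntimeContext()` inside a function applied after `keyBy()` are already scoped to the key of the element being processed. The usual pattern is therefore a `ValueState<T>` created with `getRuntimeContext().getState(new ValueStateDescriptor<>(...))`, not a `MapState` keyed by the same key. Flink also splits keyed state into key groups. When a job is rescaled, whole key groups move to their new subtask together with their state.

The sketch below is a plain-Java toy model of those two ideas. It is not Flink code:
- the names `KeyedStateModel`, `Subtask`, `route`, `rescale` and `countFor` are hypothetical;
- the key hash is simplified;
- `MAX_PARALLELISM = 128` is an arbitrary choice for the model.

```java
import java.util.HashMap;
import java.util.Map;

// Toy, Flink-free model of keyed state. This is NOT Flink's implementation:
// the hash below is simplified and MAX_PARALLELISM = 128 is an arbitrary choice.
class KeyedStateModel {
    static final int MAX_PARALLELISM = 128;

    // Every key maps to a fixed key group for the lifetime of the job.
    static int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Key groups are assigned to subtasks in contiguous ranges.
    static int subtaskOf(int keyGroup, int parallelism) {
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    // One parallel instance of the operator with its local state:
    // keyGroup -> (key -> value). User code only calls value()/update();
    // the runtime sets currentKey before each element, which selects the slot.
    static class Subtask {
        final Map<Integer, Map<String, Long>> stateByKeyGroup = new HashMap<>();
        String currentKey;

        Long value() {
            Map<String, Long> group = stateByKeyGroup.get(keyGroupOf(currentKey));
            return group == null ? null : group.get(currentKey);
        }

        void update(Long newValue) {
            stateByKeyGroup
                    .computeIfAbsent(keyGroupOf(currentKey), g -> new HashMap<>())
                    .put(currentKey, newValue);
        }

        // Analogue of processElement() with a ValueState<Long>: count events per key.
        void processElement(String key) {
            currentKey = key;       // done by the runtime, not by user code
            Long count = value();   // null the first time this key is seen
            update(count == null ? 1L : count + 1);
        }
    }

    Subtask[] subtasks;

    KeyedStateModel(int parallelism) {
        subtasks = newSubtasks(parallelism);
    }

    static Subtask[] newSubtasks(int parallelism) {
        Subtask[] result = new Subtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            result[i] = new Subtask();
        }
        return result;
    }

    // keyBy(): the same key always goes to the subtask that owns its key group.
    void route(String key) {
        subtasks[subtaskOf(keyGroupOf(key), subtasks.length)].processElement(key);
    }

    // Rescaling: whole key groups move to their new owner together with their state.
    void rescale(int newParallelism) {
        Subtask[] next = newSubtasks(newParallelism);
        for (Subtask old : subtasks) {
            for (Map.Entry<Integer, Map<String, Long>> e : old.stateByKeyGroup.entrySet()) {
                next[subtaskOf(e.getKey(), newParallelism)].stateByKeyGroup.put(e.getKey(), e.getValue());
            }
        }
        subtasks = next;
    }

    // Reads the current count for a key from whichever subtask owns it (0 if unseen).
    long countFor(String key) {
        Subtask owner = subtasks[subtaskOf(keyGroupOf(key), subtasks.length)];
        owner.currentKey = key;
        Long count = owner.value();
        return count == null ? 0L : count;
    }

    public static void main(String[] args) {
        KeyedStateModel job = new KeyedStateModel(2);
        for (String k : new String[] {"a", "b", "a", "c", "a", "b"}) {
            job.route(k);
        }
        job.rescale(3);
        job.route("a");
        System.out.println(job.countFor("a") + " " + job.countFor("b") + " " + job.countFor("c"));
    }
}
```

Running `main` should print `4 2 1`. Key `a` keeps its count of 3 across the rescale from 2 to 3 subtasks and then receives one more event. In this model, a `MapState<String, ...>` keyed by the current key would just add an inner map that only ever holds one entry: the current key itself.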