
I have a stream with some keys and I want to store some state for each key. My stream looks something like this:

inputStream
    .keyBy(new MyKeySelector())
    .process(new MyKeyedProcessFunction());

Within the KeyedProcessFunction I have a state variable:

public class MyKeyedProcessFunction extends KeyedProcessFunction<...> {

    private MapState<String, ...> state;

    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, ...> descriptor = new MapStateDescriptor<>(
            "keyed",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<...>() {
            }));

        state = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(... event, Context context, Collector<...> out) throws Exception {

        String key = context.getCurrentKey();

        ... keyedState;
        if (state.contains(key)) {
            keyedState = state.get(key);
        } else {
            keyedState = new ...();
        }
        ...
    }

...
}

As you can see above, I've created the state variable as a map, with its keys matching the keys from keyBy(), so that I can store different state for each key. This has got to be wrong, but I can't work out how I should store state per key. I feel that my state would not work correctly if Flink decided to send different keys to different subtasks (rebalancing???).

I'm still so new to this; what am I doing wrong?


1 Answer


This is covered pretty well in the tutorials that are part of the Flink documentation, and in greater depth in the free Flink training materials shared by Ververica. Both of these courses leverage the exercises in https://github.com/apache/flink-training, which should make all of this much clearer.

If your objective is to store a single object per key, then all you need for that is ValueState<T>, which will create a sharded hash map spread across the cluster, storing an object of type T for each distinct key. MapState is used when you need to store a hashmap per key -- e.g., if you wanted to have an open-ended attributes hash for every user, given a stream keyed by userId.
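For example, a per-key counter needs only a ValueState&lt;Long&gt;. This is just a sketch: the CountEvent type, its key, and the output are hypothetical stand-ins, not from your code.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class CountPerKey extends KeyedProcessFunction<String, CountEvent, Long> {

        // One logical Long per key; Flink scopes every read/write to the
        // key of the event currently being processed.
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration config) {
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(CountEvent event, Context ctx, Collector<Long> out) throws Exception {
            // value() returns this key's entry, or null if it has never been set.
            Long current = count.value();
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated); // written back under the current key
            out.collect(updated);
        }
    }

Note that there is no map lookup anywhere: the framework selects the right entry for you based on the key of the incoming event.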

An instance of a KeyedProcessFunction is multiplexed across all of the keys belonging to the key groups assigned to that instance of the operator. In the open() method of a KeyedProcessFunction there is no key in context; when you instantiate a ValueState object there, you are getting back a handle to a hashmap that lives either in memory or on the local disk, depending on which state backend you are using. Then when you call state.value() or state.update(...) in the processElement() method, there is a specific event in context, and the key of that current event is implicitly used to read or write the appropriate entry in the state hashmap.
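Applied to the code in your question, the MapState plus manual getCurrentKey() lookup collapses into a plain ValueState. This is a sketch: MyEvent, MyResult, and MyPerKeyState are hypothetical names standing in for the ... placeholders in your snippet.

    // Declared as a field, initialized in open() with a ValueStateDescriptor:
    //   state = getRuntimeContext().getState(
    //       new ValueStateDescriptor<>("keyed", MyPerKeyState.class));
    private transient ValueState<MyPerKeyState> state;

    @Override
    public void processElement(MyEvent event, Context context, Collector<MyResult> out) throws Exception {
        // No context.getCurrentKey() and no contains()/get() on a map:
        // value() is already scoped to the key of the event being processed.
        MyPerKeyState keyedState = state.value();
        if (keyedState == null) {
            keyedState = new MyPerKeyState();
        }
        // ... update keyedState based on the event ...
        state.update(keyedState); // persist it back for this key
    }

Rebalancing is not a concern here: keyed state travels with its key group, so an event and its state always end up in the same subtask.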

This interface is designed to protect you from trying to manipulate state for keys that cannot be accessed within the local instance (since the state is sharded across the cluster, there's no guarantee that state for any key other than the one for the current event is available in that instance). But it is admittedly confusing that the key plays such an important role without being visible in any way in the API.