0
votes

I am reading the Flink source code for how to calculate the state position in the array for a Key, and found the state position calculated by keyGroupIndex-keyGroupOffset,
my questions are:

  1. Why use the keyGroupIndex-keyGroupOffset as the position, why not use the state[keyGroupIndex] directly?

    Also I found state array assigned with the size Number Of KeyGroups by the statement Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getNumberOfKeyGroups()]; , if using state[keyGroupIndex] directly , it should also be one to one mapping.

  2. Why we need the KeyGroupRange?

Below code extracted from the source code NestedMapsStateTable.java

this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();

@VisibleForTesting
Map<N, Map<K, S>> getMapForKeyGroup(int keyGroupIndex) {
    final int pos = indexToOffset(keyGroupIndex);
    if (pos >= 0 && pos < state.length) {
        return state[pos];
    } else {
        return null;
    }
}

private int indexToOffset(int index) {
    return index - keyGroupOffset;
}

public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
    super(keyContext, metaInfo);
    this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();

    @SuppressWarnings("unchecked")
    Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getNumberOfKeyGroups()];
    this.state = state;
}

https://github.com/apache/flink/blob/63c04a516f40ec2dca4d8edef58e7c2ef563ce67/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java

1

1 Answers

0
votes

The idea is that every StateBackend is responsible for a sub set of the complete key group range. Therefore, we only have to store a state map for each key group in our range. In order to do the state map management we normalize the key group indices such that they start with 0.

However, there is a small error in the code which allocates a state map entry for each key group in the complete range. This should be fixed. Here is the corresponding JIRA issue.