I am reading the Flink source code to see how the position of a key's state in the state array is calculated, and found that the position is computed as keyGroupIndex - keyGroupOffset.
My questions are:
Why is keyGroupIndex - keyGroupOffset used as the position? Why not use state[keyGroupIndex] directly?
I also found that the state array is allocated with a size equal to the number of key groups, by the statement
Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getNumberOfKeyGroups()];
If state[keyGroupIndex] were used directly, it would also be a one-to-one mapping. Why do we need the KeyGroupRange?
The code below is extracted from NestedMapsStateTable.java:
@VisibleForTesting
Map<N, Map<K, S>> getMapForKeyGroup(int keyGroupIndex) {
    final int pos = indexToOffset(keyGroupIndex);
    if (pos >= 0 && pos < state.length) {
        return state[pos];
    } else {
        return null;
    }
}

private int indexToOffset(int index) {
    return index - keyGroupOffset;
}

public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
    super(keyContext, metaInfo);
    this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();

    @SuppressWarnings("unchecked")
    Map<N, Map<K, S>>[] state = (Map<N, Map<K, S>>[]) new Map[keyContext.getNumberOfKeyGroups()];
    this.state = state;
}
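
To make the question concrete, here is a minimal sketch of the index arithmetic as I understand it. It is not Flink code: the class name KeyGroupOffsetSketch, the helper isInBounds, and the numbers (8 key groups in total, with this subtask owning key groups 4 to 7) are assumptions I made up for illustration.

```java
public class KeyGroupOffsetSketch {

    /** Same arithmetic as indexToOffset above: position relative to the first key group of the range. */
    public static int indexToOffset(int keyGroupIndex, int keyGroupOffset) {
        return keyGroupIndex - keyGroupOffset;
    }

    /** Same bounds check as in getMapForKeyGroup above. */
    public static boolean isInBounds(int pos, int arrayLength) {
        return pos >= 0 && pos < arrayLength;
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 8; // hypothetical total number of key groups
        int startKeyGroup = 4;     // hypothetical range owned by this subtask: 4..7
        int endKeyGroup = 7;

        for (int keyGroupIndex = startKeyGroup; keyGroupIndex <= endKeyGroup; keyGroupIndex++) {
            int pos = indexToOffset(keyGroupIndex, startKeyGroup);
            // With the offset, the subtask only ever touches slots 0..3,
            // even though the array has numberOfKeyGroups slots.
            System.out.println("keyGroupIndex " + keyGroupIndex
                    + " -> state[" + pos + "], in bounds: "
                    + isInBounds(pos, numberOfKeyGroups));
        }
    }
}
```

In this sketch both state[keyGroupIndex] and state[keyGroupIndex - keyGroupOffset] give a one-to-one mapping into an array of length 8. The only difference I can see is which slots stay unused (0 to 3 versus 4 to 7), which is why I do not understand the purpose of the offset when the array is sized by the total number of key groups.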