Short Answer
Flink cleans up with the help of the Time To Live (TTL) feature of Flink state and the Java garbage collector (GC). The TTL feature removes any reference to the state entry, and the GC reclaims the allocated memory.
Long Answer
Your question can be divided into 3 sub-questions, answered in turn under the headings that follow. I will try to be as brief as possible.
How does Flink partition the data based on Key?
For an operator over a keyed stream, Flink partitions the data on the key with a hash-based scheme (key groups), similar in spirit to consistent hashing. It creates max_parallelism buckets. Each operator instance is assigned one or more of these buckets. Whenever a record is sent downstream, its key is hashed to one of those buckets and the record is sent to the operator instance that owns that bucket. No key is stored at this stage because the bucket and its owner are calculated mathematically. Hence no area is cleared and no bucket is ever deleted. You can use any type of key you want; it won't affect memory in terms of keyspace or ranges.
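The bucket assignment described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the bit mixer is a stand-in for the murmur hash that Flink applies to the key's hashCode(). The point is that routing is pure arithmetic, so nothing is stored per key.

```java
// Sketch of key-group routing. Names are hypothetical; the hash mixer is a
// stand-in and is NOT Flink's exact hash function.
final class KeyGroupSketch {

    // Map a key to one of maxParallelism buckets (key groups).
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        // Stand-in bit mixer to spread out poorly distributed hash codes.
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        // floorMod keeps the result in [0, maxParallelism) even for negative h.
        return Math.floorMod(h, maxParallelism);
    }

    // Map a bucket to the operator instance that owns its range of buckets.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Full routing decision for a key: key -> bucket -> operator instance.
    static int route(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

With max_parallelism = 128 and 4 operator instances, buckets 0 to 31 go to instance 0, 32 to 63 to instance 1, and so on. The same key always lands on the same instance because the computation is deterministic.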
How does Flink store state with a Key?
All operator instances have an instance-level state store. This store defines the state context of that operator instance and it can store multiple named-state-storages e.g. "count", "sum", "some-name" etc. These named-state-storages are Key-Value stores that can store values based on the key of the data.
These KV stores are created when we initialize the state with a state descriptor in the open() function of an operator, e.g. getRuntimeContext().getState(descriptor).
These KV stores hold data only once something is actually written to the state (like HashMap.put(k,v)). Thus no key or value is stored unless a state update method (like update, add, put) is called.
So,
- If Flink hasn't seen a key, nothing is stored for that key.
- If Flink has seen the key but didn't call the state update methods, nothing is stored for that key.
- If a state update method is called for a key, the key-value pair will be stored in the KV store.
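The three cases above can be illustrated with a toy model of one named state storage. This is a minimal sketch, not Flink's implementation: the class KeyedValueStateSketch and its methods are hypothetical and only mimic the shape of a keyed value state backed by a hash map.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of one named keyed state (hypothetical, not a Flink class).
final class KeyedValueStateSketch<K, V> {
    private final Map<K, V> table = new HashMap<>();
    private K currentKey;

    // Switching the current key creates no entry.
    void setCurrentKey(K key) { currentKey = key; }

    // Reading returns null when nothing was stored for the current key.
    V value() { return table.get(currentKey); }

    // Only an explicit update stores a key-value pair.
    void update(V v) { table.put(currentKey, v); }

    // Manual cleanup for the current key.
    void clear() { table.remove(currentKey); }

    // Number of keys that currently occupy memory in this state.
    int storedKeys() { return table.size(); }
}
```

Seeing a key (setCurrentKey) or reading it (value) leaves storedKeys() unchanged; only update adds an entry, and clear removes it again.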
How does Flink clean up the state for a Key?
Flink does not delete state unless the user configures it to or clears it manually. As mentioned earlier, Flink has a TTL feature for state. TTL marks a state entry as expired, and the entry is removed when a cleanup strategy is invoked. These cleanup strategies vary with the backend type and the time of cleanup. For the heap state backend, cleanup removes the entry from the state table, i.e. it removes any reference to the entry. The memory occupied by this unreferenced entry is then reclaimed by the Java GC. For the RocksDB state backend, it simply calls the native delete method of RocksDB.
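To make the heap-backend case concrete, here is a rough plain-Java sketch of TTL-style cleanup. It is an assumption-laden model, not Flink code: TtlStateSketch, its methods, and the injectable clock are all hypothetical. In a real job the TTL is configured through StateTtlConfig and enabled on the state descriptor with enableTimeToLive.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Toy model of TTL cleanup on a heap state table (hypothetical class).
final class TtlStateSketch<K, V> {

    // A stored value together with the time of its last update.
    private static final class Entry<V> {
        final V value;
        final long lastUpdate;
        Entry(V value, long lastUpdate) { this.value = value; this.lastUpdate = lastUpdate; }
    }

    private final Map<K, Entry<V>> table = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so the sketch is testable

    TtlStateSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Writing stores the value and refreshes its timestamp.
    void put(K key, V value) {
        table.put(key, new Entry<>(value, clock.getAsLong()));
    }

    // Lazy cleanup: an expired entry is dropped when it is read.
    V get(K key) {
        Entry<V> e = table.get(key);
        if (e == null) return null;
        if (clock.getAsLong() - e.lastUpdate >= ttlMillis) {
            table.remove(key); // drop the reference; the GC reclaims the memory later
            return null;
        }
        return e.value;
    }

    // Eager cleanup: sweep the whole table and return how many entries were removed.
    int cleanupExpired() {
        long now = clock.getAsLong();
        int before = table.size();
        table.values().removeIf(e -> now - e.lastUpdate >= ttlMillis);
        return before - table.size();
    }

    int size() { return table.size(); }
}
```

Both paths only remove the map entry. Freeing the memory is left to the garbage collector, which matches the heap-backend behaviour described above.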