
When thinking about the act of keying by something, I traditionally think of the analogy of throwing all the events that match the key into the same bucket. As you can imagine, once a Flink application starts handling lots of data, what you opt to key by becomes important, because you want to make sure state gets cleaned up properly. This leads me to my question: how exactly does Flink clean up these "buckets"? If a bucket is empty (all of its MapStates and ValueStates are empty), does Flink close that area of the key space and delete the bucket?

Example:

Incoming Data Format: {userId, computerId, amountOfTimeLoggedOn}

Key: UserId/ComputerId

Current Key Space:

  • Alice, Computer 10: Has 2 events in it. Both events are stored in state.
  • Bob, Computer 11: Has no events in it. Nothing is stored in state.

Will Flink come and remove Bob, Computer 11 from the Key Space eventually or does it just live on forever because at one point it had an event in it?
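To make the question concrete, the key space in this mental model can be pictured as a map from the composite key to per-key state; a key like Bob's that never receives a write would simply have no entry. This is a plain-Java sketch of the scenario, not Flink's actual storage:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeySpaceSketch {
    public static void main(String[] args) {
        // Key space modeled as a map: "userId/computerId" -> stored events.
        Map<String, List<String>> keySpace = new HashMap<>();

        // Alice, Computer 10: two events arrive and are written to state.
        keySpace.computeIfAbsent("Alice/10", k -> new ArrayList<>()).add("loggedOn 30min");
        keySpace.computeIfAbsent("Alice/10", k -> new ArrayList<>()).add("loggedOn 45min");

        // Bob, Computer 11: nothing was ever written, so no entry exists at all.
        System.out.println(keySpace.containsKey("Alice/10")); // true
        System.out.println(keySpace.containsKey("Bob/11"));   // false
    }
}
```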

2

2 Answers

2
votes

Flink does not store any data for state keys that have no user value associated with them, at least in the existing state backends: heap (in memory) or RocksDB.

The key space is virtual in Flink; Flink does not make any assumptions about which concrete keys can potentially exist. There are no pre-allocated buckets per key or per subset of keys. Only once the user application writes a value for some key does that key occupy any storage.

The general idea is that all records with the same key are processed on the same machine (somewhat like being in the same bucket as you say). The local state for a certain key is also always kept on the same machine (if stored at all). This is not related to checkpoints though.

For your example, if some value was written for [Bob, Computer 11] at some point in time and subsequently removed, Flink will remove the value completely, along with the key.
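In plain-Java terms, clearing the last value for a key behaves like removing the map entry itself, key included. This is a sketch of the observable behavior, not of the backend implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class StateRemovalSketch {
    public static void main(String[] args) {
        Map<String, Long> loggedOnTime = new HashMap<>();

        // A value is written for [Bob, Computer 11] at some point...
        loggedOnTime.put("Bob/11", 120L);

        // ...and later removed (e.g. calling state.clear() in a keyed function).
        loggedOnTime.remove("Bob/11");

        // Neither the value nor the key occupies storage any more.
        System.out.println(loggedOnTime.containsKey("Bob/11")); // false
        System.out.println(loggedOnTime.size());                // 0
    }
}
```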

0
votes

Short Answer

It cleans up with the help of the Time To Live (TTL) feature of Flink state and the Java Garbage Collector (GC). The TTL feature removes any reference to the state entry, and the GC reclaims the allocated memory.

Long Answer

Your question can be divided into 3 sub-questions, answered in turn below. I will try to be as brief as possible.

How does Flink partition the data based on Key?

For an operator over a keyed stream, Flink partitions the data on the key with the help of a consistent hashing algorithm. It creates max_parallelism buckets (called key groups). Each operator instance is assigned one or more of these buckets. Whenever a record is sent downstream, its key is hashed into one of these buckets, and the record is consequently routed to the corresponding operator instance. No key is stored here, because the ranges are computed mathematically; hence no area is ever cleared and no bucket is ever deleted. You can key by any type you want, and it won't affect memory in terms of key space or ranges.
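The assignment above can be sketched as pure arithmetic over the key's hash. The formula below mirrors Flink's key-group logic in simplified form (real Flink additionally applies a murmur hash to `key.hashCode()`, see its `KeyGroupRangeAssignment` class), so treat it as a model rather than the exact implementation:

```java
public class KeyGroupSketch {
    // Simplified key-group assignment: keyGroup = hash(key) % maxParallelism.
    // (Flink murmur-hashes key.hashCode() first; omitted here for brevity.)
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // The key group is then mapped onto one of the running operator instances.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of "buckets" (key groups)
        int parallelism = 4;      // running operator instances

        int kg = keyGroup("Alice/10", maxParallelism);
        int op = operatorIndex(kg, maxParallelism, parallelism);

        // Every key lands in exactly one bucket and one operator instance,
        // and nothing per-key needs to be stored to achieve this.
        System.out.println(kg >= 0 && kg < maxParallelism); // true
        System.out.println(op >= 0 && op < parallelism);    // true
    }
}
```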

How does Flink store state with a Key?

All operator instances have an instance-level state store. This store defines the state context of that operator instance, and it can hold multiple named state storages, e.g. "count", "sum", "some-name". These named state storages are key-value stores that store values keyed by the key of the current record.

These KV stores are created when we initialize the state with a state descriptor in the open() function of an operator, i.e. getRuntimeContext().getState(descriptor).

These KV stores will store data only when something actually needs to be stored in state (like HashMap.put(k, v)). Thus no key or value is stored unless a state update method (update, add, put) is called.

So,

  • If Flink hasn't seen a key, nothing is stored for that key.
  • If Flink has seen the key but didn't call the state update methods, nothing is stored for that key.
  • If a state update method is called for a key, the key-value pair will be stored in the KV store.
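The lazy behavior described in these bullets can be modeled with a map of maps: the named state storages exist per operator instance, but a key only appears in one of them once an update method is called. This is a plain-Java sketch; names like "count" are just illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class NamedStateSketch {
    public static void main(String[] args) {
        // One operator instance: named state storages -> per-key values.
        Map<String, Map<String, Integer>> stateStore = new HashMap<>();
        stateStore.put("count", new HashMap<>()); // created in open(), still empty

        // Flink "sees" key Bob/11 (a record arrives), but no update method
        // is called: nothing gets stored for that key.
        System.out.println(stateStore.get("count").containsKey("Bob/11")); // false

        // An update method is called for Alice/10: now the pair is stored.
        stateStore.get("count").merge("Alice/10", 1, Integer::sum);
        System.out.println(stateStore.get("count").get("Alice/10")); // 1
    }
}
```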

How does Flink clean up the state for a Key?

Flink does not delete the state unless it is required by the user or done by the user manually. As mentioned earlier, Flink has the TTL feature for state. The TTL marks the state as expired, and the state is removed when a cleanup strategy is invoked. These cleanup strategies vary with the backend type and the timing of cleanup. For the heap state backend, it removes the entry from a state table, i.e. it drops any reference to the entry; the memory occupied by the now-unreferenced entry is then reclaimed by the Java GC. For the RocksDB state backend, it simply calls RocksDB's native delete method.
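A minimal sketch of TTL-style expiry, assuming a simple last-access timestamp per entry (real Flink configures this via StateTtlConfig and delegates the actual deletion to the backend, so this only models the idea):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class TtlSketch {
    static void cleanup(Map<String, Long> lastAccess, long nowMillis, long ttlMillis) {
        // Drop every entry whose last access is older than the TTL.
        // Removing the map entry drops the reference; GC reclaims the memory.
        Iterator<Map.Entry<String, Long>> it = lastAccess.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> lastAccess = new HashMap<>();
        lastAccess.put("Alice/10", 1_000L); // accessed long ago
        lastAccess.put("Bob/11", 9_500L);   // accessed recently

        cleanup(lastAccess, 10_000L, 5_000L); // TTL of 5 seconds

        System.out.println(lastAccess.containsKey("Alice/10")); // false (expired)
        System.out.println(lastAccess.containsKey("Bob/11"));   // true
    }
}
```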