    MapState<String, Integer> someMapState;
    ...

    public void processElement(String s, Context context, Collector<Integer> collector) throws Exception {
        // Emit every value currently stored in the map state
        // (Streams.stream is Guava's helper for turning an Iterable into a Stream)
        Streams.stream(someMapState.values()).forEach(collector::collect);
    }

Is state access like MapState.values() lazy (i.e., are values not loaded into memory all at once) with the RocksDB backend?

Does this behavior vary depending on the state type (ValueState, ListState, and so on)?

Note: I tried digging through the docs and source code but could not find any details about this; any pointers would be appreciated. Thank you!


1 Answer


The RocksDB state backend keeps its working state on disk, as serialized bytes, with an off-heap (in-memory) block cache. Serialization/deserialization is required on every state access and update.

With MapState, each entry in the map is a separate RocksDB key/value pair, allowing efficient reads and writes of individual entries. ListState is stored as a single RocksDB value, but the RocksDB state backend can append to it cheaply by appending the serialized bytes of the new element.
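To make the difference concrete, here is a toy model of a key/value store like RocksDB. It is not Flink's actual key layout (the KvModel class, the "#" separator, and the comma delimiter are all made up for illustration): each MapState entry gets its own store key, while ListState is one value that grows by appending bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

// Conceptual model only (not Flink internals): a flat key/value store
// standing in for RocksDB.
class KvModel {
    final Map<String, byte[]> db = new HashMap<>();

    // MapState: each user map entry lives under its own composite store key,
    // so updating one entry rewrites only that entry's bytes.
    void mapPut(String stateName, String mapKey, String value) {
        db.put(stateName + "#" + mapKey, value.getBytes(StandardCharsets.UTF_8));
    }

    // ListState: the whole list is one store value; appending an element
    // only tacks new serialized bytes onto the end, without rewriting
    // the existing elements.
    void listAppend(String stateName, String value) {
        byte[] elem = value.getBytes(StandardCharsets.UTF_8);
        db.merge(stateName, elem, (old, add) -> {
            byte[] merged = Arrays.copyOf(old, old.length + 1 + add.length);
            merged[old.length] = ','; // stand-in element delimiter
            System.arraycopy(add, 0, merged, old.length + 1, add.length);
            return merged;
        });
    }
}
```

In this model, writing one map entry touches one small store value, whereas reading the list back always means deserializing the single concatenated blob.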

Internally, org.apache.flink.contrib.streaming.state.RocksDBMapState#values uses a RocksDBMapIterator, which does not load all of the values into memory at once. Instead it works through the key space in chunks, caching each chunk as it goes. (And for what it's worth, it iterates over the keys in sorted binary order.)
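For intuition, the chunked strategy can be sketched in plain Java. This is a simplified model, not Flink's actual RocksDBMapIterator (the class and method names below are invented); the point is that only one fixed-size chunk of values is in memory at a time, fetched in sorted key order:

```java
import java.util.*;

// Simplified model of chunked iteration over a sorted key space,
// in the spirit of RocksDBMapIterator: only one chunk of values
// is held in memory at a time.
class ChunkedValueIterator<K extends Comparable<K>, V> implements Iterator<V> {
    private final NavigableMap<K, V> store; // stands in for RocksDB
    private final int chunkSize;
    private Iterator<V> chunk = Collections.emptyIterator();
    private K nextFrom;                     // resume point for the next chunk
    private boolean exhausted = false;

    ChunkedValueIterator(NavigableMap<K, V> store, int chunkSize) {
        this.store = store;
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean hasNext() {
        if (!chunk.hasNext() && !exhausted) {
            loadChunk();
        }
        return chunk.hasNext();
    }

    @Override
    public V next() {
        if (!hasNext()) throw new NoSuchElementException();
        return chunk.next();
    }

    // Fetch the next chunkSize values, in sorted key order, starting
    // strictly after the last key seen.
    private void loadChunk() {
        SortedMap<K, V> tail = (nextFrom == null) ? store : store.tailMap(nextFrom, false);
        List<V> values = new ArrayList<>(chunkSize);
        K last = null;
        for (Map.Entry<K, V> e : tail.entrySet()) {
            values.add(e.getValue());
            last = e.getKey();
            if (values.size() == chunkSize) break;
        }
        if (last == null || values.size() < chunkSize) exhausted = true;
        nextFrom = last;
        chunk = values.iterator();
    }
}
```

Consuming this iterator never materializes more than chunkSize values at once, which is the property that makes MapState.values() safe for maps larger than memory.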

With ListState, the entire list does need to fit on the heap when it is read, and its serialized value cannot exceed 2^31 bytes.

If you are using RocksDB with incremental checkpoints, then checkpoints are highly optimized, as all that is required is to copy any new (immutable) SST files to the durable file storage (and to delete any SST files that are no longer relevant due to compaction).

When restoring from incremental checkpoints, state is only lazily loaded. If local recovery is enabled, then all that is needed is to copy any missing SST files from the remote, durable file store, so in many cases, recovery can be almost immediate.
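Both incremental checkpoints and local recovery are opt-in. Assuming a recent Flink release, a flink-conf.yaml snippet along these lines enables them (the checkpoint directory is a placeholder):

```yaml
state.backend: rocksdb
state.backend.incremental: true          # checkpoint only new SST files
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
state.backend.local-recovery: true       # keep a local state copy for fast restore
```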