  • I am using Flink version 1.10.1 with the RocksDB state backend.
  • I know that RocksDB uses memory from "managed memory", and I did not set any specific value for managed memory; Flink handles it.
  • When I monitor my application, the free memory of the task managers keeps decreasing (I mean the operating system's free memory, measured via free -h). I suspect RocksDB could be the reason.
  • Question_1 => If a ValueState's value expires, will RocksDB remove it from its memory and delete it from the local storage directory? (I also have limited storage capacity.)
  • Question_2 => For stream.keyBy(ipAddress), if this ipAddress is held by RocksDB (I am talking about the keyBy itself, not the state), is it always placed in managed memory? If not, will Flink's heap memory grow?

Here is the general structure of my application:

streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window().process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....

Here is the state example from my application:

 private transient ValueState<SampleObject> sampleState;

 @Override
 public void open(Configuration parameters) {
     StateTtlConfig ttlConfig = StateTtlConfig
             .newBuilder(Time.minutes(10))
             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
             .build();

     ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
             "sampleState",
             TypeInformation.of(SampleObject.class)
     );
     sampleValueStateDescriptor.enableTimeToLive(ttlConfig);
     sampleState = getRuntimeContext().getState(sampleValueStateDescriptor);
 }

RocksDB configuration:

state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local

Why I am using RocksDB

  • I am using RocksDB because I have a huge number of keys (think of IP addresses) that could not be handled by the heap state backend or others.
  • My application uses RocksDB because I keep a bunch of states in a user-defined KeyedProcessFunction for future decisions. (Each state has a `StateTtlConfig`.)

Note

  • My application does not need incremental checkpointing or anything related to savepoints. I don't care about saving full snapshots of my application.

1 Answer


Flink ValueState will be removed from storage after expired when using Rocksdb?

Yes, but not immediately. (And in some earlier versions of Flink, the answer was "it depends".)

In your state ttl config you haven't specified how you want state cleanup to be done. In this case, expired values are explicitly removed on read (such as ValueState#value) and are otherwise periodically garbage collected in the background. In the case of RocksDB, this background cleanup is done during compaction. In other words, the cleanup isn't immediate. The docs provide more details on how you can tune this -- you could configure the cleanup to be done more quickly, at the expense of some performance degradation.
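As a sketch (not from the question), the TTL config above could be extended to clean up expired entries during RocksDB compaction; the 1000-entry interval is an illustrative value controlling how often the compaction filter re-reads the current timestamp:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        // with RocksDB, run the TTL filter during compaction; re-check the
        // current timestamp after every 1000 processed state entries
        .cleanupInRocksdbCompactFilter(1000)
        .build();
```

A smaller interval makes cleanup more aggressive (and timely) at the cost of more timestamp queries during compaction.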

A keyBy itself does not use any state. The key selector function is used to partition the stream, but the keys are not stored in connection with the keyBy. Only the window and flatMap operations keep state; it is per-key state, and all of this keyed state will be in RocksDB (unless you have configured your timers to be on the heap, which is an option, but in Flink 1.10 timers are stored off-heap, in RocksDB, by default).

You could change the flatmap to a KeyedProcessFunction and use timers to explicitly clear state for state keys -- which would give you direct control over exactly when the state is cleared, rather than relying on the state TTL mechanism to eventually clear the state.
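A hedged sketch of that approach, reusing the question's state names; Event and Result are hypothetical input/output types, and the 10-minute retention mirrors the TTL config above:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SampleCleanupFunction extends KeyedProcessFunction<String, Event, Result> {
    private transient ValueState<SampleObject> sampleState;

    @Override
    public void open(Configuration parameters) {
        sampleState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sampleState", SampleObject.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // ... update sampleState and emit results here ...

        // arm a cleanup timer 10 minutes from now; note that each distinct
        // timestamp registers its own timer for this key
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + Time.minutes(10).toMilliseconds());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
        // explicitly drop this key's entry from RocksDB
        sampleState.clear();
    }
}
```

This gives you deterministic cleanup at the cost of managing the timers yourself.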

But it's more likely that the windows are building up considerable state. If you can switch to doing pre-aggregation (via reduce or aggregate) that may help a lot.
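A sketch of what such pre-aggregation might look like, assuming a hypothetical Event type and a simple per-IP count; the window then stores one Long accumulator per key instead of buffering every element:

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

streamA
    .keyBy(ipAddr)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Event, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    });
```

With reduce or aggregate, Flink applies the function incrementally as elements arrive, so the per-window state stays constant-sized per key.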