- I am using Flink 1.10.1 with the RocksDB state backend.
- I know that RocksDB takes its memory from Flink's "managed memory". I did not set any specific value for managed memory, so Flink sizes it by default.
- When I monitor my application, the free memory on the TaskManager hosts keeps decreasing (I mean the operating system's free memory as reported by `free -h`). I suspect that RocksDB is the cause.
- Question_1 => When a ValueState's value expires, will RocksDB remove it from memory and also delete it from the local storage directory? (My storage capacity is also limited.)
- Question_2 => With `stream.keyBy(ipAddress)`, if the `ipAddress` key is held by RocksDB (I am talking about the keyBy itself, not the state), is it always placed in managed memory? If not, will Flink's heap memory usage grow?
Here is the general structure of my application:
streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window().process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....
Here is the state example from my application:
private transient ValueState<SampleObject> sampleState;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
"sampleState",
TypeInformation.of(SampleObject.class)
);
sampleValueStateDescriptor.enableTimeToLive(ttlConfig);
RocksDB configuration:
state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local
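For reference, I have not added any memory-related entries. As far as I understand, leaving them out should be equivalent to the Flink 1.10 defaults sketched below. These two lines are not in my flink-conf.yaml, and the values are my reading of the documented defaults, not something I have verified on my cluster:

```yaml
# Assumed defaults, NOT present in my actual configuration.
# RocksDB takes its block cache and write buffers from Flink managed memory.
state.backend.rocksdb.memory.managed: true
# Fraction of total Flink memory reserved as managed memory.
taskmanager.memory.managed.fraction: 0.4
```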
Why I am using RocksDB
- I am using RocksDB because I have a huge key space (think of one key per IP address) that the heap-based state backends could not handle.
- My application also uses RocksDB because I keep a lot of state in a user-defined KeyedProcessFunction for later decisions (each state has a `StateTtlConfig`).
Note
- My application does not need incremental checkpointing or savepoints. I don't care about keeping every snapshot of my application.