
I have a very simple 4-node Flink cluster: one node is the JobManager and the other three are TaskManagers, all started by the start-cluster script. All TaskManagers have the same configuration; the state and checkpointing settings are as follows:

state.backend: rocksdb
state.backend.fs.checkpointdir: file:///root/flink-1.3.1/checkpoints/fs
state.backend.rocksdb.checkpointdir: file:///root/flink-1.3.1/checkpoints/rocksdb
# state.checkpoints.dir: file:///root/flink-1.3.1/checkpoints/metadata
# state.checkpoints.num-retained: 2

(The last two options are commented out intentionally: I tried uncommenting them and it changed nothing.)

And in the code I have:

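import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.concurrent.duration._ // provides 10.minutes, 1.minute

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.enableCheckpointing(10.minutes.toMillis)
streamEnv.getCheckpointConfig.setCheckpointTimeout(1.minute.toMillis)
streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)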

After the job has been running for 40 minutes, in the directory

/root/flink-1.3.1/checkpoints/fs/.../

I see 4 checkpoint directories named "chk-" + index, whereas I expected old checkpoints to be deleted so that only one would remain. (According to the docs, only one checkpoint is retained by default.) Meanwhile, the Flink web UI marks the first three checkpoints as "discarded".

Did I configure something wrong, or is this expected behaviour?


1 Answer


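The deletion is done by the JobManager, which probably has no way of accessing your files (in /root). A file:/// path refers to the local filesystem of whichever machine resolves it, so checkpoint files written by the TaskManagers sit on their own disks, where the JobManager on another node cannot reach them to delete them.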
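
If that is the cause, one possible fix is to point the checkpoint directory at storage that every node, including the JobManager, can read and write. The fragment below is only a sketch: the HDFS address and path are hypothetical placeholders, not values from the question, and a shared mount (for example NFS mounted at the same path on all nodes) would serve the same purpose.

```
state.backend: rocksdb
# Hypothetical shared location; it must be reachable from the JobManager
# and from every TaskManager so that old checkpoints can be cleaned up.
state.backend.fs.checkpointdir: hdfs://namenode:8020/flink/checkpoints
```

The remaining settings from the question can stay as they are; this sketch changes only where the checkpoint data is written.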