I am using incremental checkpoints with RocksDB as the state backend in my Flink job, but I want to verify that incremental checkpoints are actually happening. Is there a way to tell from the logs or the Flink dashboard whether a checkpoint was incremental or full?
I am using Flink 1.10.0. According to the Flink documentation, RocksDB logging is disabled in this version, so I followed this Ververica link to enable it. This is the code I used to enable logging:
import static org.apache.flink.configuration.ConfigOptions.key;
import java.util.Collection;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
    private static final long serialVersionUID = 1L;

    private String dbLogDir = "";

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        currentOptions = super.createDBOptions(currentOptions, handlesToClose);

        // Re-enable RocksDB's info LOG and dump statistics every 60 seconds.
        currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
        currentOptions.setStatsDumpPeriodSec(60);
        currentOptions.setDbLogDir(dbLogDir);

        return currentOptions;
    }

    @Override
    public String toString() {
        return this.getClass().toString() + "{" + super.toString() + '}';
    }

    /**
     * Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
     * data directory's absolute path will be used as the log file prefix).
     */
    public void setDbLogDir(String dbLogDir) {
        this.dbLogDir = dbLogDir;
    }

    public static final ConfigOption<String> LOG_DIR =
        key("state.backend.rocksdb.log.dir")
            .stringType()
            .noDefaultValue()
            .withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
                "data directory's absolute path will be used as the log file prefix)");

    @Override
    public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
        DefaultConfigurableOptionsFactory optionsFactory =
            super.configure(configuration);

        // Take the log directory from the Flink configuration if it is set there.
        this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);

        return optionsFactory;
    }
}
I set the following in my job code to enable logging:
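import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(interval);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// The second constructor argument (true) turns on incremental checkpointing.
RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath, true);

// Attach the logging-enabled options factory before registering the backend.
DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
stateBackend.setRocksDBOptions(options);
env.setStateBackend(stateBackend);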
I also added the following two settings to my Flink configuration file to enable RocksDB logging:
state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog
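One thing that may be worth ruling out: as far as I understand, RocksDB treats the log directory as a plain local filesystem path rather than a URI, so the file:// prefix above might not be interpreted as intended. I have not verified this. The sketch below uses only the JDK to turn a file: URI into a plain path; the class and method names (LogDirPaths, toLocalPath) are hypothetical helpers and not part of Flink or RocksDB.

```java
import java.net.URI;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink or RocksDB.
final class LogDirPaths {

    /** Returns a plain local path for a file: URI; any other input is returned unchanged. */
    static String toLocalPath(String dir) {
        if (dir.startsWith("file:")) {
            // Paths.get(URI) resolves the URI against the default (local) filesystem.
            return Paths.get(URI.create(dir)).toString();
        }
        return dir;
    }

    public static void main(String[] args) {
        // Prints the local form of the directory used in the configuration above.
        System.out.println(toLocalPath("file:///mnt/flink/storage/rocksdb/logging/"));
    }
}
```

The result could then be passed to setDbLogDir, or written into state.backend.rocksdb.log.dir, in place of the URI form.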
I went through the entire Flink dashboard but could not find any indication of whether incremental or full checkpoints are being taken. How can I set up RocksDB logging so that I can tell whether incremental checkpoints are happening? I saw in the documentation that RocksDB logging carries a significant performance and storage cost; this is only for testing, and I will disable it afterwards.
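A check that does not depend on RocksDB logging is to look at the checkpoint directory itself. Flink writes each job's checkpoints under the configured path in a per-job directory containing shared, taskowned, and chk-N subdirectories, and with incremental RocksDB checkpoints the uploaded SST files land in shared. If shared keeps filling up while the chk-N directories stay small, that would suggest incremental checkpoints are in effect. The sketch below assumes the checkpoint path is on a local or mounted filesystem that java.nio can read; CheckpointDirInspector is a hypothetical name, and the directory passed in is whatever per-job directory exists under your checkpoint path.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical helper: reports how many bytes sit in each subdirectory
// (shared, taskowned, chk-N) of one job's checkpoint directory.
final class CheckpointDirInspector {

    /** Total size in bytes of all regular files below dir. */
    static long sizeOf(Path dir) throws IOException {
        try (Stream<Path> files = Files.walk(dir)) {
            return files.filter(Files::isRegularFile)
                        .mapToLong(p -> p.toFile().length())
                        .sum();
        }
    }

    /** Maps each immediate subdirectory name to the total size of the files in it. */
    static Map<String, Long> summarize(Path jobCheckpointDir) throws IOException {
        Map<String, Long> sizes = new TreeMap<>();
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            for (Path child : (Iterable<Path>) children::iterator) {
                if (Files.isDirectory(child)) {
                    sizes.put(child.getFileName().toString(), sizeOf(child));
                }
            }
        }
        return sizes;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: CheckpointDirInspector <checkpoint-path>/<job-id>");
            return;
        }
        summarize(Paths.get(args[0]))
            .forEach((name, bytes) -> System.out.println(name + "\t" + bytes + " bytes"));
    }
}
```

Running it after several checkpoints and comparing the numbers gives a rough picture. Files may be added or removed while a job is running, so this is meant for a quick test run, not for monitoring.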