0
votes

I am implementing incremental checkpoints using RocksDB as statebackend in my flink code, but i want to know is incremental checkpoints are happening what i meant is there way to check logs or flink dashboard whether it is performing incremental checkpoints or full checkpoints

  1. I am using flink version 1.10.0 as per flink documentation i saw logging mechanism is disabled in Flink version 1.10.0 i followed this Ververica link to enable RocksDB logging below is code for enabling logging i used
    import static org.apache.flink.configuration.ConfigOptions.key;
    
    import java.util.Collection;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
    import org.rocksdb.DBOptions;
    import org.rocksdb.InfoLogLevel;
    
    public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
        private static final long serialVersionUID = 1L;
    
        private String dbLogDir = "";
    
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions,
                                         Collection<AutoCloseable> handlesToClose) {
            currentOptions = super.createDBOptions(currentOptions, handlesToClose);
    
            currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
            currentOptions.setStatsDumpPeriodSec(60);
            currentOptions.setDbLogDir(dbLogDir);
    
            return currentOptions;
        }
    
        @Override
        public String toString() {
            return this.getClass().toString() + "{" + super.toString() + '}';
        }
    
        /**
         * Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
         * data directory's absolute path will be used as the log file prefix).
         */
        public void setDbLogDir(String dbLogDir) {
            this.dbLogDir = dbLogDir;
        }
    
        public static final ConfigOption<String> LOG_DIR =
                key("state.backend.rocksdb.log.dir")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
                                "data directory's absolute path will be used as the log file prefix)");
    
        @Override
        public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
            DefaultConfigurableOptionsFactory optionsFactory =
                    super.configure(configuration);
    
            this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    
            return optionsFactory;
        }

I did below settings in my code to enable logging

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.enableCheckpointing(interval);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath,true);
    
    DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
    options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
    
    env.setStateBackend(stateBackend);
    
    stateBackend.setRocksDBOptions(options);

I added below 2 setting in my flink configuration file to enable RocksDB logging

    state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
    state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog

I went through complete flink dashboard but i didn't get any clue how to check is incremental checkpoint is happening or full checkpointing is happening. Please help me how can i set logging for RocksDB to know that incremental checkpoints is happening or not. I saw in the documentation RocksDB logging will cause huge cost in performance as well as the storage this is for testing purpose after that i will disable that

1

1 Answers

2
votes

I'm not sure if this information is logged or displayed anywhere, but in your code you could use

stateBackend.isIncrementalCheckpointsEnabled()

to determine if your RocksDB state backend has checkpoints enabled, and then log this information yourself.

Note that to enable incremental checkpoints (which are off by default), you'll need to configure

state.backend.incremental: true