3 votes

I am reading from an unbounded source (Kafka) and writing its word count to another Kafka topic. Now I want to enable checkpointing in the Beam pipeline. I have followed all the instructions in the Apache Beam documentation, but the checkpoint directory is still not created.

Below are the parameters I used for the pipeline:

--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
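In the Beam Java SDK these flags are consumed by PipelineOptionsFactory.fromArgs(args), which matches each --key=value flag to the corresponding getter on FlinkPipelineOptions. As a self-contained sketch of that --key=value convention (the FlagParse class and parseFlags helper below are illustrative stand-ins, not Beam API):

```java
import java.util.HashMap;
import java.util.Map;

public class FlagParse {
    // Hypothetical helper: splits "--key=value" flags into a map, mirroring
    // the flag syntax that Beam's PipelineOptionsFactory.fromArgs(...) expects.
    static Map<String, String> parseFlags(String[] args) {
        Map<String, String> opts = new HashMap<>();
        for (String arg : args) {
            if (arg.startsWith("--") && arg.contains("=")) {
                int eq = arg.indexOf('=');
                opts.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return opts;
    }
}
```

A flag that is misspelled or missing its value simply never reaches the matching option getter, which is one easy way to end up with checkpointing silently disabled.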

Can anyone help me out with checkpointing?


2 Answers

1 vote

I have worked out a solution. One option is to change the state.checkpoints.dir path in flink-conf.yaml of the Flink cluster; the other is to use FlinkPipelineOptions:

        @Description(
                "Sets the state backend factory to use in streaming mode. "
                        + "Defaults to the flink cluster's state.backend configuration.")
        Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
        void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

and then setting setStateBackendFactory (I have done this using a custom class):

    static class Backend implements FlinkStateBackendFactory {

        @Override
        public StateBackend createStateBackend(FlinkPipelineOptions options) {
            return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");
        }
    }
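Beam receives this factory as a Class object and instantiates it reflectively before asking it for the state backend. A self-contained sketch of that pattern (the interface and classes below are simplified stand-ins, not the real Beam/Flink types, and the factory here returns the checkpoint URI as a String rather than a Flink StateBackend):

```java
// Simplified stand-in for org.apache.beam.runners.flink.FlinkStateBackendFactory.
interface StateBackendFactory {
    String createStateBackend();
}

// Stand-in for the custom factory from the answer; the real one would
// return new FsStateBackend("file:///...").
class FsBackendFactory implements StateBackendFactory {
    @Override
    public String createStateBackend() {
        return "file:///Users/myPc/word-count-beam/src/checkpoint/";
    }
}

public class FactoryDemo {
    // Mirrors how a Class<? extends ...Factory> option value is turned into
    // an instance via its no-arg constructor and then invoked.
    static String backendFromClass(Class<? extends StateBackendFactory> cls) {
        try {
            return cls.getDeclaredConstructor().newInstance().createStateBackend();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

This is why the factory class must be public enough to instantiate and must have a no-arg constructor; the real options setter would be called as options.setStateBackendFactory(Backend.class).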

This will create the checkpoint directory. Note that you also need to set a value for checkpointingInterval for checkpointing to be enabled at all.
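For the first option (configuring the checkpoint directory cluster-wide instead of in code), the relevant keys in flink-conf.yaml would look roughly like this; the path reuses the one from the factory above and the backend choice is an example, not taken from the original post:

```yaml
# flink-conf.yaml — cluster-wide checkpoint configuration (example values)
state.backend: filesystem
state.checkpoints.dir: file:///Users/myPc/word-count-beam/src/checkpoint/
```

With this in place, jobs that enable checkpointing pick up the directory without needing a custom FlinkStateBackendFactory.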

0 votes

I know this is old, but I want to confirm your answer. We built a dockerized Flink and Beam setup in 2019 and run it with these options:

--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev

and we have configured RocksDB as the state backend in flink-conf.yaml.
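A minimal flink-conf.yaml fragment for the RocksDB setup described here might look like the following; the directory and the incremental flag are illustrative values, not taken from this answer:

```yaml
# flink-conf.yaml — RocksDB state backend (example values)
state.backend: rocksdb
state.checkpoints.dir: file:///flink/checkpoints
state.backend.incremental: true
```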