
I'm using the Flink SQL API to process data.

When I restart the app, the sum result is not restored from the checkpoint; it starts again from 1.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StateBackend stateBackend = new FsStateBackend("file:///D:/d_backup/github/flink-best-practice/checkpoint");
env.enableCheckpointing(1000 * 60); // checkpoint every 60 seconds
env.setStateBackend(stateBackend);

Table table = tableEnv.sqlQuery(
        "select sum(area_id) " +
        "from rtc_warning_gmys " +
        "where area_id = 1 " +
        "group by character_id,area_id,group_id,platform");

//   convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
dsRow.map(new MapFunction<Tuple2<Boolean,Row>, Object>() {
    @Override
    public Object map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
        if(booleanRowTuple2.f0) {
            System.out.println(booleanRowTuple2.f1.toString());
            return booleanRowTuple2.f1;
        }
        return null;
    }
});

env.execute("Kafka table select");

The log output is:

1 2 3 ... 100

After restarting the app, it still starts with: 1 2 3 ...

I expected the sum to be stored in the checkpoint files, so that after a restart the app could resume from the last result, like:

101 102 103 ... 120


1 Answer


Some possibilities:

  • Did the job run long enough to complete a checkpoint? Just because the job produced output doesn't mean that a checkpoint was completed. I see you have checkpointing configured to occur once a minute, and the checkpoints could take some time to complete.

  • How was the job stopped? Unless they have been externalized, checkpoints are deleted when a job is cancelled (see the sketch after this list for a configuration that retains them).

  • How was the job restarted? Did it recover (automatically) from a checkpoint, or was it resumed from an externalized checkpoint or savepoint, or was it restarted from scratch?
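
On the second point: by default, checkpoints are only used for automatic recovery within one run of a job and are deleted on cancellation. Here is a minimal sketch of retaining them, assuming the Flink 1.x API that matches the question's FsStateBackend usage:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds, as in the question.
        env.enableCheckpointing(1000 * 60);
        env.setStateBackend(new FsStateBackend("file:///D:/d_backup/github/flink-best-practice/checkpoint"));

        // Keep completed checkpoints when the job is cancelled, so they can
        // be used later to resume the job manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... define the rest of the job and call env.execute(...)
    }
}

With RETAIN_ON_CANCELLATION, the checkpoint directory survives cancellation, and its path can be passed when resuming: flink run -s <checkpoint-path> app.jar.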

This sort of experiment is easiest to do via the command line. You might, for example,

  1. write an app that uses checkpoints, and has a restart strategy (e.g., env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000))); a minimal sketch of such an app follows this list
  2. start a local cluster
  3. "flink run -d app.jar" to start the job
  4. wait until at least one checkpoint has completed
  5. "kill -9 task-manager-PID" to cause a failure
  6. "taskmanager.sh start" to allow the job to resume from the checkpoint