0
votes

I'm using flink 1.9 and the REST API /jobs/:jobid/savepoints to trigger the savepoint and cancel job (stop the job gracefully to run later on from savepoint).

I use a two-phase commit in source function so my source implements both CheckpointedFunction and CheckpointListener interfaces. On snapshotState() method call I snapshot the internal state and on notifyCheckpointComplete() I checkpoint state to 3rd party system.

From what I can see from source code, only the snapshotState() part is synchronous in CheckpointCoordinator -

// send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

The checkpoint acknowledge and completion notification is asynchronous in AsyncCheckpointRunnable.

That being said, when the savepoint with cancel-job set to true is triggered, after the snapshot is taken, some of the Task Managers keep up to receive completion notification before the job cancelling and execute notifyCheckpointComplete(), and some not.

The question is whether there is a way to cancel job with savepoint so that the notifyCheckpointComplete() is guaranteed to be invoked by all Task Managers before job cancelled or there is no way to achieve this at the moment ?

2

2 Answers

2
votes

It's been a while since I looked at Flink 1.9 so please take my answer with some caution.

My guess is that your sources cancel too early. So notifyCheckpointComplete is actually sent to all tasks, but some SourceFunctions already quit the run and the respective task is cleaned up.

Afaik, what you described should be possible if you ignore cancellation and interruptions until you have received the last notifyCheckpointComplete.

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingCheckpoint = true;
        // start two-phase commit
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // finish two-phase commit
        pendingCheckpoint = false;
    }

    @Override
    public void run(SourceContext<Object> ctx) throws Exception {
        while (!canceled) {
            // do normal source stuff
        }
        // keep the task running after cancellation
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // ignore interruptions until two-phase commit is done
            }
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}