
Before defining and executing the job graph of my stream processor in Apache Flink, I want to run some initialization code, e.g., to create the Kafka topics that I use as a sink in the job graph. However, this initialization code should not run when the stream processor is restored from a savepoint, e.g., during updates to the stream processor. Is there a way to programmatically check whether the job is started from a savepoint?


2 Answers

1 vote

When restarting a job from a savepoint, you have to specify the path to the savepoint directory.

We do this the following way:

$ bin/flink run -s :savepointPath [:runArgs]

If I understand your question correctly, all you have to do is verify that --fromSavepoint or -s (its short alias) was specified.

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#resuming-from-savepoints
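One caveat: the -s option of `flink run` is consumed by the Flink CLI itself and is not forwarded to your program's main() arguments, so to use this approach you would need to pass your own flag to the program in addition to -s. A minimal sketch in plain Java, assuming you reuse the same flag names for your program argument:

```java
// Sketch: detect a user-supplied restore flag in the program arguments.
// Assumption: the caller passes -s or --fromSavepoint as a *program* argument
// (e.g. `flink run -s :savepointPath myJob.jar --fromSavepoint`), since the
// CLI's own -s is not visible to main().
public class StartupCheck {

    static boolean startedFromSavepoint(String[] args) {
        for (String arg : args) {
            if (arg.equals("-s") || arg.equals("--fromSavepoint")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        if (startedFromSavepoint(args)) {
            System.out.println("restored from savepoint: skipping initialization");
        } else {
            System.out.println("fresh start: running initialization");
        }
    }
}
```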

1 vote

It is possible to implement Flink functions that are aware of snapshots and restores by implementing the CheckpointedFunction interface. When initializeState(FunctionInitializationContext context) is called, you can check context.isRestored() to determine whether the job is being restarted from a snapshot (i.e., from a checkpoint or savepoint) or started fresh.
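A minimal sketch of what this can look like, assuming Flink's streaming API is on the classpath (the class name MySink is hypothetical). Note that initializeState() runs once per parallel instance of the operator, so any one-time setup triggered here should be safe to run concurrently:

```java
// Sketch: a sink that detects whether the job was restored from a snapshot.
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySink extends RichSinkFunction<String> implements CheckpointedFunction {

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (context.isRestored()) {
            // Restarted from a checkpoint or savepoint: skip one-time setup.
        } else {
            // Fresh start: one-time initialization could go here.
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // No state to snapshot in this sketch.
    }

    @Override
    public void invoke(String value, Context context) {
        // Write the value to the sink.
    }
}
```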

Another approach you might take would be to check if the topics already exist, and if not, go ahead and create them without considering how the job was started.
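That idempotent pattern can be sketched with the Kafka AdminClient, assuming the kafka-clients library is available; the broker address, topic name, partition count, and replication factor below are placeholders:

```java
// Sketch: create a Kafka topic only if it does not already exist,
// regardless of how the Flink job was started.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetup {

    public static void ensureTopic(String bootstrapServers, String topic) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() returns a future; get() blocks until the names arrive.
            if (!admin.listTopics().names().get().contains(topic)) {
                admin.createTopics(Collections.singletonList(
                        new NewTopic(topic, 1, (short) 1))).all().get();
            }
        }
    }
}
```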