
I'm using Flink 1.10.1 with FsStateBackend as the state backend for checkpoints. I have some stateful operations that work as expected while the application is running (it runs as a .jar application, not on a cluster). But if the application stops (or crashes) for some reason, the state that should have been stored in the filesystem with the checkpoints is not loaded, and the functions have no reference to any previous state. So I have to load the information from the database and save it as state before I can work with the previous state again. There must be a way to do this with the checkpoints and FsStateBackend, without reading all the information from the database: just reload the state from the checkpoints that are already stored. Is this possible?

Here is some code. My checkpoint configuration:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, GetConfiguration.getConfig());
final StateBackend stateBackend = new FsStateBackend(new Path("/some/path/checkpoints").toUri(), true);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(stateBackend);

And this is the workaround I want to avoid:

public class EventCountMap extends RichMapFunction<Event, EventCounter> {
    private static final MapStateDescriptor<String, Timestamp> descriptor = new MapStateDescriptor<>("previous_counter", String.class, Timestamp.class);
    // Per-instance, not static: a static object would be shared by all 8 parallel subtasks in this JVM
    private final EventCounter eventCounter = new EventCounter();
    private MapState<String, Timestamp> previous_state;
    private static final StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(org.apache.flink.api.common.time.Time.days(1))
            .cleanupFullSnapshot()
            .build();

    @Override
    public void open(Configuration parameters) {
        descriptor.enableTimeToLive(ttlConfig);
        previous_state = getRuntimeContext().getMapState(descriptor);
    }

    /* I want to avoid calling this function, which loads all events from the DB and puts them into the state.
       It only runs once, but there must be a more efficient way to do this in Flink. */
    private void mapRefueled() throws Exception {
        Preconditions.checkNotNull(previous_state);
        for (Map.Entry<String, Timestamp> map : StreamingJob.update_beh_count_ts.entrySet())
            previous_state.put(map.getKey(), map.getValue());
        StreamingJob.update_beh_count_ts.clear();
    }

    @Override
    public EventCounter map(Event event) throws Exception {
        /*Refuel map state in case of failures*/
        if (!StreamingJob.update_beh_count_ts.isEmpty()) mapRefueled();
        eventCounter.date = new Date(event.timestamp.getTime());
        final String key_first = eventCounter.date.toString().concat("_ts_first");
        final String key_last = eventCounter.date.toString().concat("_ts_last");
        if (previous_state.contains(key_first) && previous_state.contains(key_last)) {
            final Timestamp first = (previous_state.get(key_first).after(event.timestamp)) ? event.timestamp : previous_state.get(key_first);
            final Timestamp last = (previous_state.get(key_last).before(event.timestamp)) ? event.timestamp : previous_state.get(key_last);
            previous_state.put(key_first, first);
            previous_state.put(key_last, last);
        } else {
            previous_state.put(key_first, event.timestamp);
            previous_state.put(key_last, event.timestamp);
        }
        eventCounter.first_event = previous_state.get(key_first);
        eventCounter.last_event = previous_state.get(key_last);
        return eventCounter;
    }
}

I hope I've explained what I need to do. Kind regards, and thanks in advance!

Are you saying that the state in the checkpoint is not being restored? You have disabled automatic restarts, so how are you restarting the job, and how do you know the state isn't being restored? - David Anderson
Maybe I explained myself badly. I disabled the restart strategy because my service manager restarts Flink if it stops, and to support this, my checkpoints are not deleted on failure. So when Flink starts again, these states should be loaded, but they aren't. Or is that only done by the restart strategy? For example, I have a RichMapFunction, and when I call previous_state = getRuntimeContext().getMapState(descriptor); the previous state is null for all previous ids. Is there a way to read those checkpoints and load the states from Java? Thanks again. Kind regards - Alter

1 Answer


To have the state from a checkpoint loaded when restarting a job, you have to explicitly arrange for that to happen; otherwise the job will be re-run without loading the checkpoint.

See "Restore from a savepoint" and "Resuming from a retained checkpoint" in the Flink documentation for details, but the gist of it is this:

bin/flink run -s :checkpointPath [:runArgs]

My guess is this isn't being done.
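The :checkpointPath has to point at one concrete retained checkpoint. With the FsStateBackend, retained checkpoints are written to <checkpoint-dir>/<job-id>/chk-<n>/, and a fresh start gets a new job ID, so the directory to pass changes from run to run. Below is a minimal sketch of a hypothetical helper (LatestCheckpoint.find is not a Flink API) that picks the most recently completed chk-<n> directory, assuming that layout and assuming a checkpoint counts as complete once its _metadata file exists. It has not been tested against a real Flink installation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical helper, not a Flink API. Assumes the FsStateBackend layout
// <checkpoint-dir>/<job-id>/chk-<n>/_metadata for retained checkpoints.
final class LatestCheckpoint {
    private static final Pattern CHK_DIR = Pattern.compile("chk-\\d+");

    private LatestCheckpoint() {}

    // Returns the most recently completed chk-<n> directory under base, if any.
    static Optional<Path> find(Path base) throws IOException {
        if (!Files.isDirectory(base)) {
            return Optional.empty();
        }
        // Depth 2 covers base/<job-id>/chk-<n>; a chk-<n> directly under base also matches.
        try (Stream<Path> paths = Files.walk(base, 2)) {
            return paths
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName() != null
                            && CHK_DIR.matcher(p.getFileName().toString()).matches())
                    // Skip directories without a _metadata file (assumed incomplete).
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    // Newest _metadata wins; chk numbers are not comparable across job IDs.
                    .max(Comparator.comparingLong(LatestCheckpoint::metadataMillis));
        }
    }

    // Last-modified time of the checkpoint's _metadata file, in milliseconds.
    private static long metadataMillis(Path chkDir) {
        try {
            return Files.getLastModifiedTime(chkDir.resolve("_metadata")).toMillis();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the configuration from the question, LatestCheckpoint.find(java.nio.file.Paths.get("/some/path/checkpoints")) would return the directory to use as :checkpointPath. Since that job runs from a jar in a local environment rather than through bin/flink, one option worth trying (an assumption, not verified here) is setting the Flink 1.10 option execution.savepoint.path to that directory in the Configuration passed to createLocalEnvironment, e.g. config.setString("execution.savepoint.path", latest.toString()). The helper compares modification times rather than the number n because checkpoint numbering starts over when a job starts without restoring, so numbers from different job IDs don't tell you which is newer.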

In terms of best practices for how to configure your Flink cluster for automatic recovery, that depends on what you're using (Yarn, Mesos, Kubernetes, ...).