I'm using Flink 1.10.1 with FsStateBackend as the state backend for checkpoints. I have some stateful operations that work as expected while the application is running (it runs as a .jar application, not on a cluster). But if the application stops (or crashes) for some reason, the state that should have been stored in the filesystem with the checkpoints is not loaded, and the functions don't have any reference to the previous state. I then have to load the information from the database and save it as state in order to work with the previous state again. There must be a way to do this using the checkpoints and FsStateBackend, without having to read all the information from the database, by simply reloading the state from the checkpoints already stored. Is this possible?
Here is some code. My checkpoint configuration:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, GetConfiguration.getConfig());
final StateBackend stateBackend = new FsStateBackend(new Path("/some/path/checkpoints").toUri(), true);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(stateBackend);
And this is the example that I want to avoid:
public class EventCountMap extends RichMapFunction<Event, EventCounter> {
    private static final MapStateDescriptor<String, Timestamp> descriptor = new MapStateDescriptor<>("previous_counter", String.class, Timestamp.class);
    private static final EventCounter eventCounter = new EventCounter();
    private MapState<String, Timestamp> previous_state;
    private static final StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(org.apache.flink.api.common.time.Time.days(1))
            .cleanupFullSnapshot()
            .build();

    @Override
    public void open(Configuration parameters) {
        descriptor.enableTimeToLive(ttlConfig);
        previous_state = getRuntimeContext().getMapState(descriptor);
    }

    /* I want to avoid calling this function, which loads all events from the DB and puts them into the state. It only happens once, but there must be a more efficient way to do this in Flink. */
    private void mapRefueled() throws Exception {
        Preconditions.checkNotNull(previous_state);
        for (Map.Entry<String, Timestamp> map : StreamingJob.update_beh_count_ts.entrySet())
            previous_state.put(map.getKey(), map.getValue());
        StreamingJob.update_beh_count_ts.clear();
    }

    @Override
    public EventCounter map(Event event) throws Exception {
        /* Refuel map state in case of failures */
        if (!StreamingJob.update_beh_count_ts.isEmpty()) mapRefueled();
        eventCounter.date = new Date(event.timestamp.getTime());
        final String key_first = eventCounter.date.toString().concat("_ts_first");
        final String key_last = eventCounter.date.toString().concat("_ts_last");
        if (previous_state.contains(key_first) && previous_state.contains(key_last)) {
            final Timestamp first = (previous_state.get(key_first).after(event.timestamp)) ? event.timestamp : previous_state.get(key_first);
            final Timestamp last = (previous_state.get(key_last).before(event.timestamp)) ? event.timestamp : previous_state.get(key_last);
            previous_state.put(key_first, first);
            previous_state.put(key_last, last);
        } else {
            previous_state.put(key_first, event.timestamp);
            previous_state.put(key_last, event.timestamp);
        }
        eventCounter.first_event = previous_state.get(key_first);
        eventCounter.last_event = previous_state.get(key_last);
        return eventCounter;
    }
}
I hope I have explained clearly what I need to do. Kind regards, and thanks in advance!
After a restart, with
previous_state = getRuntimeContext().getMapState(descriptor);
the previous state is null for all previous ids. Is there a way to read those checkpoints and load the state from Java? Thanks again, kind regards – Alter
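For context on "reading those checkpoints": with RETAIN_ON_CANCELLATION, retained checkpoints normally end up under the configured directory as <checkpoint-dir>/<job-id>/chk-<n>/, and a completed one contains a _metadata file. The sketch below is plain Java with no Flink dependency. It only locates the most recently completed chk-<n> directory and does not restore any state by itself. The class name LatestCheckpointFinder and its methods are hypothetical, and the directory layout is an assumption based on the documented checkpoint structure, so it may not match every setup.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink.
public class LatestCheckpointFinder {

    /** Numeric id of a "chk-<n>" directory, or -1 if the name does not match. */
    static long checkpointId(Path dir) {
        Path fileName = dir.getFileName();
        if (fileName == null || !fileName.toString().startsWith("chk-")) {
            return -1L;
        }
        try {
            return Long.parseLong(fileName.toString().substring("chk-".length()));
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    /** Last-modified time of the _metadata file inside a checkpoint directory. */
    static long metadataModifiedMillis(Path chkDir) {
        try {
            return Files.getLastModifiedTime(chkDir.resolve("_metadata")).toMillis();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Assumed layout: <root>/<job-id>/chk-<n>/_metadata.
     * Returns the completed checkpoint directory whose _metadata file is newest;
     * ties are broken by the higher checkpoint id.
     */
    static Optional<Path> findLatest(Path checkpointRoot) throws IOException {
        if (!Files.isDirectory(checkpointRoot)) {
            return Optional.empty();
        }
        try (Stream<Path> paths = Files.walk(checkpointRoot, 2)) {
            return paths
                    .filter(p -> checkpointId(p) >= 0)
                    // Only completed checkpoints have a _metadata file.
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator
                            .<Path>comparingLong(LatestCheckpointFinder::metadataModifiedMillis)
                            .thenComparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : "/some/path/checkpoints");
        Optional<Path> latest = findLatest(root);
        System.out.println(latest
                .map(p -> "Latest completed checkpoint: " + p)
                .orElse("No completed checkpoint found under " + root));
    }
}
```

The directory this prints is what the Flink CLI expects when resuming from a retained checkpoint, for example bin/flink run -s <that-path> followed by the job jar. That route assumes the job is submitted through the CLI to a cluster rather than started from a local environment inside the jar, which is a different setup from the one described in the question.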