The problem: after a restart, my Flink application does not receive or process the Kinesis events that were produced while it was down.
We have the following Flink environment settings:
env.enableCheckpointing(1000); // checkpoint interval in milliseconds
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause);
env.getCheckpointConfig().setCheckpointTimeout(timeOut);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
and the Kinesis consumer has the following initial position configured:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST");
Interestingly, when I change the Kinesis configuration to replay the events, i.e.
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
Flink receives and processes all the records retained in the stream, including the events generated before, during, and after the time the Flink application was down. Events that were already processed before the restart are therefore processed again, so this behavior violates the "exactly once" property I expect from the Flink application.
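To make the two behaviors concrete, here is a toy Java model of a single shard. It is a simplification I am assuming for illustration, not the Flink or Kinesis API: `InitialPositionModel` and `recordsRead` are hypothetical names, the shard is an append-only list whose indices stand in for sequence numbers, and nothing is assumed to have been trimmed by the retention period yet.

```java
import java.util.List;

// Toy model only: NOT the Flink or Kinesis API. One shard is an append-only list
// whose indices play the role of sequence numbers; nothing has been trimmed yet.
class InitialPositionModel {

    // Simplified stand-ins for the two STREAM_INITIAL_POSITION values from the question.
    enum InitialPosition { LATEST, TRIM_HORIZON }

    // Records a freshly started consumer reads, given the shard contents observed later
    // and the number of records the shard already held when the consumer started.
    static List<String> recordsRead(List<String> shard, int sizeAtStart, InitialPosition position) {
        // LATEST: begin after the newest record present at start-up.
        // TRIM_HORIZON: begin at the oldest record still retained (index 0 in this model).
        int startIndex = (position == InitialPosition.LATEST) ? sizeAtStart : 0;
        return shard.subList(startIndex, shard.size());
    }

    public static void main(String[] args) {
        // e1-e2 were processed before the job went down, e3-e4 were produced during
        // the downtime, and e5 arrived after the restart.
        List<String> shard = List.of("e1", "e2", "e3", "e4", "e5");
        int sizeAtRestart = 4; // e1..e4 are already in the shard when the job restarts

        System.out.println("LATEST:       " + recordsRead(shard, sizeAtRestart, InitialPosition.LATEST));
        System.out.println("TRIM_HORIZON: " + recordsRead(shard, sizeAtRestart, InitialPosition.TRIM_HORIZON));
    }
}
```

In this model LATEST reads only [e5], so the downtime events e3 and e4 are skipped, while TRIM_HORIZON reads [e1, e2, e3, e4, e5], so e1 and e2 are processed a second time. These are the two symptoms described above.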
Can someone point out some obvious things I am missing?