
The problem: my Flink application is not receiving or processing the events from the Kinesis connector that were generated while the application was down (due to a restart).

We have the following Flink environment settings:

env.enableCheckpointing(1000); // checkpoint interval in milliseconds
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause);
env.getCheckpointConfig().setCheckpointTimeout(timeOut);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

and the Kinesis consumer has the following initial position configuration:

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "LATEST");

Interestingly, when I change the Kinesis configuration to replay the events, i.e.

 kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "TRIM_HORIZON");

Flink receives all the buffered records from Kinesis (including events generated before, during, and after the time the application was down) and processes them. This behavior violates the "exactly once" property of the Flink application.

Can someone point out some obvious things I am missing?


1 Answer


The Flink Kinesis connector does store the shard sequence numbers in checkpointed state to provide exactly-once processing.

From your description, it seems that on your job "restart", the checkpointed state is not being restored.

Just to first eliminate the obvious: how is your job resuming after the restart? Are you resuming from a savepoint, or is the restart recovering automatically from a previous checkpoint?
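If the job is being resubmitted manually (rather than recovered by Flink's automatic restart strategy), the checkpointed state is discarded by default, and the Kinesis consumer falls back to STREAM_INITIAL_POSITION. One way to cover that case, sketched below under the assumption of a Flink 1.x API matching the code in the question, is to retain externalized checkpoints so a resubmitted job can restore from the last checkpoint:

```java
// Keep completed checkpoints on durable storage even after the job is
// cancelled or fails, so a manually resubmitted job can restore from them.
// (API names as in Flink 1.x; newer releases may differ.)
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```

The job would then be resubmitted with `flink run -s <checkpoint-or-savepoint-path> ...`, so the Kinesis consumer restores its shard sequence numbers from state instead of applying the configured initial position.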