2
votes

I have a use case where I am consuming events from a Kinesis stream with a Flink job running on EMR (using the flink-kinesis connector). The job receives each event, processes it, and sinks it to a datastore. By processing, I mean applying some transformations (data enrichment) and doing aggregation. I have a few questions:

  1. How can I maintain idempotency while consuming events from Kinesis? I want exactly-once processing, since duplicates may give wrong results when aggregating the values. One way I can think of is to keep a primary key in the event. But for that, I need to store the values already processed and perform a lookup every time, which may hurt latency. How can I handle this? Is there any other way to solve it? (Scale estimate: I expect 500k-600k events per day in the stream.)

  2. For the data enrichment part, I need to consume data from an external system. What are the best candidates for the external store, and what is the best way to consume it? I want to avoid a lookup for every event I process.

  3. There can be scenarios where I want to reprocess certain events. If I maintain idempotency via a primary key, how can I handle the reprocessing case?

1
What kind of duplicates are we talking about? Do you have duplicated events in your data and want to be sure that you are not processing them twice? I ask because the flink-kinesis connector provides exactly-once processing of events. - Dominik Wosiński
Yes, in case the producer puts duplicate events in the stream. - Pooja Agrawal
Do you have any limits on when duplicates can occur? For example, can a duplicated message occur only within 2 seconds of the original one, or can it be totally random? - Dominik Wosiński
We are trying to put a limit on the time. As of now, it's random. - Pooja Agrawal

1 Answer

0
votes

I hope this helps you set up the checkpoint-related configuration.

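import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE) // checkpointing is off until enabled; the 60 s interval is an assumed value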
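
Note that checkpointing only protects against replays inside Flink; it does not remove duplicates that the producer itself writes to the stream, which is the case raised in the comments. If the duplicate window can be bounded in time, one option is to remember each primary key only for that window. The following is a minimal sketch of that logic in plain Scala, with no Flink dependency. The names `Event`, `TtlDeduplicator`, and `ttlMs` are hypothetical and exist only for illustration; in a real job the same role would more likely be played by Flink keyed state with a time-to-live rather than an in-memory map.

```scala
import scala.collection.mutable

// Hypothetical event shape: `id` is the producer-assigned primary key.
final case class Event(id: String, timestampMs: Long, value: Double)

// Remembers each id for `ttlMs` milliseconds and rejects repeats inside that window.
final class TtlDeduplicator(ttlMs: Long) {
  // id -> timestamp at which the id was first accepted
  private val firstSeen = mutable.Map.empty[String, Long]

  /** Returns true if the event is new and should be processed. */
  def accept(e: Event): Boolean = {
    evictOlderThan(e.timestampMs - ttlMs)
    if (firstSeen.contains(e.id)) false
    else {
      firstSeen.put(e.id, e.timestampMs)
      true
    }
  }

  // Full scan for simplicity; a real implementation would expire entries incrementally.
  private def evictOlderThan(cutoffMs: Long): Unit = {
    val expired = firstSeen.collect { case (id, ts) if ts < cutoffMs => id }.toList
    firstSeen --= expired
  }
}

object DedupDemo extends App {
  val dedup = new TtlDeduplicator(ttlMs = 2000L)
  val events = List(
    Event("a", 0L, 1.0),
    Event("b", 500L, 2.0),
    Event("a", 1000L, 1.0), // repeat inside the 2 s window: dropped
    Event("a", 5000L, 1.0)  // same id after the window expired: accepted again
  )
  val accepted = events.filter(e => dedup.accept(e))
  println(accepted.map(_.id).mkString(","))
}
```

The trade-off is that memory stays bounded by the number of distinct keys inside the window, but a duplicate arriving after the window has passed is treated as a new event. That same property gives one way to approach reprocessing: an event whose key has expired (or been explicitly removed) is accepted again.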