6
votes

I read the document of official AWS Kinesis Firehose but it doesn't mention how to handle duplicated events. Does anyone have experience on it? I googled someone use ElasticCache to do filtering, does it mean I need to use AWS Lambda to encapsulate such filtering logic? Is there any simple way like firehose to ingest data into Redshift and at the same time has "exactly once" semantics? Thanks a lot!

2
Hi! can you link or reference such document? Thank you.MattAllegro
It's expected as Firehose guarantees at-least-once semantics. Here's the official documentation related to this behaviour.jweyrich

2 Answers

12
votes

You can have duplication on both sides of the Kinesis Stream. You might put the same events twice into the Stream, and you might read the event twice by the consumers.

The producers side can happen if you try to put an event to the Kinesis stream, but for some reason you are not sure if it was written successfully or not, and you decide to put it again. The consumer side can happen if you are getting a batch of events and start processing them, and you crash before you managed to checkpoint your location, and the next worker is picking the same batch of events from the Kinesis stream, based on the last checkpoint sequence-ID.

Before you start solving this problem, you should evaluate how often do you have such duplication and what is the business impact of such duplications. Not every system is handling financial transactions that can't tolerate duplication. Nevertheless, if you decide that you need to have such de-duplication, a common way to solve it is to use some event-ID and track if you processed that event-ID already.

ElasticCache with Redis is a good place to track your event-ID. Every time you pick up an event for processing, you check if you already have it in the hash table in Redis, if you find it, you skip it, and if you don't find it, you add it to the table (with some TTL based on the possible time window for such duplication).

If you choose to use Kinesis Firehose (instead of Kinesis Streams), you no longer have control on the consumer application and you can't implement this process. Therefore, you either want to run such de-duplication logic on the producer side, switch to use Kinesis Streams and run your own code in Lambda or KCL, or settle for the de-duplication functions in Redshift (see below).

If you are not too sensitive to duplication, you can use some functions in Redshift, such as COUNT DISTINCT or LAST_VALUE in a WINDOW function.

0
votes

Not sure if this could be a solution. But to handle duplicates, you need to write your own KCL. Firehose cannot gurantee no duplication. You can get rid of Firehose once you have your own KCL consumers that processes your data from the Kinesis Date Stream. If you do so you can follow the linked article, (full disclosure, auther here), which stores events into S3 after deduplicating and processing it through a KCL consumer.

Store events by grouping them based on the minute they were received by the Kinesis data stream by looking at their ApproximateArrivalTimestamp. This allows us to always save our events on the same key prefix, given a batch of records no matter when they are processed. For e.g. all events received by Kinesis at 2020/02/02/ 15:55 Hrs will be stored at /2020/02/02/15/55/*. Therefore, if the key is already present in the given minute, it means that the batch has already been processed and stored to S3.

You can implement your own ISequenceStore which will be implemented against Redshift in your case (In the article, it is done against S3). Read the full article below.

https://www.nabin.dev/avoiding-duplicate-records-with-aws-kcl-and-s3