I am working on a real-time data pipeline with AWS Kinesis and Lambda, and I am trying to figure out how I can guarantee that records from the same data producer are processed by the same shard and, ultimately, by the same Lambda function instance.
My approach is to use partition keys to make sure records from the same producer end up in the same shard. However, I cannot get records from the same shard to be processed by the same Lambda function instance.
The basic setup is the following:
- There are multiple data sources that send data to a Kinesis stream.
- The stream has more than one shard to handle the load.
- There is a Lambda function connected to the stream with an event source mapping (batch size is 500; see the sketch after this list).
- The Lambda function processes the records, does some data transformations and a few other things, and then puts everything into Firehose.
- There is more happening later, but that is not relevant to the question.
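For reference, the event source mapping is set up roughly like this (the stream ARN and function name are placeholders):

import boto3

lambda_client = boto3.client('lambda')

lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream',
    FunctionName='my-processing-function',
    StartingPosition='LATEST',
    BatchSize=500,
    # ParallelizationFactor is left at its default of 1,
    # i.e. one concurrent batch per shard.
)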
It looks like this:
As you can see in the figure, three Lambda function instances are invoked for processing, one for each shard. In this pipeline, it is important that records from the same data source are processed by the same Lambda function instance. From what I have read, this can be guaranteed by giving all records from the same source the same partition key, so that they end up in the same shard.
Partition Keys
A partition key is used to group data by shard within a stream. The Kinesis Data Streams service segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given data record belongs to. Partition keys are Unicode strings with a maximum length limit of 256 bytes. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. When an application puts data into a stream, it must specify a partition key.
Source: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key
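If I understand this correctly, the partition key is hashed with MD5 to a 128-bit integer, and the record goes to the shard whose hash key range contains that integer. Here is a quick sketch of how one could check which shard a given key maps to (it assumes the stream has not been resharded, so there are no closed shards):

import hashlib

import boto3

kinesis = boto3.client('kinesis')

def shard_for_key(stream_name, partition_key):
    # MD5 of the partition key, interpreted as a 128-bit integer.
    hash_value = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)
    shards = kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['Shards']
    for shard in shards:
        hash_range = shard['HashKeyRange']
        if int(hash_range['StartingHashKey']) <= hash_value <= int(hash_range['EndingHashKey']):
            return shard['ShardId']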
This works: records with the same partition key end up in the same shard. However, they are processed by different Lambda function instances. There is one Lambda function instance invoked per shard, but each instance does not process records from only one shard; it processes records from multiple shards. I cannot see any pattern in how records are handed over to Lambda.
Here is my test setup: I sent a bunch of test data into the stream and printed the records in the Lambda function.
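The handler that printed the records is essentially this (a simplified sketch; the real function also does the transformations and the Firehose put):

import base64
import json

def handler(event, context):
    # Decode each Kinesis record and attach its partition key so it shows up in the log.
    for record in event['Records']:
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        data['partitionKey'] = record['kinesis']['partitionKey']
        print(data)

This is the output of the three function instances. Check the partition key at the end of each row: each key should appear in the log of only one instance, not in several.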
Lambda instance 1:
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
Lambda instance 2:
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
Lambda instance 3:
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
This is how I insert the data into the stream (as you can see, the partition key is set to the source ID):

import json

import boto3

kinesis = boto3.client('kinesis')

processed_records = []
for r in records:
    processed_records.append({
        'PartitionKey': str(r['source']),  # same source -> same partition key -> same shard
        'Data': json.dumps(r),
    })

kinesis.put_records(
    StreamName=stream,
    Records=processed_records,
)
So my questions would be:
- Why doesn't each Lambda function instance process records from exactly one shard?
- How can this be achieved?
Thanks!