10
votes

I am working on a real-time data pipeline with AWS Kinesis and Lambda, and I am trying to figure out how to guarantee that records from the same data producer are processed by the same shard and, ultimately, by the same Lambda function instance.

My approach is to use partition keys to ensure that records from the same producer land on the same shard. However, I cannot manage to get records from the same shard processed by the same Lambda function instance.

The basic setup is the following:

  • There are multiple data sources that send data to a Kinesis stream.
  • The stream has more than one shard to handle the load.
  • There is a Lambda function connected to the stream with an event source mapping (batch size is 500).
  • The Lambda function processes the records, performs some data transformations and a few other things, and then puts everything into Firehose.
  • There is more happening later, but that is not relevant to the question.

It looks like this:

[Figure: data sources writing to a Kinesis stream with three shards, each shard consumed by a Lambda function instance]

As you can see in the figure, there are three Lambda function instances invoked for processing, one per shard. In this pipeline it is important that records from the same data source are processed by the same Lambda function instance. From what I have read, this can be guaranteed by giving all records from the same source the same partition key, so that they are processed by the same shard.

Partition Keys

A partition key is used to group data by shard within a stream. The Kinesis Data Streams service segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given data record belongs to. Partition keys are Unicode strings with a maximum length limit of 256 bytes. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. When an application puts data into a stream, it must specify a partition key.

Source: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key
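To make the quoted mapping concrete, here is a minimal sketch of the routing Kinesis describes: MD5-hash the partition key, interpret the digest as a 128-bit integer, and pick the shard whose hash key range contains it. The two shard ranges below are illustrative values for a stream whose key space is split evenly, not output from a real stream:

```python
import hashlib

def shard_for_key(partition_key, shard_ranges):
    """Map a partition key to a shard the way Kinesis does:
    MD5-hash the key, treat the digest as a 128-bit integer,
    and find the shard whose hash key range contains it."""
    hash_int = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    for shard_id, (start, end) in shard_ranges.items():
        if start <= hash_int <= end:
            return shard_id
    raise ValueError("no shard covers this hash key")

# Example: two shards splitting the 128-bit key space in half
MAX_HASH = 2**128 - 1
shards = {
    "shardId-000000000000": (0, MAX_HASH // 2),
    "shardId-000000000001": (MAX_HASH // 2 + 1, MAX_HASH),
}
```

Because the hash is deterministic, `shard_for_key("100", shards)` returns the same shard every time, which is exactly why same-key records always land on the same shard.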

This works: records with the same partition key are processed by the same shard. However, they are processed by different Lambda function instances. There is one Lambda function instance invoked per shard, but it does not process records from only one shard; it processes records from multiple shards. There seems to be no pattern to how records are handed over to Lambda.

Here is my test setup: I sent a batch of test data into the stream and printed the records in the Lambda function. This is the output of the three function instances (check the partition keys at the end of each row; each key should appear in only one of the three logs, not in several):

Lambda instance 1:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}

Lambda instance 2:

{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}

Lambda instance 3:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
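
To make the mixing visible per invocation, one could also log which shard each record came from. Assuming the standard Kinesis event shape delivered to Lambda, where each record's `eventID` is prefixed with the shard ID and the partition key is available under `record['kinesis']['partitionKey']`, a small helper (hypothetical name `shard_partition_keys`) might look like:

```python
from collections import defaultdict

def shard_partition_keys(event):
    """Group the partition keys in a Kinesis Lambda event by the shard
    they were read from. eventID has the form
    'shardId-000000000000:<sequenceNumber>'."""
    keys_by_shard = defaultdict(set)
    for record in event["Records"]:
        shard_id = record["eventID"].split(":")[0]
        keys_by_shard[shard_id].add(record["kinesis"]["partitionKey"])
    return dict(keys_by_shard)
```

Printing `shard_partition_keys(event)` at the top of the handler would show, for each invocation, which shard the batch came from and which partition keys it contained.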

This is how I insert the data into the stream (as you can see, the partition key is set to the source ID):

import json
import boto3

kinesis = boto3.client('kinesis')

# Use the source id as the partition key so that all records
# from one source hash to the same shard.
processed_records = []
for r in records:
    processed_records.append({
        'PartitionKey': str(r['source']),
        'Data': json.dumps(r),
    })

kinesis.put_records(
    StreamName=stream,
    Records=processed_records,
)
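
One caveat worth noting when ordering matters: `put_records` is not all-or-nothing. Individual records can fail (e.g. due to throttling) while the rest succeed, and the response reports this via `FailedRecordCount` and a per-record `ErrorCode`. A sketch of retrying only the failed entries, in their original order (the helper name `put_with_retry` is made up for illustration):

```python
import json

def put_with_retry(kinesis, stream, records, max_attempts=3):
    """Put records into a Kinesis stream, retrying only the entries that
    failed, in their original order, so per-key ordering is preserved."""
    entries = [{"PartitionKey": str(r["source"]), "Data": json.dumps(r)}
               for r in records]
    for _ in range(max_attempts):
        resp = kinesis.put_records(StreamName=stream, Records=entries)
        if resp["FailedRecordCount"] == 0:
            return
        # Result entries line up with the request entries; failed ones
        # carry an ErrorCode instead of a SequenceNumber.
        entries = [e for e, res in zip(entries, resp["Records"])
                   if "ErrorCode" in res]
    raise RuntimeError(f"{len(entries)} records still failing "
                       f"after {max_attempts} attempts")
```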

So my questions are:

  • Why doesn't each Lambda function instance process records from exactly one shard?
  • How can this be achieved?

Thanks!

  • Are you using the KCL to consume the records from the stream? – Rajan Panneer Selvam
  • I am consuming the stream directly through an event source mapping (docs.aws.amazon.com/de_de/lambda/latest/dg/…). So the records are handed over to the Lambda function through the event passed to the function. – oneschilling
  • @oneschilling Did you ever resolve this? I am having the same issue. – gfree
  • @gfree Apparently it is not possible. I also spoke with AWS support and they confirmed that; I placed a feature request through them. I think they initially planned this as a characteristic of Kinesis in combination with Lambda, because the documentation stated it in the beginning but was changed at some point. So for us the best option was to change the way we insert data, because we relied on ordering within the pipeline. We moved that logic to the data producers, making the pipeline order-agnostic, which means we can increase the number of shards without any problems. – oneschilling

1 Answer

2
votes

Why would you care which Lambda instance processes a shard? Lambda instances have no state anyway, so it shouldn't matter which instance reads which shard. What's more important is that at any given time a Lambda instance will only read from ONE shard. After it finishes an invocation, it may read from another shard.
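That invariant can even be asserted inside the handler. Assuming the standard Kinesis event shape, where each record's `eventID` begins with the shard ID, a small check (hypothetical helper name `single_shard_of`) verifies that one invocation only ever contains records from a single shard, even though successive invocations of the same instance may come from different shards:

```python
def single_shard_of(event):
    """Return the shard ID all records in this invocation came from.

    eventID has the form 'shardId-000000000000:<sequenceNumber>', so the
    part before the colon identifies the shard. Raises AssertionError if
    the single-shard-per-batch assumption is ever violated."""
    shard_ids = {record["eventID"].split(":")[0] for record in event["Records"]}
    assert len(shard_ids) == 1, f"unexpected multi-shard batch: {shard_ids}"
    return shard_ids.pop()
```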