2
votes

I want to transform the AWS kinesis stream data using lambda function and then deliver to S3 using AWS firehose. However, I always encountered this problem: errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."

This is the lambda_function.


import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['Records']:
        # your own business logic.
        json_object = {"name": "this is a test"}
        output_record = {
            'recordId': record['eventID'], # is this the problem? I used sequenceNumber, it is not right. 
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['Records'])))
    return {'records': output}

A related question was posted here. Kinesis Firehose lambda transformation. But it seems the kinesis data format is different from what I got. Noticed that the events I got are like the following, it is Capital Records, not records. And there is no recordId, but it is eventID.

{
    'Records': [
        {
            'kinesis': {
                'kinesisSchemaVersion': '1.0', 
                'partitionKey': '1', 
                'sequenceNumber': '49603262076998903856573762341186472148109820820203765762', 
                'data':'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=', 
                'approximateArrivalTimestamp': 1596314234.567
            }, 
            'eventSource': 'aws:kinesis', 
            'eventVersion': '1.0', 
            'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762', 
            'eventName': 'aws:kinesis:record', 
            'invokeIdentityArn':'xxx', 
            'awsRegion': 'us-east-1', 
            'eventSourceARN': 'xxx'
        }
    ]
}

1

1 Answers

3
votes

It depends upon how you've configured your Kinesis, Firehose and Lambda pipeline.

If your Kinesis stream triggers a Lambda to delivers the data to Firehose, then you'll be interested in Kinesis Record Event. Checkout Using AWS Lambda with Amazon Kinesis. Sample event below

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

Another setup could be Firehose polling the Kinesis stream. Also, we get the flexibility to setup a transformation Lambda for Firehose (Amazon Kinesis Data Firehose Data Transformation). In this setup sample event will be as follows (Using AWS Lambda with Amazon Kinesis Data Firehose)

{
  "invocationId": "invoked123",
  "deliveryStreamArn": "aws:lambda:events",
  "region": "us-west-2",
  "records": [
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record1",
      "approximateArrivalTimestamp": 1510772160000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000000",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
        "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
        "subsequenceNumber": ""
      }
    },
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record2",
      "approximateArrivalTimestamp": 151077216000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000001",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
        "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
        "subsequenceNumber": ""
      }
    }
  ]
}
  1. The Kinesis Firehose lambda transformation question seems to be concerned with the second type of setup.
  2. Your data pipeline seems to be using the first type of setup.