I want to transform AWS Kinesis stream data with a Lambda function and then deliver it to S3 using AWS Firehose. However, I always encounter this error: "errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."
This is my lambda_function:
import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['Records']:
        # your own business logic.
        json_object = {"name": "this is a test"}
        output_record = {
            'recordId': record['eventID'],  # is this the problem? I used sequenceNumber, it is not right.
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    print('Successfully processed {} records.'.format(len(event['Records'])))
    return {'records': output}
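One way to narrow this down is to check a handler's return value locally against the response shape described in the Firehose data-transformation documentation: a records list in which every entry echoes an incoming recordId, carries a result of Ok, Dropped, or ProcessingFailed, and (for Ok) base64-encoded data. The sketch below is only a rough local check under those assumptions; check_transform_response and its arguments are hypothetical names, not part of any AWS library.

```python
import base64
import binascii

# Result values listed in the Firehose data-transformation documentation.
VALID_RESULTS = {"Ok", "Dropped", "ProcessingFailed"}

def check_transform_response(response, expected_record_ids):
    """Hypothetical helper: return a list of problems found (empty if none)."""
    records = response.get("records")
    if not isinstance(records, list):
        return ["response has no 'records' list"]

    problems = []
    # Every incoming recordId should come back exactly once.
    returned_ids = sorted(str(r.get("recordId")) for r in records)
    if returned_ids != sorted(str(i) for i in expected_record_ids):
        problems.append("returned recordIds do not match the incoming recordIds")

    for r in records:
        record_id = r.get("recordId")
        if r.get("result") not in VALID_RESULTS:
            problems.append("record {}: unexpected result {!r}".format(record_id, r.get("result")))
        elif r["result"] == "Ok":
            # Only successfully transformed records are checked for a payload here.
            try:
                base64.b64decode(r.get("data", ""), validate=True)
            except (binascii.Error, ValueError, TypeError):
                problems.append("record {}: data is not valid base64".format(record_id))
    return problems
```

Calling it with the handler's return value and the list of recordId values from the incoming event gives an empty list when the shapes line up.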
A related question was posted here: Kinesis Firehose lambda transformation. But the Kinesis data format there seems different from what I get. The events I receive look like the following: the top-level key is capitalized 'Records', not 'records', and each record has no 'recordId', only an 'eventID'.
{
    'Records': [
        {
            'kinesis': {
                'kinesisSchemaVersion': '1.0',
                'partitionKey': '1',
                'sequenceNumber': '49603262076998903856573762341186472148109820820203765762',
                'data': 'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=',
                'approximateArrivalTimestamp': 1596314234.567
            },
            'eventSource': 'aws:kinesis',
            'eventVersion': '1.0',
            'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762',
            'eventName': 'aws:kinesis:record',
            'invokeIdentityArn': 'xxx',
            'awsRegion': 'us-east-1',
            'eventSourceARN': 'xxx'
        }
    ]
}
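For comparison, the Firehose data-transformation documentation describes events with a lowercase 'records' list, where each entry carries a 'recordId' and base64-encoded 'data'. The event above instead has the shape Lambda receives from a Kinesis stream trigger. Below is a minimal sketch of a handler for the Firehose shape, assuming the function is invoked by Firehose itself; the handler name, the recordId value, and the 'processed' field are made up for illustration, and the payload is the 'data' string from the event above.

```python
import base64
import json

def firehose_transform_handler(event, context):
    """Sketch of a handler for Firehose-shaped events (lowercase 'records')."""
    output = []
    for record in event['records']:
        # The payload arrives base64-encoded in record['data'].
        payload = json.loads(base64.b64decode(record['data']))
        # Placeholder transformation (hypothetical): tag the payload.
        payload['processed'] = True
        output.append({
            'recordId': record['recordId'],  # echoed back unchanged
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8'),
        })
    return {'records': output}

# Hypothetical Firehose-shaped test event for a local run.
sample_event = {
    'records': [{
        'recordId': 'example-record-id-1',  # made-up value
        'data': 'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=',
    }]
}

result = firehose_transform_handler(sample_event, None)
print(json.loads(base64.b64decode(result['records'][0]['data'])))
```

Passing the Kinesis-trigger event shown above to this sketch would raise a KeyError on 'records', just as passing a Firehose-shaped event to a handler that reads event['Records'] would.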