1
votes

I am trying to read values from a Kafka topic (AWS MSK) using AWS Lambda.

The event record when printed from lambda looks like this:

{'eventSource': 'aws:kafka', 'eventSourceArn': 'arn:aws:kafka:ap-northeast-1:987654321:cluster/mskcluster/79y80c66-813a-4f-af0e-4ea47ba107e6', 'records': {'Transactions-0': [{'topic': 'Transactions', 'partition': 0, 'offset': 4798, 'timestamp': 1603565835915, 'timestampType': 'CREATE_TIME', 'value': 'eyJFdmVudFRpbWUiOiAiMjAyMC0xMC0yNCAxODo1NzoxNS45MTUzMjQiLCAiSVAiOiAiMTgwLjI0MS4xNTkuMjE4IiwgIkFjY291bnROdW1iZXIiOiwiMTQ2ODA4ODYiLCAiVXNlck5hbWUiOi67iQW1iZXIgUm9tYXJvIiwgIkFtb3VudCI6ICI1NTYyIiwgIlRyYW5zYWN0aW9uSUQiOiAiTzI4Qlg3TlBJbWZmSXExWCIsICJDb3VuTHJ5IjogIk9tYW4ifQ=='}]}}

How can I extract the 'topic' and 'value' fields? The 'value' field is base64-encoded. I get the following error:

NameError: name 'record' is not defined

I am trying the following code:

import json
import base64

def lambda_handler(event, context):
    print(event)
    message = event['records']
    payload=base64.b64decode(record["message"]["value"])
    print("Decoded payload: " + str(payload))

Sample MSK event structure

1 Answer

3
votes

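In your code snippet, the record variable you pass to the decode function is never defined, which is what raises the NameError. Also note that 'value' sits directly on each record; there is no 'message' key in the event. An example that iterates over the records:

records = event['records']['Transactions-0']
for record in records:
    # 'value' is a base64-encoded string directly on the record
    payload = base64.b64decode(record["value"])
    print("Decoded payload: " + str(payload))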

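A single invocation can deliver multiple records per topic partition, so loop over the list instead of assuming one record. The keys of event['records'] have the form topic-partition (here Transactions-0), so if the topic has more partitions (Transactions-1, ...) you can iterate over all of the keys as well.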
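
To avoid hard-coding 'Transactions-0', here is a minimal sketch that walks every topic-partition key in the event. The helper name extract_messages is made up for illustration, and it assumes the decoded payload is UTF-8 text (as it appears to be in your sample); adjust the decoding if your producer writes something else.

```python
import base64


def extract_messages(event):
    """Return a list of (topic, decoded value) pairs from an MSK event.

    Hypothetical helper; assumes each record's 'value' is base64-encoded
    UTF-8 text.
    """
    messages = []
    # Each key looks like "Transactions-0" and maps to a list of records.
    for partition_records in event.get("records", {}).values():
        for record in partition_records:
            raw = base64.b64decode(record["value"])
            messages.append((record["topic"], raw.decode("utf-8")))
    return messages


def lambda_handler(event, context):
    for topic, value in extract_messages(event):
        print(f"{topic}: {value}")
```

If the payload is JSON, you can pass each decoded string to json.loads afterwards to get a dict.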