11
votes

I have created a subscription filter on a CloudWatch log group and made it stream to my Lambda function, but I am getting an error in the Lambda function.

Code:

import boto3
import binascii
import json
import base64
import zlib

def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    foo=''
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            foo += rv
    return foo

def lambda_handler(event, context):
    # Decode and decompress the AWS Log stream to extract json object
    stream=json.dumps(event['awslogs']['data'])
    f = base64.b64decode(stream)
    payload=json.loads(stream_gzip_decompress(f.decode(f)))
    print(payload)

Error:

Response:

{
  "errorMessage": "decode() argument 1 must be str, not bytes",
  "errorType": "TypeError",
  "stackTrace": [
    [
      "/var/task/lambda_function.py",
      34,
      "lambda_handler",
      "payload=json.loads(stream_gzip_decompress(f.decode(f)))"
    ]
  ]
}

Any help or clue would be greatly appreciated! If you have an alternative solution, please suggest it. My requirement is to handle logs from CloudWatch using Lambda.

Thanks in Advance !!

3
Your failure is because you are calling json.dumps(event['awslogs']['data']), where data is the base64-encoded, gzip-compressed log data. Just pass event['awslogs']['data'] straight to the base64 decode, as in P. Ryan's answer. - theannouncer

3 Answers

39
votes

In case anyone else is looking for help with this topic.

I took a slightly different approach, but I did see an 'awslogs' key in the event.

Here is a sample that worked for me on a Python 3.6 Lambda. Set up a CloudWatch trigger to call the Lambda.

import gzip
import json
import base64


def lambda_handler(event, context):
    print(f'Logging Event: {event}')
    print(f"Awslog: {event['awslogs']}")
    cw_data = event['awslogs']['data']
    print(f'data: {cw_data}')
    print(f'type: {type(cw_data)}')
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    payload = json.loads(uncompressed_payload)

    log_events = payload['logEvents']
    for log_event in log_events:
        print(f'LogEvent: {log_event}')
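
To try the handler logic above without deploying, one option is to build a fake event locally. The sketch below is not from AWS tooling: `make_test_event` and `decode_event` are hypothetical helper names, and the log group and stream names are placeholders. It assumes the event carries base64-encoded, gzip-compressed JSON under `event['awslogs']['data']`, as in the handler above.

```python
import base64
import gzip
import json


def make_test_event(log_messages):
    """Build a fake event shaped like a CloudWatch Logs subscription delivery."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "logGroup": "example-log-group",    # placeholder name (assumption)
        "logStream": "example-log-stream",  # placeholder name (assumption)
        "logEvents": [
            {"id": str(i), "timestamp": 0, "message": msg}
            for i, msg in enumerate(log_messages)
        ],
    }
    # Reverse of what the handler does: JSON -> gzip -> base64 text.
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(compressed).decode("ascii")}}


def decode_event(event):
    """Same steps as the handler: base64-decode, gunzip, parse JSON."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


event = make_test_event(["first line", "second line"])
payload = decode_event(event)
for log_event in payload["logEvents"]:
    print(log_event["message"])
```

Passing `event` to `lambda_handler(event, None)` exercises the same path, as long as the handler does not use the context argument.
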
4
votes

Below is the outline I normally follow when processing CloudWatch Logs being sent to AWS Lambda.

import gzip
import json
from StringIO import StringIO  # Python 2 only

def lambda_handler(event, context):
    cw_data = str(event['awslogs']['data'])
    # Base64-decode, then gunzip (str.decode('base64') exists only in Python 2)
    cw_logs = gzip.GzipFile(fileobj=StringIO(cw_data.decode('base64', 'strict'))).read()
    log_events = json.loads(cw_logs)
    for log_event in log_events['logEvents']:
        # Process Logs
        print(log_event)

I see that you are treating the data sent to the AWS Lambda as a JSON object. You first need to base64-decode the data and then gunzip it. After decoding and decompressing, you have the JSON object with the log information.
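
If you would rather stay with zlib, as in the question, the same order of steps applies. This is a minimal Python 3 sketch, not the asker's final code: `decode_with_zlib` is a hypothetical name and the sample payload is made up. `base64.b64decode` already returns bytes, so no `.decode()` call is needed before decompressing, and `32 + zlib.MAX_WBITS` tells zlib to detect the gzip header automatically.

```python
import base64
import gzip
import json
import zlib


def decode_with_zlib(event):
    """Base64-decode, decompress with zlib, then parse the JSON payload."""
    raw = base64.b64decode(event["awslogs"]["data"])  # bytes
    # 32 + MAX_WBITS enables automatic zlib/gzip header detection.
    text = zlib.decompress(raw, 32 + zlib.MAX_WBITS)
    return json.loads(text)


# Made-up payload, packed the same way: JSON -> gzip -> base64.
sample = {"logEvents": [{"message": "hello"}]}
packed = base64.b64encode(gzip.compress(json.dumps(sample).encode("utf-8")))
fake_event = {"awslogs": {"data": packed.decode("ascii")}}

result = decode_with_zlib(fake_event)
print(result)
```

The chunked loop from the question is not needed here, because the whole payload is already in memory as a single bytes object.
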

0
votes

Here is quasar's answer converted to Python 3.

import gzip
import json
import base64
from io import BytesIO

def lambda_handler(event, context):
    cw_data = str(event['awslogs']['data'])
    # validate=True rejects input containing non-base64 characters
    cw_logs = gzip.GzipFile(fileobj=BytesIO(base64.b64decode(cw_data, validate=True))).read()
    log_events = json.loads(cw_logs)
    for log_event in log_events['logEvents']:
        # Process Logs
        print(log_event)

The main changes are using io.BytesIO in place of StringIO and base64.b64decode in place of str.decode('base64') to get to the log event data.
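
As a rough check of those two pieces, the sketch below compares the `GzipFile` plus `BytesIO` route with `gzip.decompress` on the same bytes, and shows what `validate=True` does with malformed input. The sample JSON is made up for illustration.

```python
import base64
import binascii
import gzip
from io import BytesIO

# Made-up payload, compressed and encoded the way the event data is.
raw = gzip.compress(b'{"logEvents": []}')
encoded = base64.b64encode(raw).decode("ascii")

decoded = base64.b64decode(encoded, validate=True)

# Both routes read the same gzip stream from memory.
via_gzipfile = gzip.GzipFile(fileobj=BytesIO(decoded)).read()
via_decompress = gzip.decompress(decoded)
same_output = via_gzipfile == via_decompress
print(same_output)

# With validate=True, characters outside the base64 alphabet raise
# binascii.Error instead of being silently discarded.
try:
    base64.b64decode("!" + encoded, validate=True)
    rejected = False
except binascii.Error:
    rejected = True
print(rejected)
```

For a payload that is already fully in memory, `gzip.decompress` is the shorter of the two; `GzipFile` is mainly useful when reading from a file-like object.
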