19 votes

I am trying to configure a Kinesis Analytics application with the following settings:

  • Input stream is a Kinesis Firehose which is taking stringified JSON values
  • The SQL is a simple passthrough (it needs to be more complicated later but for testing, it just sends the data through)
  • Output stream is a second Kinesis Firehose which delivers records to an S3 bucket

Later down the line, I will import the contents of the S3 bucket using Hive + JSONSERDE which expects each JSON record to live on its own line. The Firehose output just appends all of the JSON records which breaks JSONSERDE.
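The failure mode is easy to reproduce locally: a line-oriented reader (which is effectively what JSONSERDE is) needs one JSON document per line, while Firehose's back-to-back concatenation produces a single malformed blob. A minimal sketch (the record values are made up):

```python
import json

records = [{"a": "1"}, {"a": "2"}, {"a": "3"}]

# What Firehose writes today: records appended back-to-back.
concatenated = "".join(json.dumps(r) for r in records)

# What JSONSERDE needs: one JSON document per line.
newline_delimited = "".join(json.dumps(r) + "\n" for r in records)

# Line-based parsing only works on the newline-delimited form.
parsed = [json.loads(line) for line in newline_delimited.splitlines()]
assert parsed == records

# The concatenated form is one long line and not valid JSON as a whole.
try:
    json.loads(concatenated)
except json.JSONDecodeError:
    print("concatenated blob is unparseable as a single document")
```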

I could attach an AWS Lambda data formatter to the output stream but that seems expensive. All I want is to split each record using a newline.

If I were doing this without an Analytics app, I would append the newline to each Firehose record myself. It seems strange that there is no way to do that in the app's SQL:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";

Is the best answer to add the Lambda data formatter? I'd really like to avoid this.


3 Answers

3 votes

I had a similar requirement to add newlines to the Firehose-generated files. In our application, Firehose is invoked via API Gateway.

This is specified in the Body Mapping Templates under the Integration Request section.

The following mapping template in API Gateway appends a newline to each Kinesis Firehose record.

Method 1:

    #set($payload="$input.path('$.Record.Data')
")
    {
        "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
        "Record": {
            "Data": "$util.base64Encode($payload)"
        }
    }

This works perfectly if you are invoking Firehose via API Gateway.
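The template's effect can be verified outside API Gateway: it appends a newline to the record payload before base64-encoding it, so what Firehose decodes and writes to S3 is newline-terminated. A Python sketch of the same transformation (the payload here is a stand-in for what `$input.path('$.Record.Data')` would return):

```python
import base64
import json

# Stand-in for the value of $input.path('$.Record.Data').
record_data = json.dumps({"a": "0001", "b": "0002", "c": "0003"})

# The template appends a newline, then base64-encodes ($util.base64Encode).
encoded = base64.b64encode((record_data + "\n").encode("utf-8")).decode("ascii")

# Firehose decodes the Data field before writing, so each record arrives
# newline-terminated and concatenation yields one record per line.
decoded = base64.b64decode(encoded).decode("utf-8")
assert decoded == record_data + "\n"
```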

Thanks & Regards, Srivignesh KN

2 votes

Solution using Python or Node.js

I am using DynamoDB Streams and needed those records saved to S3, so I implemented a Kinesis Firehose stream along with a Lambda function. This worked for getting my records into S3 as JSON strings; however, every record saved to the file in S3 ran together in one single continuous row. I needed to append a newline to each record so that each one landed on its own line. For my solution, I ended up having to do some base64 decoding/encoding.

Here is how I did it:

  1. When you create your Kinesis Firehose stream, enable "Transform source records with AWS Lambda" (select "Enabled"). If you have already created your stream, you can still enable this feature by editing it.
  2. At this point you will need to select another Lambda function that performs this transformation. In my case, I needed to add a newline at the end of each record so that when I open the file in a text editor, every entry is on a separate line.

Below is the tested solution code for both Python and Node.js that I used for that second Lambda:

Python solution to add a newline:

import base64

def lambda_handler(event, context):
    # Collect transformed records per invocation; keeping this list at
    # module level would leak records across warm Lambda invocations.
    output = []

    for record in event['records']:
        # Firehose delivers each record base64-encoded.
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Append the newline, then re-encode; 'data' must be returned
        # as a base64 string, not bytes.
        row_w_newline = payload + "\n"
        data = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': data
        })

    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
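The per-record transform can be exercised locally with a synthetic Firehose transformation event. The helper below re-implements the same decode, append "\n", re-encode logic as a standalone function; the event shape follows the Firehose transformation contract, but the sample payloads are made up:

```python
import base64

def transform(event):
    # Same per-record logic as the Lambda above.
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        data = base64.b64encode((payload + "\n").encode("utf-8")).decode("ascii")
        output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
    return {"records": output}

# Synthetic Firehose transformation event (payloads are made up).
event = {
    "records": [
        {"recordId": "1", "data": base64.b64encode(b'{"a": 1}').decode("ascii")},
        {"recordId": "2", "data": base64.b64encode(b'{"a": 2}').decode("ascii")},
    ]
}

result = transform(event)
decoded = [base64.b64decode(r["data"]).decode("utf-8") for r in result["records"]]
assert decoded == ['{"a": 1}\n', '{"a": 2}\n']
```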

Node.js solution to add a newline:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        // Firehose delivers each record base64-encoded.
        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        const result = entry + '\n';
        const payload = Buffer.from(result, 'utf8').toString('base64');

        return {
            recordId: record.recordId,
            result: 'Ok',
            data: payload,
        };
    });
    console.log(`Processing completed. Successful records ${output.length}.`);
    callback(null, { records: output });
};


In the original question above, MrHen wanted to do this without using a second Lambda. I was able to get it working in the first Lambda instead of using the Kinesis Firehose transform-source-records feature. I did this by taking the newImage from DynamoDB and doing, in this order: encode, decode, add newline ("\n"), encode, decode. There's probably a much cleaner way. I chose to go with the transform-source-records feature and the second Lambda function because it seems cleaner to me at this time.

In my case, the single Lambda solution looked like this:

    # Not pretty, but it works! Successfully adds a newline to the record.
    # newImage comes from the DynamoDB Stream as a Python dictionary object;
    # I convert it to a string before running the code below.

    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')
    newImage = newImage + "\n"
    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')
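The encode/decode pairs in that chain cancel each other out, so the five steps reduce to a single string append; a quick check (the sample record is made up):

```python
import base64

def roundtrip_chain(new_image):
    # The five-step chain from the snippet above.
    s = base64.b64encode(new_image.encode('utf-8'))
    s = base64.b64decode(s).decode('utf-8')
    s = s + "\n"
    s = base64.b64encode(s.encode('utf-8'))
    s = base64.b64decode(s).decode('utf-8')
    return s

sample = '{"id": "abc", "value": 42}'
# base64-encoding then immediately decoding is an identity transform,
# so the whole chain is equivalent to appending "\n".
assert roundtrip_chain(sample) == sample + "\n"
```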
1 vote

A basic example of the way we implemented it: we used JavaScript to put records onto a Kinesis Stream, and Firehose to deliver them to an S3 location with gzip compression. Later, Athena queries the S3 location to fetch the records.

Below is the JavaScript code for adding a newline before sending a record to the Kinesis Stream:

var parsed = JSON.parse(payload);
var finalData = JSON.stringify(parsed) + "\n";

var kinesisPayload = {};
kinesisPayload.Data = finalData;
kinesisPayload.StreamName = "kinesisStreamName";
kinesisPayload.PartitionKey = "124";
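The same idea as a runnable Python sketch (the stream name and partition key are placeholders, and the actual `put_record` call to the Kinesis client is omitted so the snippet stays self-contained):

```python
import json

payload = '{"a": "0001", "b": "0002", "c": "0003"}'  # raw record as a string

# Re-serialize with a trailing newline, as in the JavaScript above.
final_data = json.dumps(json.loads(payload)) + "\n"

# Payload shape expected by a Kinesis put_record call (call itself omitted).
kinesis_payload = {
    "Data": final_data,
    "StreamName": "kinesisStreamName",  # placeholder
    "PartitionKey": "124",
}
assert kinesis_payload["Data"].endswith("\n")
```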