14
votes

Is there a way to push data from a Lambda function to a Kinesis stream? I have searched the internet but have not found any examples of it.

Thanks.


3 Answers

14
votes

Yes, you can send data from Lambda to a Kinesis stream, and it is very simple to do. Make sure your Lambda function runs with the right permissions.

  1. Create a file called kinesis.js. This file provides a 'save' function that receives a payload and sends it to the Kinesis stream. We want to be able to include this 'save' function anywhere we need to send data to the stream (see the short usage sketch after step 3). Code:

const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstant'); //Keep it consistent with the file created in step 2
const kinesis = new AWS.Kinesis({
  apiVersion: kinesisConstant.API_VERSION, //optional
  //accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
  //secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
  region: kinesisConstant.REGION
});

const savePayload = (payload) => {
  //We can only save strings into the stream, so serialize non-string payloads first
  if (typeof payload !== kinesisConstant.PAYLOAD_TYPE) {
    try {
      payload = JSON.stringify(payload);
    } catch (e) {
      console.log(e);
    }
  }

  let params = {
    Data: payload,
    PartitionKey: kinesisConstant.PARTITION_KEY,
    StreamName: kinesisConstant.STREAM_NAME
  };

  kinesis.putRecord(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else     console.log('Record added:',data);
  });
};

exports.save = (payload) => {
  const params = {
    StreamName: kinesisConstant.STREAM_NAME,
  };

  kinesis.describeStream(params, function(err, data) {
    if (err) console.log(err, err.stack);
    else {
      //Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
      if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
        || data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
        savePayload(payload);
      } else {
        console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
        console.log('Record lost:', payload); //payload may not be a string yet, so log it as-is
      }
    }
  });
};
  2. Create a kinesisConstant.js file to keep it consistent :)

module.exports = {
  STATE: {
    ACTIVE: 'ACTIVE',
    UPDATING: 'UPDATING',
    CREATING: 'CREATING',
    DELETING: 'DELETING'
  },
  STREAM_NAME: '<your-stream-name>',
  PARTITION_KEY: '<string-value-if-one-shard-anything-will-do>',
  PAYLOAD_TYPE: 'string', //typeof returns lowercase type names, so compare against 'string'
  REGION: '<the-region-where-you-have-lambda-and-kinesis>',
  API_VERSION: '2013-12-02'
};
  3. Your handler file: the 'done' function sends a response back to whoever invoked the function, but 'kinesis.save(event)' does all the work.

const kinesis = require('./kinesis');

exports.handler = (event, context, callback) => {
  console.log('LOADING handler');
  
  const done = (err, res) => callback(null, {
    statusCode: err ? '400' : '200',
    body: err || res,
    headers: {
      'Content-Type': 'application/json',
    },
  });
  
  kinesis.save(event); // here we send it to the stream
  done(null, event);
};
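For completeness, here is a minimal sketch of reusing the same 'save' helper from any other module (assuming it sits next to kinesis.js; the payload object is just an illustrative example):

const kinesis = require('./kinesis');

// Any object works; savePayload will JSON.stringify it before writing to the stream
kinesis.save({ orderId: 123, status: 'created' });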
4
votes

This is done exactly the same way you would do it from your own computer.

Here's an example in Node.js:

const AWS = require('aws-sdk'); //the package is 'aws-sdk', not 'aws'
const kinesis = new AWS.Kinesis();

// data that you'd like to send
let data_object = { "some": "properties" };
let data = JSON.stringify(data_object);

// push data to kinesis
const params = {
  Data: data,
  PartitionKey: "1",
  StreamName: "stream name"
};

kinesis.putRecord(params, (err, data) => {
  if (err) console.error(err);
  else console.log("data sent");
});

Please note that this code will not work as-is, because the Lambda function has no permissions on your stream. When accessing AWS resources from Lambda, it is better to use IAM roles:

  1. When configuring a new Lambda, you can choose existing / create a role.
  2. Go to IAM, then Roles, and pick the role name you assigned to the Lambda function.
  3. Add the relevant permissions (kinesis:PutRecord, kinesis:PutRecords); see the putRecords sketch after these steps.

Then, test the Lambda.
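If you end up batching several records per invocation, which is why PutRecords is listed above, the same SDK exposes a batch call. A minimal sketch, assuming placeholder payloads and the same "stream name" as in the example above:

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

// Each record needs its own Data (string or Buffer) and PartitionKey
const records = [{ id: 1 }, { id: 2 }].map((item) => ({
  Data: JSON.stringify(item),
  PartitionKey: String(item.id)
}));

kinesis.putRecords({ Records: records, StreamName: "stream name" }, (err, data) => {
  if (err) console.error(err);
  else console.log("batch sent, failed records:", data.FailedRecordCount);
});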

1
vote

Yes, this can be done. I was trying to accomplish the same thing and was able to do so in Lambda using the Node.js 4.3 runtime; it also works in version 6.10.

Here is the code:

Declare the following at the top of your Lambda function:

var AWS = require("aws-sdk");
var kinesis = new AWS.Kinesis();
var zlib = require("zlib"); //needed by the CloudWatch Logs handler further down

function writeKinesis(rawdata){
    var data = JSON.stringify(rawdata); //Kinesis only accepts string or binary data
    var params = {Data: data, PartitionKey: "<PARTITION_KEY>", StreamName: "<STREAM_NAME>"};
    kinesis.putRecord(params, (err, data) => {
        if (err) console.error(err);
        else console.log("data sent");
    });
}

Now, in the exports.handler, call the function:

writeKinesis(<YOUR_DATA>);
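In the simplest case, a minimal handler sketch could look like the following (assuming the incoming event itself is what you want to write; this handler is not from the original answer):

exports.handler = function(event, context, callback) {
    // reuse the writeKinesis helper declared above
    writeKinesis(event);
    callback(null, 'record queued');
};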

A few things to note... for Kinesis to ingest data, it must be encoded. In the example below, I have a function that takes logs from CloudWatch and sends them over to a Kinesis stream.

Note that I'm inserting the contents of buffer.toString('utf8') into the writeKinesis function:

exports.handler = function(input, context) {
    ...
    var zippedInput = new Buffer(input.awslogs.data, 'base64');
    zlib.gunzip(zippedInput, function(error, buffer) {
        ...
        writeKinesis(buffer.toString('utf8'));
        ...
    });
    ...
}

Finally, configure the appropriate permissions in IAM. Your Lambda function has to run within the context of an IAM role that includes the following permissions. In my case, I just modified the default lambda_elasticsearch_execution role to include a policy called "lambda_kinesis_execution" with the following code:

"Effect": "Allow",
"Action": [
    "kinesis:*"
],
"Resource": [
    "<YOUR_STREAM_ARN>"
]