I am experimenting with Kinesis and Lambda.

I can't see any delay in the Kinesis "GetRecords.IteratorAge" metric, even though processing is obviously falling behind.

The experiment environment is as follows.

  • Kinesis Data Streams: one stream with a single shard, without enhanced fan-out.
  • Producer: the producer.rb script below, run on a local PC. It puts one record per second.
  • Consumer: the lambda_handler.rb script below, run in Lambda. It writes timestamps to a DynamoDB table and sleeps 3 seconds for each record.
  • Trigger setting:
    • Batch size: 50
    • Batch window: None
    • Concurrent batches per shard: 1
    • Last processing result: No records processed
    • Maximum age of record: 604800
    • Retry attempts: 10000
    • Split batch on error: No
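For reference, the trigger settings above could be expressed as parameters to `create_event_source_mapping` in the AWS SDK for Ruby. This is only a sketch: the stream ARN and function name are hypothetical placeholders, `starting_position` is an assumption (the question does not state it), and the API call runs only if the hypothetical `CREATE_MAPPING` environment variable is set.

```ruby
# Sketch only: maps the console trigger settings to SDK parameters.
# The ARN and function name passed in below are hypothetical placeholders.
def event_source_mapping_params(stream_arn:, function_name:)
  {
    event_source_arn: stream_arn,
    function_name: function_name,
    starting_position: 'LATEST',            # assumption, not given in the question
    batch_size: 50,                         # Batch size: 50
    maximum_batching_window_in_seconds: 0,  # Batch window: None
    parallelization_factor: 1,              # Concurrent batches per shard: 1
    maximum_record_age_in_seconds: 604_800, # Maximum age of record (7 days)
    maximum_retry_attempts: 10_000,         # Retry attempts: 10000
    bisect_batch_on_function_error: false   # Split batch on error: No
  }
end

params = event_source_mapping_params(
  stream_arn: 'arn:aws:kinesis:ap-northeast-1:123456789012:stream/test_stream',
  function_name: 'my-consumer-function'
)

# Only touch AWS when explicitly requested.
if ENV['CREATE_MAPPING'] == '1'
  require 'aws-sdk-lambda'
  lambda_client = Aws::Lambda::Client.new(region: 'ap-northeast-1')
  lambda_client.create_event_source_mapping(params)
end
```

With a batch size of 50 and 3 seconds of sleep per record, a full batch would keep one invocation busy for about 150 seconds.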

producer.rb

require 'aws-sdk'

kinesis = Aws::Kinesis::Client.new(region: 'ap-northeast-1')

COUNT = 300
STREAM_NAME = 'test_stream'
PKEY = 'client-001'

COUNT.times do |i|
  kinesis.put_record(
    stream_name: STREAM_NAME,
    data: (i+1).to_s,
    partition_key: PKEY
  )
  sleep 1
end

lambda_handler.rb

require 'json'
require 'aws-sdk'
require 'base64'

def lambda_handler(event:, context:)
  dynamoDB = Aws::DynamoDB::Resource.new(region: 'ap-northeast-1')
  table = dynamoDB.table(ENV['DYNAMODB_TABLE'])
  item = {
    'aws_request_id' => context.aws_request_id,
    'start' => Time.now.to_s
  }
  # Simulate slow processing: sleep 3 seconds for every record in the batch.
  event['Records'].each { sleep 3 }
  item['end'] = Time.now.to_s
  table.put_item({item: item})
  { statusCode: 200 }
end
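One way to observe the delay from inside the function is to compare the current time with each record's `approximateArrivalTimestamp`, which the Kinesis event passes to Lambda as epoch seconds. The helper below is a hypothetical addition, not part of the original handler, and the sample event is made up for illustration.

```ruby
# Hypothetical helper: returns, for each record in a Kinesis event, how many
# seconds passed between its arrival in the stream and `now`.
def record_lags(event, now: Time.now)
  event['Records'].map do |record|
    arrived_at = record['kinesis']['approximateArrivalTimestamp'].to_f
    (now.to_f - arrived_at).round(3)
  end
end

# Made-up event containing only the field this helper reads.
sample_event = {
  'Records' => [
    { 'kinesis' => { 'approximateArrivalTimestamp' => 1_600_000_000.0 } },
    { 'kinesis' => { 'approximateArrivalTimestamp' => 1_600_000_006.0 } }
  ]
}

lags = record_lags(sample_event, now: Time.at(1_600_000_010))
puts lags.inspect
```

Inside `lambda_handler`, something like `record_lags(event).max` could be stored in the DynamoDB item next to `start` and `end` to see the backlog grow from one invocation to the next.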

The results in DynamoDB and the metrics in CloudWatch are shown in the screenshots below.

The function processed records between 04:09:03 and 04:24:04. Why doesn't "GetRecords.IteratorAge" increase even though record processing is falling behind?

[screenshot]

[screenshot]
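The size of the expected backlog can be estimated with a small simulation. This is a simplified model, assuming records are processed strictly one after another at 3 seconds each and ignoring batching and invocation overhead. The numbers come from producer.rb and lambda_handler.rb; the function name is made up for this sketch.

```ruby
# Simplified model of the experiment: one record arrives per second and a
# single consumer spends 3 seconds on each record, in order.
PRODUCE_INTERVAL = 1   # seconds between put_record calls (producer.rb)
PROCESS_TIME     = 3   # seconds slept per record (lambda_handler.rb)
RECORD_COUNT     = 300 # COUNT in producer.rb

def simulate_backlog(count, produce_interval, process_time)
  finished_at = 0
  waits = []
  count.times do |i|
    arrived_at = i * produce_interval
    # A record cannot start before it arrives or before the previous one ends.
    started_at = [arrived_at, finished_at].max
    waits << (started_at - arrived_at)
    finished_at = started_at + process_time
  end
  { total_seconds: finished_at, max_wait_seconds: waits.max }
end

result = simulate_backlog(RECORD_COUNT, PRODUCE_INTERVAL, PROCESS_TIME)
puts "all records processed after #{result[:total_seconds]} s"
puts "last record waited #{result[:max_wait_seconds]} s before processing started"
```

Under this model processing takes 900 seconds in total, which is consistent with the roughly 15 minutes observed, and the last record waits about 598 seconds, so a metric that tracked consumer lag would be expected to climb to around ten minutes.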

1 Answer

I resolved this myself.

https://youtu.be/xmacMfbrG28

This video gives a detailed explanation of how Lambda processes stream event sources internally.

A "poller" subscribes to the shard, fetches records from the shard iterator with GetRecords, and then invokes the function through the frontend, passing it those records. So GetRecords itself is not delayed, even when the Lambda function is slow.