We are trying to create billing reports with Kinesis Analytics. We have a Kinesis stream that collects information about Lambda calls, DynamoDB reads and writes, etc. This data is processed in the Kinesis Analytics application with this SQL statement:

CREATE OR REPLACE STREAM "DESTINATION_MINUTE_STEAM" ("RowTime" timestamp, "CustomerId" varchar(128), "MandantenId" integer,"TotalCalls" integer,"TotalCost" double, "TypeReport" varchar(32));
CREATE OR REPLACE  PUMP "STREAM_PUMP_MINUTE" AS INSERT INTO "DESTINATION_MINUTE_STEAM"
SELECT STREAM "ROWTIME" as "RowTime","CustomerId","MandantenId", COUNT(*) AS "TotalCalls", SUM("OverallCost") "TotalCost",  'Minute' as "TypeReport"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "CustomerId","MandantenId", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);

This creates a destination stream that should emit data every 60 seconds (based on the documentation for tumbling windows). As the destination we use a Lambda function that writes the data into DynamoDB (this is also recommended by the documentation). We expected the Lambda function to be invoked once per minute, but our CloudWatch logs show that it is called every few seconds.

We don't have any other streams, any other Lambda functions that call this function, any other analytics apps, or any triggers.

Here is part of the template.yaml:

  RTCKINESISINVOICECONSUMER:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: bin/ 
      Handler: billinghandler
      Runtime: go1.x
      Tracing: Active # https://docs.aws.amazon.com/lambda/latest/dg/lambda-x-ray.html
      Policies:
        - AmazonDynamoDBFullAccess
        - AmazonKinesisFullAccess
      Environment:
        Variables:
          DISABLE_SSL: !Ref LambdaDisableSSL 

  KinAnalyticsApp:
    Type: AWS::KinesisAnalytics::Application
    Properties:
      ApplicationName: "RealtimeBillingAnalytics"
      ApplicationDescription: "Sample Kin App"
      ApplicationCode: !Ref KinesisSqlCode
      Inputs:
        - NamePrefix: "SOURCE_SQL_STREAM"
          InputSchema:
            RecordColumns:
             - Name: "RecordId"
               SqlType: "VARCHAR(128)"
               Mapping: "$.RecordId"
             - Name: "MandantenId"
               SqlType: "Integer"
               Mapping: "$.MandantenId"
             - Name: "CustomerId"
               SqlType: "VARCHAR(128)"
               Mapping: "$.CustomerId"
             - Name: "OverallCost"
               SqlType: "Real"
               Mapping: "$.OverallCost"
            RecordFormat:
              RecordFormatType: "JSON"
              MappingParameters:
                JSONMappingParameters:
                  RecordRowPath: "$"
          KinesisStreamsInput:
            ResourceARN: !GetAtt KinInputStream.Arn
            RoleARN: !GetAtt KinesisAnalyticsRole.Arn
  KinInputStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  KinesisAnalyticsRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: Open
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "*"
                Resource: "*"
  KinAnalyticsAppOutputs:
    Type: AWS::KinesisAnalytics::ApplicationOutput
    DependsOn: KinAnalyticsApp
    Properties:
      ApplicationName: !Ref KinAnalyticsApp
      Output:
        Name: "DESTINATION_MINUTE_STEAM"
        DestinationSchema:
          RecordFormatType: "JSON"
        LambdaOutput:
          ResourceARN: !GetAtt RTCKINESISINVOICECONSUMER.Arn
          RoleARN: !GetAtt KinesisAnalyticsRole.Arn

Does anybody know why the Kinesis Analytics app emits too often?

1 Answer

I think your Lambda function may not be returning the status 'Ok' as an acknowledgement, so Kinesis Analytics treats each delivery as a failure and keeps resending the failed records. Please check the section 'Record Response Model' on this page: https://docs.aws.amazon.com/kinesisanalytics/latest/dev/how-it-works-output-lambda.html

Each record sent to your Lambda as an output function (with record IDs) must be acknowledged with either Ok or DeliveryFailed, and it must contain the following parameters. Otherwise, Kinesis Data Analytics treats them as a delivery failure.

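recordId: The record ID that Kinesis Data Analytics passed to the Lambda function during the invocation.

result: The status of the delivery of the record. The following are possible values: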

Ok: The record was transformed successfully and sent to the final destination. Kinesis Data Analytics ingests the record for SQL processing.

DeliveryFailed: The record was not delivered successfully to the final destination by the Lambda as output function. Kinesis Data Analytics continuously retries sending the delivery failed records to the Lambda as output function.

By the way, you can find example Lambda functions here: https://docs.aws.amazon.com/kinesisanalytics/latest/dev/how-it-works-output-lambda-functions.html#how-it-works-lambda-dest-python