We are trying to create billing reports based on Kinesis Analytics. We have a Kinesis stream that collects information about Lambda invocations, DynamoDB reads and writes, etc. This data is processed in a Kinesis Analytics application with the following SQL statement:
CREATE OR REPLACE STREAM "DESTINATION_MINUTE_STEAM" ("RowTime" timestamp, "CustomerId" varchar(128), "MandantenId" integer,"TotalCalls" integer,"TotalCost" double, "TypeReport" varchar(32));
CREATE OR REPLACE PUMP "STREAM_PUMP_MINUTE" AS INSERT INTO "DESTINATION_MINUTE_STEAM"
SELECT STREAM "ROWTIME" as "RowTime","CustomerId","MandantenId", COUNT(*) AS "TotalCalls", SUM("OverallCost") "TotalCost", 'Minute' as "TypeReport"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "CustomerId","MandantenId", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
This creates a destination stream that, according to the documentation on tumbling windows, should emit data every 60 seconds. As the destination we use a Lambda function that writes the data into DynamoDB (this is also what the documentation recommends). We therefore expected the Lambda function to be invoked once per minute, but our CloudWatch logs show that it is invoked every few seconds.
There are no other streams, no other Lambda functions that call this function, no other analytics applications, and no other triggers.
Here is part of the template.yaml:
RTCKINESISINVOICECONSUMER:
  Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
  Properties:
    CodeUri: bin/
    Handler: billinghandler
    Runtime: go1.x
    Tracing: Active # https://docs.aws.amazon.com/lambda/latest/dg/lambda-x-ray.html
    Policies:
      - AmazonDynamoDBFullAccess
      - AmazonKinesisFullAccess
    Environment:
      Variables:
        DISABLE_SSL: !Ref LambdaDisableSSL
KinAnalyticsApp:
  Type: AWS::KinesisAnalytics::Application
  Properties:
    ApplicationName: "RealtimeBillingAnalytics"
    ApplicationDescription: "Sample Kin App"
    ApplicationCode: !Ref KinesisSqlCode
    Inputs:
      - NamePrefix: "SOURCE_SQL_STREAM"
        InputSchema:
          RecordColumns:
            - Name: "RecordId"
              SqlType: "VARCHAR(128)"
              Mapping: "$.RecordId"
            - Name: "MandantenId"
              SqlType: "Integer"
              Mapping: "$.MandantenId"
            - Name: "CustomerId"
              SqlType: "VARCHAR(128)"
              Mapping: "$.CustomerId"
            - Name: "OverallCost"
              SqlType: "Real"
              Mapping: "$.OverallCost"
          RecordFormat:
            RecordFormatType: "JSON"
            MappingParameters:
              JSONMappingParameters:
                RecordRowPath: "$"
        KinesisStreamsInput:
          ResourceARN: !GetAtt KinInputStream.Arn
          RoleARN: !GetAtt KinesisAnalyticsRole.Arn
KinInputStream:
  Type: AWS::Kinesis::Stream
  Properties:
    ShardCount: 1
KinesisAnalyticsRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Principal:
            Service: kinesisanalytics.amazonaws.com
          Action: "sts:AssumeRole"
    Path: "/"
    Policies:
      - PolicyName: Open
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Action: "*"
              Resource: "*"
KinAnalyticsAppOutputs:
  Type: AWS::KinesisAnalytics::ApplicationOutput
  DependsOn: KinAnalyticsApp
  Properties:
    ApplicationName: !Ref KinAnalyticsApp
    Output:
      Name: "DESTINATION_MINUTE_STEAM"
      DestinationSchema:
        RecordFormatType: "JSON"
      LambdaOutput:
        ResourceARN: !GetAtt RTCKINESISINVOICECONSUMER.Arn
        RoleARN: !GetAtt KinesisAnalyticsRole.Arn
Does anybody know why the Kinesis Analytics application emits output so much more often than once per minute?