
I'm trying to build a centralized logging solution using CloudWatch Subscription Filters to write logs to Kinesis Firehose -> S3 -> AWS Glue -> Athena. I'm running into a lot of issues with data formatting.

Initially, I was using the AWS::KinesisFirehose::DeliveryStream resource's S3DestinationConfiguration to write to S3 and then trying to either crawl the data with AWS::Glue::Crawler or create the table manually in the CloudFormation template. I found the Crawler had a lot of trouble determining the data format on S3 (it classified the data as ION instead of JSON, and ION can't be queried by Athena). I'm now trying ExtendedS3DestinationConfiguration, which allows explicit configuration of the input and output formats to force the output to Parquet.

Unfortunately, with this setup Kinesis Firehose returns error logs saying the input is not valid JSON. This makes me wonder whether the CloudWatch Subscription Filter is writing proper JSON at all, but there are no configuration options on that resource to control the data format.

This is not a particularly unusual problem, so somebody out there must have a working configuration. Here are some snippets of my failing configuration:

ExtendedS3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${S3Bucket}
        Prefix: !Sub ${S3LogsPath}year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ErrorOutputPrefix: !Sub ${FailedWritePath}
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub ${AppId}-logstream-${Environment}
          LogStreamName: logs
        CompressionFormat: UNCOMPRESSED
        RoleARN: !GetAtt FirehoseRole.Arn
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            DatabaseName: !Ref CentralizedLoggingDatabase
            Region: !Ref AWS::Region
            RoleARN: !GetAtt FirehoseRole.Arn
            TableName: !Ref LogsGlueTable
            VersionId: LATEST

Former config:

S3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${S3Bucket}
        Prefix: !Sub ${S3LogsPath}year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ErrorOutputPrefix: !Sub ${FailedWritePath}
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub ${AppId}-logstream-${Environment}
          LogStreamName: logs
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseRole.Arn

And the crawler:

Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub ${DNSEndPoint}_logging_s3_crawler_${Environment}
      DatabaseName: !Ref CentralizedLoggingDatabase
      Description: AWS Glue crawler to crawl logs on S3
      Role: !GetAtt CentralizedLoggingGlueRole.Arn
#      Schedule: ## run on demand
#        ScheduleExpression: cron(40 * * * ? *)
      Targets:
        S3Targets:
          - Path: !Sub s3://${S3Bucket}/${S3LogsPath}
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG
      TablePrefix: !Sub ${AppId}_${Environment}_

The error, using ExtendedS3DestinationConfiguration:

"attemptsMade":1,"arrivalTimestamp":1582650068665,"lastErrorCode":"DataFormatConversion.ParseError","lastErrorMessage":"Encountered malformed JSON. Illegal character ((CTRL-CHAR, code 31)): only regular white space (\r, \n, \t) is allowed between tokens\n at [Source: com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream@2ce955fc; line: 1, column: 2]

It seems like there is some configuration issue here, but I cannot find it.

Might be worth having the logs go to a preprocessing step in Lambda where you can verify the format before sending it to Firehose - Banjo Obayomi
Hi, did you ever manage to resolve this? I'm starting with your initial FH config. Apparently the JSON gets sent Base64-encoded, and apparently Athena can read this by default. I'm still having a hell of a job getting the Athena table to output legible data, though! - SimonB

1 Answer


So I've just been through this in a similar scenario, but now have it working.

Firehose writes the logs to S3 compressed and Base64-encoded, as an array of JSON records. For Athena to read the data, it needs to be decompressed, with one JSON record per line.
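For reference, once a record is decoded and decompressed, what's inside is the standard CloudWatch Logs subscription envelope, roughly like this (the field names are the real ones; the values below are made up for illustration):

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "/my-app/application-logs",
  "logStream": "my-app-instance-1",
  "subscriptionFilters": ["my-subscription-filter"],
  "logEvents": [
    {
      "id": "36145362793639203660307862434529524712003534657929871360",
      "timestamp": 1582650068000,
      "message": "{\"level\":\"INFO\",\"msg\":\"example application log line\"}"
    }
  ]
}

The Lambda blueprint mentioned below flattens the logEvents array so each message ends up on its own line in S3.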

So create a Lambda function from the blueprint kinesis-firehose-cloudwatch-logs-processor, enable data transformation on your Firehose delivery stream, and specify that Lambda function as the processor. It will decompress the records and write the JSON to S3 one record per line.
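In CloudFormation that corresponds to a ProcessingConfiguration block inside the ExtendedS3DestinationConfiguration. A minimal sketch, assuming the blueprint-based function is declared elsewhere in the template as LogProcessorFunction (that resource name is just an example):

        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt LogProcessorFunction.Arn

The Firehose role also needs lambda:InvokeFunction permission on that function.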

Creating the Athena table:

CREATE EXTERNAL TABLE mydb.mytable(
  eventversion string COMMENT 'from deserializer', 
  useridentity struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT 'from deserializer', 
  eventtime string COMMENT 'from deserializer', 
  eventsource string COMMENT 'from deserializer', 
  eventname string COMMENT 'from deserializer', 
  awsregion string COMMENT 'from deserializer', 
  sourceipaddress string COMMENT 'from deserializer', 
  useragent string COMMENT 'from deserializer', 
  errorcode string COMMENT 'from deserializer', 
  errormessage string COMMENT 'from deserializer', 
  requestparameters string COMMENT 'from deserializer', 
  responseelements string COMMENT 'from deserializer', 
  additionaleventdata string COMMENT 'from deserializer', 
  requestid string COMMENT 'from deserializer', 
  eventid string COMMENT 'from deserializer', 
  resources array<struct<arn:string,accountid:string,type:string>> COMMENT 'from deserializer', 
  eventtype string COMMENT 'from deserializer', 
  apiversion string COMMENT 'from deserializer', 
  readonly string COMMENT 'from deserializer', 
  recipientaccountid string COMMENT 'from deserializer', 
  serviceeventdetails string COMMENT 'from deserializer', 
  sharedeventid string COMMENT 'from deserializer', 
  vpcendpointid string COMMENT 'from deserializer', 
  managementevent boolean COMMENT 'from deserializer', 
  eventcategory string COMMENT 'from deserializer')
PARTITIONED BY ( 
  datehour string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='awsRegion,eventCategory,eventID,eventName,eventSource,eventTime,eventType,eventVersion,managementEvent,readOnly,recipientAccountId,requestID,requestParameters,responseElements,sourceIPAddress,userAgent,userIdentity') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://mybucket/myprefix'
TBLPROPERTIES (
  'projection.datehour.format'='yyyy/MM/dd/HH', 
  'projection.datehour.interval'='1', 
  'projection.datehour.interval.unit'='HOURS', 
  'projection.datehour.range'='2021/01/01/00,NOW', 
  'projection.datehour.type'='date', 
  'projection.enabled'='true', 
  'storage.location.template'='s3://mybucket/myprefix/${datehour}'
)
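With partition projection enabled, Athena derives the datehour partitions at query time, so there's no need to run MSCK REPAIR TABLE or keep a crawler updating partitions; just filter on datehour directly. For example (table name and date range are illustrative):

SELECT eventtime, eventsource, eventname, sourceipaddress
FROM mydb.mytable
WHERE datehour >= '2021/06/01/00'
  AND datehour <= '2021/06/01/23'
LIMIT 100;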