4
votes

The AWS docs make it look as if you can specify DataFormatConversionConfiguration for an AWS::KinesisFirehose::DeliveryStream in CloudFormation, but there is no documentation on where the property is supposed to go. I tried adding it under Properties, ExtendedS3DestinationConfiguration, ProcessingConfiguration, and one of the Processors. Each time, CloudFormation complains with:

The following resource(s) failed to update: [EventFirehose]. 12:24:32 UTC-0500

UPDATE_FAILED AWS::KinesisFirehose::DeliveryStream EventFirehose Encountered unsupported property DataFormatConversionConfiguration

Their own docs say:

If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or in ExtendedS3DestinationUpdate.

What am I doing wrong?


2 Answers

11
votes

As per the SDK documentation, it should go inside ExtendedS3DestinationConfiguration or ExtendedS3DestinationUpdate. However, CloudFormation does not currently support this property, as per its docs. This is a very common discrepancy between CloudFormation and other AWS services. One similar issue is mentioned here: AWS ECS: Severe bug in ECS Service Cloudformation template (which got resolved recently).

For the time being, you can apply the update via the SDK, or wait for CloudFormation to catch up.

If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or in ExtendedS3DestinationUpdate
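Until CloudFormation supports the property, the same configuration can be applied through the Firehose UpdateDestination API. Below is a minimal boto3 sketch of that workaround; the stream name, role ARN, database/table names, and region are placeholders you would replace with your own values:

```python
def build_conversion_update(account_id, role_arn, database, table, region):
    # Mirrors the DataFormatConversionConfiguration block that
    # CloudFormation rejects, expressed as an ExtendedS3DestinationUpdate
    # payload for the UpdateDestination API.
    return {
        "DataFormatConversionConfiguration": {
            "Enabled": True,
            "InputFormatConfiguration": {"Deserializer": {"HiveJsonSerDe": {}}},
            "OutputFormatConfiguration": {"Serializer": {"ParquetSerDe": {}}},
            "SchemaConfiguration": {
                "CatalogId": account_id,
                "RoleARN": role_arn,
                "DatabaseName": database,
                "TableName": table,
                "Region": region,
                "VersionId": "LATEST",
            },
        }
    }


def enable_parquet_conversion(stream_name, update):
    import boto3  # local import: only needed when actually calling AWS

    firehose = boto3.client("firehose")
    # UpdateDestination requires the current version id and destination id,
    # both of which come from DescribeDeliveryStream.
    desc = firehose.describe_delivery_stream(DeliveryStreamName=stream_name)
    stream = desc["DeliveryStreamDescription"]
    firehose.update_destination(
        DeliveryStreamName=stream_name,
        CurrentDeliveryStreamVersionId=stream["VersionId"],
        DestinationId=stream["Destinations"][0]["DestinationId"],
        ExtendedS3DestinationUpdate=update,
    )


# Example (placeholder values):
# update = build_conversion_update(
#     "123456789012",
#     "arn:aws:iam::123456789012:role/firehose-role",
#     "logs_db", "serverlogs", "us-east-1",
# )
# enable_parquet_conversion("EventFirehose", update)
```

Note that UpdateDestination does optimistic locking on the version id, so the call fails if the stream was modified between the describe and the update.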

Edit

As of June 2019, the DataFormatConversionConfiguration property is available in CloudFormation. See the change log: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/ReleaseHistory.html

4
votes

Here is how I solved this task. The Firehose stream writes data to the S3 bucket in Parquet format:

  LogsCollectionDatabase:
    Type: AWS::Glue::Database
    Properties:
      DatabaseInput:
        Description: Database for Kinesis Analytics
        Name: !Ref DatabaseName
      CatalogId: !Ref AWS::AccountId

  LogsCollectionTable:
    Type: AWS::Glue::Table
    DependsOn: LogsCollectionDatabase
    Properties:
      DatabaseName: !Ref LogsCollectionDatabase
      CatalogId: !Ref AWS::AccountId
      TableInput:
        Name: serverlogs
        Description: Table for storing logs from kinesis
        TableType: EXTERNAL_TABLE
        StorageDescriptor:
          Columns:
            - Type: string
              Name: col1
            - Type: string
              Name: col2
          Location: !Sub s3://${DestinationBucketName}/${DestinationBucketPrefix}
          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn: FirehoseDeliveryIAMPolicy
    Properties:
      DeliveryStreamName: !Ref RegionalStreamName
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !Ref DestinationBucketArn
        Prefix: !Ref DestinationBucketPrefix
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 64
        ErrorOutputPrefix: errors/
        RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              HiveJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
            DatabaseName: !Ref LogsCollectionDatabase
            TableName: !Ref LogsCollectionTable
            Region: !Ref AWS::Region
            VersionId: LATEST

Of course, you also need to define the IAM role and policy for the Firehose stream.
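A rough sketch of what those resources might look like, following the resource names used in the template above (policy name and the wildcard Glue resource are placeholders; you would scope them down in production). The Glue permissions are needed because Firehose reads the table schema for format conversion:

```
  FirehoseDeliveryIAMRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole

  FirehoseDeliveryIAMPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: firehose-delivery-policy  # placeholder name
      Roles:
        - !Ref FirehoseDeliveryIAMRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:AbortMultipartUpload
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:PutObject
            Resource:
              - !Ref DestinationBucketArn
              - !Sub ${DestinationBucketArn}/*
          - Effect: Allow
            Action:
              - glue:GetTable
              - glue:GetTableVersion
              - glue:GetTableVersions
            Resource: "*"  # scope to the database/table ARNs in production
```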