
I am trying to write a Flink program to process a Kinesis stream. The Kinesis stream is fed by a DynamoDB stream and represents inserts made to a DynamoDB table.

Each record in the stream can contain multiple insert records; the number varies (from 1 to 10).

I want to group all the insert records arriving within a 1-minute interval and sum the impression count (impressionCount) field. A sample record:

[
    {
        "country":"NL",
        "userOS":"mac",
        "createdOn":"2017-08-02 16:22:17.135600",
        "trafficType":"D",
        "affiliateId":"87",
        "placement":"4",
        "offerId":"999",
        "advertiserId":"139",
        "impressionCount":"1",
        "uniqueOfferCount":"0"
    },
    {
        "country":"NL",
        "userOS":"mac",
        "createdOn":"2017-08-02 16:22:17.135600",
        "trafficType":"D",
        "affiliateId":"85",
        "placement":"4",
        "offerId":"688",
        "advertiserId":"139",
        "impressionCount":"1",
        "uniqueOfferCount":"0"
    }
]
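Note that the createdOn values above use a space-separated, microsecond-precision format rather than ISO-8601, so parsing them needs an explicit pattern. A minimal sketch with java.time (the class name is hypothetical, and treating the timestamps as UTC is an assumption; substitute the actual zone if known):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CreatedOnParser {

    // Pattern matching the sample's "2017-08-02 16:22:17.135600" format.
    private static final DateTimeFormatter CREATED_ON_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    public static long toEpochMillis(String createdOn) {
        LocalDateTime parsed = LocalDateTime.parse(createdOn, CREATED_ON_FORMAT);
        // Assuming the timestamps are UTC.
        return parsed.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2017-08-02 16:22:17.135600"));
    }
}
```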

My code:

DataStream<List<RawImpressionLogRecord>> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
          "Impressions-Stream", new RawImpressionLogSchema(), consumerConfig));

/** CLASS: RawImpressionLogSchema **/
public class RawImpressionLogSchema implements DeserializationSchema<List<RawImpressionLogRecord>> {

    @Override
    public List<RawImpressionLogRecord> deserialize(byte[] bytes) {
        return RawImpressionLogRecord.parseImpressionLog(bytes);
    }

    @Override
    public boolean isEndOfStream(List<RawImpressionLogRecord> event) {
        return false;
    }

    @Override
    public TypeInformation<List<RawImpressionLogRecord>> getProducedType() {
        // A TypeHint preserves the element type; the raw List.class extractor loses it.
        return TypeInformation.of(new TypeHint<List<RawImpressionLogRecord>>() {});
    }

}

/** parse Method **/      
public static List<RawImpressionLogRecord> parseImpressionLog(
        byte[] impressionLogBytes) {

    JsonReader jsonReader = new JsonReader(new InputStreamReader(
            new ByteArrayInputStream(impressionLogBytes), StandardCharsets.UTF_8));

    JsonElement jsonElement = Streams.parse(jsonReader);

    if (jsonElement == null) {
        throw new IllegalArgumentException(
                "Event could not be parsed: "
                        + new String(impressionLogBytes, StandardCharsets.UTF_8));
    } else {
        // gson is a shared com.google.gson.Gson instance.
        Type listType = new TypeToken<ArrayList<RawImpressionLogRecord>>(){}.getType();
        return gson.fromJson(jsonElement, listType);
    }

}

I was able to parse the input and create the kinesisStream. Is this the correct way? And how do I achieve the aggregation?

Also, once I have the DataStream, how can I apply map/filter/keyBy functions to a stream of lists?

I am new to Flink and any help would be appreciated.

Update

I tried to come up with the code below to solve the above use case, but somehow the reduce function is not getting called. Any idea what is wrong in this code?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<List<ImpressionLogRecord>> rawRecords = env.addSource(new ImpressionLogDataSourceFunction("C:\\LogFiles\\input.txt"));

DataStream<ImpressionLogRecord> impressionLogDataStream = rawRecords
        .flatMap(new Splitter())
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ImpressionLogRecord>(Time.seconds(5)) {

                    @Override
                    public long extractTimestamp(
                            ImpressionLogRecord element) {
                        return element.getCreatedOn().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                    }
                }
        );

//impressionLogDataStream.print();

KeyedStream<ImpressionLogRecord, String> keyedImpressionLogDataStream = impressionLogDataStream
            .keyBy(impressionLogRecordForKey -> {
                StringBuilder groupByKey = new StringBuilder();
                groupByKey.append(impressionLogRecordForKey.getCreatedOn().toString().substring(0, 16));
                groupByKey.append("_");
                groupByKey.append(impressionLogRecordForKey.getOfferId());
                groupByKey.append("_");
                groupByKey.append(impressionLogRecordForKey.getAdvertiserId());
                groupByKey.append("_");
                groupByKey.append(impressionLogRecordForKey.getAffiliateId());
                groupByKey.append("_");
                groupByKey.append(impressionLogRecordForKey.getCountry());
                groupByKey.append("_");
                groupByKey.append(impressionLogRecordForKey.getPlacement());
                groupByKey.append("_");
                groupByKey.append(impressionLogRecordForKey.getTrafficType());
                groupByKey.append("_");
                groupByKey.append(impressionLogRecordForKey.getUserOS());
                System.out.println("Call to Group By Function===================" + groupByKey);
                return groupByKey.toString();
            });

//keyedImpressionLogDataStream.print();

DataStream<ImpressionLogRecord> aggImpressionRecord = keyedImpressionLogDataStream
        .timeWindow(Time.minutes(5))
        .reduce((prevLogRecord, currentLogRecord) -> {

                System.out.println("Calling Reduce Function-------------------------");
                ImpressionLogRecord aggregatedImpressionLog = new ImpressionLogRecord();
                aggregatedImpressionLog.setOfferId(prevLogRecord.getOfferId());
                aggregatedImpressionLog.setCreatedOn(prevLogRecord.getCreatedOn().truncatedTo(ChronoUnit.MINUTES));
                aggregatedImpressionLog.setAdvertiserId(prevLogRecord.getAdvertiserId());
                aggregatedImpressionLog.setAffiliateId(prevLogRecord.getAffiliateId());
                aggregatedImpressionLog.setCountry(prevLogRecord.getCountry());
                aggregatedImpressionLog.setPlacement(prevLogRecord.getPlacement());
                aggregatedImpressionLog.setTrafficType(prevLogRecord.getTrafficType());
                aggregatedImpressionLog.setUserOS(prevLogRecord.getUserOS());
                aggregatedImpressionLog.setImpressionCount(prevLogRecord.getImpressionCount() + currentLogRecord.getImpressionCount());
                aggregatedImpressionLog.setUniqueOfferCount(prevLogRecord.getUniqueOfferCount() + currentLogRecord.getUniqueOfferCount());

                return aggregatedImpressionLog;
            });

aggImpressionRecord.print();
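For reference, the composite key built in the keyBy lambda can be expressed more compactly with String.join. The substring(0, 16) call on the LocalDateTime's toString() keeps minute precision (e.g. "2017-08-02T16:22"), which is what buckets records into the same group. A standalone sketch (the class name is hypothetical; the field values come from the sample JSON):

```java
import java.time.LocalDateTime;

public class GroupKeyDemo {

    // Builds the same underscore-separated key as the keyBy lambda above.
    public static String buildKey(LocalDateTime createdOn, String offerId, String advertiserId,
                                  String affiliateId, String country, String placement,
                                  String trafficType, String userOS) {
        // Truncate to minute precision, e.g. "2017-08-02T16:22".
        String minuteBucket = createdOn.toString().substring(0, 16);
        return String.join("_", minuteBucket, offerId, advertiserId, affiliateId,
                country, placement, trafficType, userOS);
    }

    public static void main(String[] args) {
        LocalDateTime createdOn = LocalDateTime.parse("2017-08-02T16:22:17.135600");
        System.out.println(buildKey(createdOn, "999", "139", "87", "NL", "4", "D", "mac"));
    }
}
```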
This sounds like a good use case for Kinesis Analytics. Not quite the answer you are looking for, though. - JaredHatfield
I had heard a lot about Apache Flink for big-data processing of streaming data, so that was my primary reason for choosing it. Maybe Kinesis Analytics could fit my use case, but in the future I also want to use machine learning, and I read that Flink has those libraries. - AWS Enthusiastic

1 Answer


Working Code (compared to the code in the question, this actually submits the job with env.execute() and uses a 1-minute window)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<List<ImpressionLogRecord>> rawRecords = env.addSource(new ImpressionLogDataSourceFunction("C:\\LogFiles\\input.txt"));

    //This method converts the DataStream of List<ImpressionLogRecords> into a single stream of ImpressionLogRecords.
    //Also assigns timestamp to each record in the stream
    DataStream<ImpressionLogRecord> impressionLogDataStream = rawRecords
            .flatMap(new RecordSplitter())
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<ImpressionLogRecord>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(
                                ImpressionLogRecord element) {
                            return element.getCreatedOn().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                        }
                    }
            );

    //This method groups the records in the stream by a user defined key.
    KeyedStream<ImpressionLogRecord, String> keyedImpressionLogDataStream = impressionLogDataStream
                .keyBy(impressionLogRecordForKey -> {
                    StringBuilder groupByKey = new StringBuilder();
                    groupByKey.append(impressionLogRecordForKey.getCreatedOn().toString().substring(0, 16));
                    groupByKey.append("_");
                    groupByKey.append(impressionLogRecordForKey.getOfferId());
                    groupByKey.append("_");
                    groupByKey.append(impressionLogRecordForKey.getAdvertiserId());
                    groupByKey.append("_");
                    groupByKey.append(impressionLogRecordForKey.getAffiliateId());
                    groupByKey.append("_");
                    groupByKey.append(impressionLogRecordForKey.getCountry());
                    groupByKey.append("_");
                    groupByKey.append(impressionLogRecordForKey.getPlacement());
                    groupByKey.append("_");
                    groupByKey.append(impressionLogRecordForKey.getTrafficType());
                    groupByKey.append("_");
                    groupByKey.append(impressionLogRecordForKey.getUserOS());
                    return groupByKey.toString();
                });

    //This method aggregates the grouped records every 1 min and calculates the sum of impression count and unique offer count.
    DataStream<ImpressionLogRecord> aggImpressionRecord = keyedImpressionLogDataStream
            .timeWindow(Time.minutes(1))
            .reduce((prevLogRecord, currentLogRecord) -> {
                    ImpressionLogRecord aggregatedImpressionLog = new ImpressionLogRecord();
                    aggregatedImpressionLog.setOfferId(prevLogRecord.getOfferId());
                    aggregatedImpressionLog.setCreatedOn(prevLogRecord.getCreatedOn().truncatedTo(ChronoUnit.MINUTES));
                    aggregatedImpressionLog.setAdvertiserId(prevLogRecord.getAdvertiserId());
                    aggregatedImpressionLog.setAffiliateId(prevLogRecord.getAffiliateId());
                    aggregatedImpressionLog.setCountry(prevLogRecord.getCountry());
                    aggregatedImpressionLog.setPlacement(prevLogRecord.getPlacement());
                    aggregatedImpressionLog.setTrafficType(prevLogRecord.getTrafficType());
                    aggregatedImpressionLog.setUserOS(prevLogRecord.getUserOS());
                    aggregatedImpressionLog.setImpressionCount(prevLogRecord.getImpressionCount() + currentLogRecord.getImpressionCount());
                    aggregatedImpressionLog.setUniqueOfferCount(prevLogRecord.getUniqueOfferCount() + currentLogRecord.getUniqueOfferCount());
                    return aggregatedImpressionLog;
                });

    aggImpressionRecord.print();

    aggImpressionRecord.addSink(new ImpressionLogDataSink());


    env.execute();

}

public static class RecordSplitter
        implements
            FlatMapFunction<List<ImpressionLogRecord>, ImpressionLogRecord> {
    @Override
    public void flatMap(List<ImpressionLogRecord> rawImpressionRecords,
            Collector<ImpressionLogRecord> impressionLogRecordCollector)
            throws Exception {
        for (ImpressionLogRecord record : rawImpressionRecords) {
            impressionLogRecordCollector.collect(record);
        }

    }
}
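As a sanity check on the window semantics: within each 1-minute window, records that share a key have their impression counts summed by the reduce. A plain-Java analogue of that aggregation, using java.util.stream with hypothetical (key, count) pairs standing in for ImpressionLogRecord:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WindowSumDemo {

    // Group by key and sum the counts, mirroring the keyed window reduce above.
    public static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records) {
        return records.stream().collect(
                Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.summingInt(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        // Hypothetical records that fell into the same 1-minute window.
        List<Map.Entry<String, Integer>> records = List.of(
                new SimpleEntry<>("999_139_87_NL", 1),
                new SimpleEntry<>("688_139_85_NL", 1),
                new SimpleEntry<>("999_139_87_NL", 1));

        System.out.println(sumByKey(records));
    }
}
```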