I am trying to write a Flink program to process a Kinesis stream. The Kinesis stream comes from an AWS DynamoDB stream and represents inserts made to a DynamoDB table.
Each record in the stream can contain multiple insert records; the number of insert records is variable (it can vary from 1 to 10).
I want to group all the insert records from all the streams within an interval of 1 minute and sum the impression count (impressionCount) field. Sample payload:
[
  {
    "country":"NL",
    "userOS":"mac",
    "createdOn":"2017-08-02 16:22:17.135600",
    "trafficType":"D",
    "affiliateId":"87",
    "placement":"4",
    "offerId":"999",
    "advertiserId":"139",
    "impressionCount":"1",
    "uniqueOfferCount":"0"
  },
  {
    "country":"NL",
    "userOS":"mac",
    "createdOn":"2017-08-02 16:22:17.135600",
    "trafficType":"D",
    "affiliateId":"85",
    "placement":"4",
    "offerId":"688",
    "advertiserId":"139",
    "impressionCount":"1",
    "uniqueOfferCount":"0"
  }
]
My code:
DataStream<List<RawImpressionLogRecord>> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
        "Impressions-Stream", new RawImpressionLogSchema(), consumerConfig));

/** CLASS: RawImpressionLogSchema **/
public class RawImpressionLogSchema implements DeserializationSchema<List<RawImpressionLogRecord>> {

    @Override
    public List<RawImpressionLogRecord> deserialize(byte[] bytes) {
        return RawImpressionLogRecord.parseImpressionLog(bytes);
    }

    @Override
    public boolean isEndOfStream(List<RawImpressionLogRecord> event) {
        return false;
    }

    @Override
    public TypeInformation<List<RawImpressionLogRecord>> getProducedType() {
        // TypeHint preserves the element type; the raw TypeExtractor.getForClass(List.class)
        // would erase it.
        return TypeInformation.of(new TypeHint<List<RawImpressionLogRecord>>() {});
    }
}
/** parse method **/
public static List<RawImpressionLogRecord> parseImpressionLog(
        byte[] impressionLogBytes) {

    JsonReader jsonReader = new JsonReader(new InputStreamReader(
            new ByteArrayInputStream(impressionLogBytes)));
    JsonElement jsonElement = Streams.parse(jsonReader);

    if (jsonElement == null) {
        throw new IllegalArgumentException(
                "Unable to parse the impression log: "
                        + new String(impressionLogBytes));
    } else {
        Type listType = new TypeToken<ArrayList<RawImpressionLogRecord>>() {}.getType();
        return gson.fromJson(jsonElement, listType);
    }
}
I was able to parse the input and create the kinesisStream. Is this the correct way to do it? And how do I achieve the aggregation?
Also, once I have the DataStream, how can I apply map/filter/group-by functions to a stream of Lists?
I am new to Flink, and any help would be appreciated.
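For context on what I am trying to compute: a List-typed stream would first be flattened (one element per insert record) and then grouped and summed per key. The intended semantics can be illustrated with plain Java collections; the Impression class below and its field subset are illustrative stand-ins for RawImpressionLogRecord, not the actual Flink API:

```java
import java.util.*;
import java.util.stream.*;

public class AggregationSketch {

    /** Minimal stand-in for RawImpressionLogRecord: two key fields plus the count. */
    public static class Impression {
        final String offerId;
        final String country;
        final int impressionCount;

        public Impression(String offerId, String country, int impressionCount) {
            this.offerId = offerId;
            this.country = country;
            this.impressionCount = impressionCount;
        }
    }

    /** Flattens the per-Kinesis-record lists and sums impressionCount per composite key. */
    public static Map<String, Integer> aggregate(List<List<Impression>> kinesisRecords) {
        return kinesisRecords.stream()
                .flatMap(List::stream)                      // the "flatMap/Splitter" step
                .collect(Collectors.groupingBy(             // the "keyBy" step
                        i -> i.offerId + "_" + i.country,
                        Collectors.summingInt(i -> i.impressionCount))); // the windowed sum
    }

    public static void main(String[] args) {
        // Two Kinesis records, each carrying 1..10 insert records.
        List<List<Impression>> input = Arrays.asList(
                Arrays.asList(new Impression("999", "NL", 1), new Impression("688", "NL", 1)),
                Arrays.asList(new Impression("999", "NL", 1)));

        Map<String, Integer> totals = aggregate(input);
        System.out.println(totals.get("999_NL")); // 2
        System.out.println(totals.get("688_NL")); // 1
    }
}
```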
Update
I tried the code below to solve the above use case, but somehow the reduce function is never called. Any idea what is wrong with it?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<List<ImpressionLogRecord>> rawRecords =
        env.addSource(new ImpressionLogDataSourceFunction("C:\\LogFiles\\input.txt"));

DataStream<ImpressionLogRecord> impressionLogDataStream = rawRecords
        .flatMap(new Splitter())
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ImpressionLogRecord>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(ImpressionLogRecord element) {
                        return element.getCreatedOn()
                                .atZone(ZoneOffset.systemDefault())
                                .toInstant()
                                .toEpochMilli();
                    }
                });
//impressionLogDataStream.print();

KeyedStream<ImpressionLogRecord, String> keyedImpressionLogDataStream = impressionLogDataStream
        .keyBy(impressionLogRecordForKey -> {
            StringBuffer groupByKey = new StringBuffer();
            groupByKey.append(impressionLogRecordForKey.getCreatedOn().toString().substring(0, 16));
            groupByKey.append("_");
            groupByKey.append(impressionLogRecordForKey.getOfferId());
            groupByKey.append("_");
            groupByKey.append(impressionLogRecordForKey.getAdvertiserId());
            groupByKey.append("_");
            groupByKey.append(impressionLogRecordForKey.getAffiliateId());
            groupByKey.append("_");
            groupByKey.append(impressionLogRecordForKey.getCountry());
            groupByKey.append("_");
            groupByKey.append(impressionLogRecordForKey.getPlacement());
            groupByKey.append("_");
            groupByKey.append(impressionLogRecordForKey.getTrafficType());
            groupByKey.append("_");
            groupByKey.append(impressionLogRecordForKey.getUserOS());
            System.out.println("Call to Group By Function===================" + groupByKey);
            return groupByKey.toString();
        });
//keyedImpressionLogDataStream.print();

DataStream<ImpressionLogRecord> aggImpressionRecord = keyedImpressionLogDataStream
        .timeWindow(Time.minutes(5))
        .reduce((prevLogRecord, currentLogRecord) -> {
            System.out.println("Calling Reduce Function-------------------------");
            ImpressionLogRecord aggregatedImpressionLog = new ImpressionLogRecord();
            aggregatedImpressionLog.setOfferId(prevLogRecord.getOfferId());
            aggregatedImpressionLog.setCreatedOn(prevLogRecord.getCreatedOn().truncatedTo(ChronoUnit.MINUTES));
            aggregatedImpressionLog.setAdvertiserId(prevLogRecord.getAdvertiserId());
            aggregatedImpressionLog.setAffiliateId(prevLogRecord.getAffiliateId());
            aggregatedImpressionLog.setCountry(prevLogRecord.getCountry());
            aggregatedImpressionLog.setPlacement(prevLogRecord.getPlacement());
            aggregatedImpressionLog.setTrafficType(prevLogRecord.getTrafficType());
            aggregatedImpressionLog.setUserOS(prevLogRecord.getUserOS());
            aggregatedImpressionLog.setImpressionCount(prevLogRecord.getImpressionCount() + currentLogRecord.getImpressionCount());
            aggregatedImpressionLog.setUniqueOfferCount(prevLogRecord.getUniqueOfferCount() + currentLogRecord.getUniqueOfferCount());
            return aggregatedImpressionLog;
        });

aggImpressionRecord.print();
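Incidentally, the composite key built in the keyBy lambda can be expressed more compactly with String.join. This plain-Java sketch reproduces the same key format (sample field values taken from the JSON above; the groupKey helper is mine, not part of my Flink job):

```java
public class KeySketch {

    // Builds the same composite key as the keyBy lambda above:
    // minute-truncated timestamp plus the seven dimension fields.
    public static String groupKey(String createdOn, String offerId, String advertiserId,
                                  String affiliateId, String country, String placement,
                                  String trafficType, String userOS) {
        return String.join("_",
                createdOn.substring(0, 16), // "yyyy-MM-dd HH:mm" — minute granularity
                offerId, advertiserId, affiliateId,
                country, placement, trafficType, userOS);
    }

    public static void main(String[] args) {
        System.out.println(groupKey("2017-08-02 16:22:17.135600",
                "999", "139", "87", "NL", "4", "D", "mac"));
        // 2017-08-02 16:22_999_139_87_NL_4_D_mac
    }
}
```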