I have a continuing JSONArray data produced to Kafka topic,and I wanna process records with EventTime characteristic.In order to reach this goal,I have to assign watermark to each record which contained in the JSONArray.
I didn't find a convenience way to achieve this goal.My solution is consuming data from DataStreamSource> ,then iterate List and collect Object to downstream with an anonymous ProcessFunction,finally assign watermark to the this downstream.
The major code shows below:
DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);
SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
.process(new ProcessFunction<List<MockData>, MockData>() {
@Override
public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
throws Exception {
value.forEach(mockData -> out.collect(mockData));
}
});
convertToPojo.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
@Override
public long extractTimestamp(MockData element) {
return element.getTimestamp();
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
.keyBy("country").window(
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
.process(
new FlinkEventTimeCountFunction()).name("count elements");
The code seems all right without doubt,running without error as well.But ProcessWindowFunction never triggered.I tracked the Flink source code,find EventTimeTrigger never returns TriggerResult.FIRE,causing by TriggerContext.getCurrentWatermark returns Long.MIN_VALUE all the time.
What's the proper way to process List in eventtime?Any suggestion will be appreciated.