In a stream processing problem, we have 3 sensors that each generate a timestamped sample every 8 milliseconds (the sensors' clocks are synchronized). I want to merge the data for every timestamp (with 3 sensors, we should output the 3 samples merged for every timestamp). In addition, there is a time limit of 160 milliseconds: every sample must be output no later than 160 milliseconds after its generated timestamp. So I decided to use Flink's event time and a time window. Because a timestamp is unique within each sensor's samples, we use it as the key for the data stream.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
    // Use field f1 as the event time; the watermark trails the max seen timestamp by 160 ms
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, JSONObject>>(Time.milliseconds(160)) {
            @Override
            public long extractTimestamp(Tuple3<String, Long, JSONObject> element) {
                return element.f1;
            }
        })
    .keyBy(1) // key by the timestamp field
    .timeWindow(Time.milliseconds(160))
    .allowedLateness(Time.milliseconds(2))
    .sideOutputLateData(lateOutputTag)
    .reduce((a, b) -> mergeSamples(a, b)); // mergeSamples: placeholder for the merging logic
In the code, we first declare the second field of the stream as the event time and set a periodic watermark (since data are generated at a fixed rate with a fixed delay). After that, we key the stream by the event time. We also want to collect late data, so we use sideOutputLateData. Finally, we reduce (merge) the data with the same key. The problem is that in Flink's event time mode the defined window doesn't output any data! Without setting event time it does output data, but I want to use event time for Flink windowing. I tried multiple durations for the window and the watermark, but none of them produced any output.
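To make the timing of this setup concrete, here is a minimal plain-Java sketch (no Flink dependency) of how I understand a 160 ms tumbling event-time window to interact with a 160 ms bounded-out-of-orderness watermark. The class and method names are hypothetical, and the model is simplified: it assumes a window offset of 0, non-negative timestamps, a watermark equal to the maximum seen timestamp minus the delay, and a window that fires once the watermark reaches the window's last timestamp. It ignores the periodic watermark emission interval and allowed lateness.

```java
public class WindowTimingSketch {
    // Start of the tumbling window containing the timestamp (assumes offset 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    // Watermark after the largest event timestamp seen so far (simplified model).
    static long watermark(long maxSeenTimestamp, long maxOutOfOrderness) {
        return maxSeenTimestamp - maxOutOfOrderness;
    }

    // Smallest event timestamp that must be seen before the window can fire.
    static long firstFiringTimestamp(long timestamp, long size, long maxOutOfOrderness) {
        return windowMaxTimestamp(timestamp, size) + maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long sample = 1531980773390L; // first timestamp from the example data
        long size = 160L;             // window size in ms
        long delay = 160L;            // max out-of-orderness in ms

        System.out.println("window start:        " + windowStart(sample, size));
        System.out.println("window max ts:       " + windowMaxTimestamp(sample, size));
        System.out.println("fires after seeing:  " + firstFiringTimestamp(sample, size, delay));
        System.out.println("event-time wait, ms: " + (firstFiringTimestamp(sample, size, delay) - sample));
    }
}
```

Under these assumptions, the sample with timestamp 1531980773390 falls into the window [1531980773280, 1531980773440), and that window can only fire after an event with timestamp 1531980773599 or later has been seen, which is 209 ms of event time after the sample. This is a model of the configuration above, not a measurement of the running job.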
I successfully solved the same problem using a Flink count window and a customized timeout trigger.
Update: The incoming data stream has the following format (for 3 sensors):
sensor_id, timestamp, data
(1, 1531980773390, {})
(2, 1531980773390, {})
(3, 1531980773390, {})
(1, 1531980773398, {})
(2, 1531980773398, {})
(3, 1531980773398, {})
and so on, every 8 milliseconds.
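To illustrate the result I am after, here is a small plain-Java sketch (no Flink dependency) that groups samples in the format above by timestamp. The `Sample` class is a hypothetical stand-in for `Tuple3<String, Long, JSONObject>`, with the payload simplified to a `String`, and the "merge" is just a list of `sensorId:data` entries; the real merging logic may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MergeByTimestampSketch {
    // Hypothetical stand-in for Tuple3<String, Long, JSONObject>.
    static final class Sample {
        final String sensorId;
        final long timestamp;
        final String data;

        Sample(String sensorId, long timestamp, String data) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.data = data;
        }
    }

    // Groups samples by timestamp; each group keeps its entries in arrival order.
    static Map<Long, List<String>> mergeByTimestamp(List<Sample> samples) {
        Map<Long, List<String>> merged = new TreeMap<>();
        for (Sample s : samples) {
            merged.computeIfAbsent(s.timestamp, k -> new ArrayList<>())
                  .add(s.sensorId + ":" + s.data);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Sample> input = new ArrayList<>();
        // Two consecutive timestamps, 8 ms apart, from 3 sensors each.
        for (long ts = 1531980773390L; ts <= 1531980773398L; ts += 8) {
            for (int sensor = 1; sensor <= 3; sensor++) {
                input.add(new Sample(String.valueOf(sensor), ts, "{}"));
            }
        }
        // One merged entry per timestamp, each combining the 3 sensors' samples.
        for (Map.Entry<Long, List<String>> e : mergeByTimestamp(input).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In the actual job, this grouping is what the keyed window plus `reduce` is meant to do, with the added requirement that each group is emitted within 160 milliseconds.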
Saving late data in a data stream:
// Anonymous subclass so the tag keeps its generic type information
final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag = new OutputTag<Tuple3<String, Long, JSONObject>>("late-data") {};
DataStream<Tuple3<String, Long, JSONObject>> lateData = res.getSideOutput(lateOutputTag);