I want to calculate a window-based average (or any other function I define myself) in Flink over a stream of historical events, so the stream has to use event time rather than processing time:
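import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)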
I have found out how to attach the timestamp when the events are ingested:
ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)
But the calculation (an apply function) does not work when I write it the same way as I did without event time. I have read that I have to set a watermark, but I don't see how or where. This is the calculation:
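import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val avg = stream
  .keyBy("instrument")
  .timeWindow(Time.seconds(10))
  .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint]) => {
    // `val` is a reserved word in Scala, so average the bid and ask fields instead
    val n = values.size
    val avgBid = values.map(_.bid).sum / n
    val avgAsk = values.map(_.ask).sum / n
    out.collect(Datapoint(key.getField[String](0), avgBid, avgAsk))
  })
avg.print()
env.execute()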
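From what I have read so far, my untested guess is that the source has to emit watermarks along with the timestamps, so that Flink knows event time has advanced and can close the 10-second windows. Below is a minimal sketch of what I think that would look like. The class name HistoricalSource, the (timestamp, datapoint) input format, the Double field types, and the assumption that the events are already sorted by timestamp are all made up for illustration:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Assumed shape of Datapoint; the Double field types are a guess.
case class Datapoint(instrument: String, bid: Double, ask: Double)

// Hypothetical source that replays (timestampMillis, datapoint) pairs.
// Assumes the pairs are already sorted by timestamp.
class HistoricalSource(events: Seq[(Long, Datapoint)]) extends SourceFunction[Datapoint] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Datapoint]): Unit = {
    val it = events.iterator
    while (running && it.hasNext) {
      val (ts, dp) = it.next()
      // Attach the event timestamp, as in my snippet above.
      ctx.collectWithTimestamp(dp, ts)
      // Signal that event time has advanced. Using ts - 1 keeps a
      // following event with the same timestamp from counting as late.
      ctx.emitWatermark(new Watermark(ts - 1))
    }
  }

  override def cancel(): Unit = { running = false }
}
```

I would then plug it in with env.addSource(new HistoricalSource(events)) in place of my current source. I am not sure whether this is the right approach, or whether the watermarks should be assigned on the stream instead.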
Does anyone have a simple Scala example for that?
Regards,
Andreas