I want to calculate a window-based average (or any other user-defined function) in Flink over a stream of historical events, so the stream has to use event time rather than processing time:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

I have found out how to attach the timestamp at ingestion:

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)

But the calculation (an apply function) does not work when I write it the same way as I did without event time. I have read something about a watermark that I have to set:

val avg = stream
  .keyBy("instrument")
  .timeWindow(Time.seconds(10))
  .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint]) => {
    // `val` is a reserved word in Scala, so it stands in here for the real numeric field
    val avg = values.map(_.val).sum / values.size
    val dp = Datapoint(key.getField[String](0), avg)
    out.collect(dp)
  })

avg.print()
env.execute()

Does anyone have a simple Scala example for that?

Regards,
Andreas

1 Answer

A watermark is effectively a timestamp with an assertion that all events with earlier timestamps have (probably) already arrived. Event-time windows depend on watermarks to know when a window is complete. By far the most common watermarking strategy is to assume that events arrive with some bounded delay.
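To make the bounded-delay idea concrete, here is a minimal plain-Scala sketch with no Flink dependency. The names BoundedDelayWatermark and windowIsComplete are made up for illustration; the logic only mirrors what I understand a bounded-out-of-orderness strategy to do: the watermark trails the largest timestamp seen so far by a fixed delay, and a window covering [start, end) can fire once the watermark reaches end - 1.

```scala
// Plain-Scala illustration of a bounded-delay watermark (hypothetical names, not Flink API).
final class BoundedDelayWatermark(maxDelayMillis: Long) {
  // Start low enough that the subtraction in `current` cannot underflow.
  private var maxTimestampSeen: Long = Long.MinValue + maxDelayMillis

  /** Record an event timestamp; out-of-order events never move the watermark backwards. */
  def observe(eventTimestamp: Long): Unit =
    maxTimestampSeen = math.max(maxTimestampSeen, eventTimestamp)

  /** Assertion: events with a timestamp <= this value have (probably) all arrived. */
  def current: Long = maxTimestampSeen - maxDelayMillis
}

object WatermarkDemo {
  /** A window covering [start, end) is complete once the watermark reaches end - 1. */
  def windowIsComplete(windowEnd: Long, watermark: Long): Boolean =
    watermark >= windowEnd - 1
}
```

With a 10-second delay, a window ending at t = 10000 ms only closes after an event with a timestamp of at least 19999 ms has been seen, which is why a stream without any watermarks never produces window results.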

If you want to emit watermarks in the data source (during ingestion), see Source Functions with Timestamps and Watermarks. It can be as simple as

ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime))

If, on the other hand, you want to deal with this outside the source, see Timestamp Assigners / Watermark Generators and Assigners allowing a fixed amount of lateness. You could simply do something like this:

stream
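  .assignTimestampsAndWatermarks(
    // BoundedOutOfOrdernessTimestampExtractor is abstract, so extractTimestamp must be overridden
    new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10)) {
      override def extractTimestamp(element: Datapoint): Long = element.getTimestamp
    })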
  .keyBy("instrument")
  ...

The documentation I've linked to has more detailed examples in Scala.
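Since the question asks for a simple Scala example, here is a rough end-to-end sketch. It assumes a Flink 1.x release in which the Scala DataStream API, timeWindow and BoundedOutOfOrdernessTimestampExtractor are still available (newer releases deprecate these in favor of WatermarkStrategy). The Datapoint fields, the sample values and the mean helper are invented for illustration and are not taken from the question.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical event type: the field names are assumptions, not the asker's real class.
case class Datapoint(instrument: String, value: Double, timestamp: Long)

object EventTimeAverage {
  // Pure helper so the averaging logic can be checked without running a job.
  def mean(values: Iterable[Double]): Double = values.sum / values.size

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Made-up historical events; timestamps are epoch milliseconds.
    val stream = env.fromElements(
      Datapoint("EURUSD", 1.10, 1000L),
      Datapoint("EURUSD", 1.20, 4000L),
      Datapoint("EURUSD", 1.30, 12000L))

    val avg = stream
      // Watermark = largest timestamp seen so far minus 10 seconds.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10)) {
          override def extractTimestamp(element: Datapoint): Long = element.timestamp
        })
      .keyBy(_.instrument)
      .timeWindow(Time.seconds(10))
      .apply { (key: String, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint]) =>
        // Emit one averaged Datapoint per key and window, stamped with the window end.
        out.collect(Datapoint(key, mean(values.map(_.value)), window.getEnd))
      }

    avg.print()
    env.execute("event-time average")
  }
}
```

Keying by a function (`_.instrument`) rather than by field name gives the window function a typed String key instead of a Tuple, which avoids the `key.getField` call.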