
I'm trying Flink for basic aggregation of (sorted) timestamped events loaded from a CSV file.

I tell Flink to use Event Time:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

and then I use a time window on a KeyedStream

val distances = signals
  .assignAscendingTimestamps(_.ts)
  .map(s => (s.mmsi, s.ts, getPortDistance(s)))
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .sum(2).print()

The problem is that when I change the window to, say, 10 minutes, the results are only printed after that much real time has actually passed!

My understanding was that by explicitly telling Flink to use the timestamp field as the Event Time, the operation wouldn't be dependent on the real (wall-clock) time of the machine. Am I missing something here?


1 Answer


First, you need to understand what a watermark is and how watermarks are generated.

What is a watermark?

In general, a watermark is a declaration that, by that point in the stream, all events up to a certain timestamp should have arrived. Once a watermark reaches an operator, the operator can advance its internal event time clock to the value of the watermark. For more details, see the official Flink documentation on event time and watermarks.

How is a watermark generated?

Because you call assignAscendingTimestamps, your watermark is always (latest received element's timestamp - 1). The watermark therefore simply ascends with the incoming data, and any out-of-order element that arrives behind it can no longer be retrieved.
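To make that rule concrete, here is a minimal plain-Scala sketch (no Flink dependency) of how a tumbling event-time window would fire under an ascending watermark. The names `Event`, `windowStart` and `replay` are made up for illustration. The model assumes a single key, non-negative timestamps in milliseconds, in-order input, and that a window [start, start + size) is complete once the watermark reaches start + size - 1. It does not model what happens at the end of the input.

```scala
object WatermarkDemo {
  // Hypothetical event: event-time timestamp in milliseconds plus a value to sum.
  final case class Event(ts: Long, value: Double)

  // Start of the tumbling window containing ts (assumes ts >= 0).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Replays events in arrival order and returns (windowStart, sum) pairs
  // in the order the windows fire. Assumes in-order input and a single key.
  def replay(events: Seq[Event], sizeMs: Long): Vector[(Long, Double)] = {
    var open = scala.collection.immutable.SortedMap.empty[Long, Double]
    var fired = Vector.empty[(Long, Double)]
    var watermark = Long.MinValue

    for (e <- events) {
      // Add the event to its window.
      val start = windowStart(e.ts, sizeMs)
      open = open.updated(start, open.getOrElse(start, 0.0) + e.value)

      // Ascending rule from the answer: watermark = latest timestamp - 1.
      watermark = math.max(watermark, e.ts - 1)

      // Fire every window whose last timestamp is covered by the watermark.
      val (ready, pending) =
        open.partition { case (s, _) => s + sizeMs - 1 <= watermark }
      fired = fired ++ ready.toVector
      open = pending
    }
    fired
  }
}
```

With 1-second windows and events at 0, 400, 999, 1000 and 2500 ms, the first window only fires when the event at 1000 ms arrives, because that is the first event that pushes the watermark to 999. In this model, firing is driven purely by the timestamps in the data, not by the machine's clock.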

How to solve this?

Define your own timestamp assigner and watermark generator. You can take a look at the detailed implementation of "assignAscendingTimestamps" and try to write your own.
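As a rough idea of what such a custom assigner might compute, the following plain-Scala sketch (again without any Flink dependency) keeps the watermark a fixed delay behind the highest timestamp seen so far. `Generator`, `maxDelayMs` and `behindWatermark` are hypothetical names, and the formula `maxTs - maxDelayMs - 1` is this sketch's choice so that a delay of 0 reproduces the ascending rule described above. It is an illustration of the idea, not Flink's actual implementation.

```scala
object BoundedLateness {
  // Watermark generator that trails the highest seen timestamp by maxDelayMs.
  // Assumes maxDelayMs >= 0 and timestamps in milliseconds.
  final class Generator(maxDelayMs: Long) {
    // Initialised so that the first watermark is Long.MinValue without underflow.
    private var maxTs: Long = Long.MinValue + maxDelayMs + 1

    def onEvent(ts: Long): Unit = {
      maxTs = math.max(maxTs, ts)
    }

    def currentWatermark: Long = maxTs - maxDelayMs - 1
  }

  // For each timestamp, in arrival order, reports whether it arrived
  // at or behind the watermark that was current just before it.
  def behindWatermark(timestamps: Seq[Long], maxDelayMs: Long): Seq[Boolean] = {
    val gen = new Generator(maxDelayMs)
    timestamps.map { ts =>
      val behind = ts <= gen.currentWatermark
      gen.onEvent(ts)
      behind
    }
  }
}
```

For the arrival order 1000, 3000, 2500, 900 and a delay of 0, the last two elements are behind the watermark. With a delay of 1000 ms, only the element at 900 is. In a Flink job, logic of this kind would live in an assigner passed to `assignTimestampsAndWatermarks` in place of `assignAscendingTimestamps`. The exact interface to implement depends on the Flink version, so check the documentation for the release you are using.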