0
votes

I am building a Flink streaming application and would prefer to use event time because it ensures that the timers I set will fire deterministically, even after a failure or a replay of historical data. The problem with event time is that it only moves forward when events arrive. Our data sources (physical sensors) sometimes produce very little data, so there are times when a single data point opens a five-minute aggregation window, but the next data point arrives 20 minutes later, so the window closes and emits its output record very late.

Our proposed solution to this was to use an AWS lambda function that is scheduled to run every X minutes that outputs a dummy event into our Kinesis stream that Flink reads from, thus forcing a watermark to be generated that advances time forward.

My concern is that this only works if watermarks are truly global, meaning that a SINGLE heartbeat message can lead to the creation of a watermark that advances the event time of every operator/task in a Flink application that consumes data originating from this stream. The documentation has led me to believe that Flink parallelizes reading from a source, that each parallel source instance generates its own watermarks, and that a downstream operator, say a window, takes the minimum of the watermarks it has received from its inputs. If that is the case, this seems problematic: a dummy heartbeat event would be needed per parallel watermark generator, but I have no control over which parallel instances read my heartbeat messages from the stream.

So, my question is: how exactly do downstream operators use watermarks to advance event time, and could a single dummy message added to the Kinesis stream advance event time across the ENTIRE Flink application?

If not, how can I force event time to move forward?

To be more specific, my application reads from a single stream. From that stream, it takes the events, deserializes them, assigns timestamps and generates watermarks, keys them by device id, and then sends them to a KeyedProcessFunction. I want to be able to send a single heartbeat message that creates a watermark advancing event time for every KeyedProcessFunction instance, to ensure that the timers I register in the KeyedProcessFunction do not lag too far behind processing time. This heartbeat message would have an event_timestamp, but its device_id field would be null. – ChrisATX

1 Answer

2
votes

You're right; there is an issue here. The standard periodic watermark generator implemented by BoundedOutOfOrdernessTimestampExtractor depends on seeing new events with larger timestamps in order to advance the watermark.
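For reference, a typical use of that extractor looks something like the sketch below (the event type and the out-of-orderness bound are illustrative). The base class's `getCurrentWatermark()` returns the largest timestamp seen so far minus the bound, which is why the watermark stalls while the source is idle:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// A typical assigner over (deviceId, timestampMillis) events. The watermark
// trails the largest timestamp seen so far, so it cannot advance until a
// newer event actually arrives; an idle source means a stalled watermark.
public class SensorTimestampExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> {

    public SensorTimestampExtractor() {
        super(Time.seconds(10)); // tolerate 10 seconds of out-of-orderness
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> event) {
        return event.f1; // the event's own timestamp in milliseconds
    }
}
```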

There are several ways you might address this:

  1. Run the source and the watermark assigner in a task running with a parallelism of one (and then increase the parallelism for the rest of the pipeline, if you want to). That way a single heartbeat message will suffice.

  2. Broadcast the heartbeat messages. That way every parallel instance will receive them and they can all advance their watermarks.

  3. Instead of heartbeat messages, implement a watermark generator that uses a processing time timer to artificially advance the watermark despite the lack of incoming events. See https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java for an example.

Note that this third approach is less desirable because it creates a coupling with processing time that eliminates some of the core advantages of a pure event time approach.
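The first option can be sketched as follows. This assumes the Kinesis connector's `FlinkKinesisConsumer`; the schema, consumer config, assigner, and process function names are placeholders:

```java
// Source and watermark assigner run at parallelism 1, so a single
// heartbeat message is guaranteed to reach the one watermark generator.
DataStream<Tuple2<String, Long>> events = env
        .addSource(new FlinkKinesisConsumer<>("sensor-stream", schema, consumerConfig))
        .setParallelism(1)
        .assignTimestampsAndWatermarks(new SensorTimestampExtractor())
        .setParallelism(1);

// The rest of the pipeline can scale out; watermarks are broadcast
// from the single assigner to every parallel downstream instance.
events.keyBy(event -> event.f0)
      .process(new MyKeyedProcessFunction())
      .setParallelism(4);
```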

If you use a heartbeat source, you'll want to implement a watermark generator for the other (sometimes idle) source that always returns MAX_WATERMARK. Otherwise the watermarks from that stream will hold back the overall watermark.
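Such a generator might look like this in the periodic-watermark API (a sketch, not tied to any particular connector):

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Watermark generator for a stream that should never hold back the overall
// watermark: it always reports the maximum possible watermark, so downstream
// operators taking the minimum across inputs will ignore this stream.
public class MaxWatermarkAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK; // a watermark of Long.MAX_VALUE
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        return previousElementTimestamp; // keep any previously assigned timestamp
    }
}
```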

Also, an AWS Lambda feels like overkill. You could implement a simple custom Flink source to create the heartbeat events.
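A minimal heartbeat source along those lines might look like this (using a `Long` wall-clock timestamp as a stand-in for your dummy event type):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits a heartbeat carrying the current wall-clock time every
// intervalMillis, until the job is cancelled.
public class HeartbeatSource implements SourceFunction<Long> {

    private final long intervalMillis;
    private volatile boolean running = true;

    public HeartbeatSource(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Hold the checkpoint lock while emitting, as required
            // for sources that emit outside of a checkpoint callback.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(System.currentTimeMillis());
            }
            Thread.sleep(intervalMillis);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```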