
My Flink job has to compute a certain aggregation after each working shift. Shifts are configurable and look something like:

1st shift: 00:00 - 06:00
2nd shift: 06:00 - 12:00
3rd shift: 12:00 - 18:00

Shifts are the same every day for operational purposes; there is no distinction between days of the week or year. The shift configuration can change over time, and shifts need not be uniform in length. This rules out a trivial event-time window such as TumblingEventTimeWindows.of(Time.of(6, HOURS)), because some shifts might be shortened or extended into overtime, or a break of a couple of hours might be inserted between them.

I have come up with something based on a GlobalWindow and a custom Trigger:

LinkedList<Shift> shifts;

datastream.windowAll(GlobalWindows.create())
          .trigger(ShiftTrigger.create(shifts))
          .aggregate(myAggregateFunction);

where in my custom trigger I attempt to discern if an incoming event passes the end time of the on-going working shift, and fire the window for the shift:

@Override
public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
    // compute the end time of the on-going shift
    final Instant currentShiftEnd = ...
    // fire window for the shift if the event passes the end line
    if (ShiftPredicate.of(currentShiftEnd).test(element)) {
        return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
}

Omitting the code for state management and some memoization optimizations, this seems to work fine in the streaming use case: the first event that arrives after a shift's end time triggers the firing and the aggregation for that shift.

However, the job can also be run bounded by date parameters (e.g. for reprocessing past periods), or be shut down prematurely for a set of expected reasons. When this happens, I observe that the last window is not fired or flushed.

For example, the last shift of the day ends at midnight, and the 1st shift of the next day should start right after it. An event arrives at 23:59, just before the shift ends. However, the job is only running for today, and it finishes at 00:00. Since no new element past the shift end reached the custom trigger, the window never fires and the aggregation for the last shift is not calculated. Partial results are still expected, though, even if nothing happens in the next shift or the job terminates in the middle of the on-going shift.

I've read that the reason for this is:

Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners)

I have looked inside the org.apache.flink.streaming.api.windowing package for something like TumblingEventTimeWindows or DynamicEventTimeSessionWindows that I could use or extend with an end hour of the day. That way I could rely on the default event-time trigger, which fires when the watermark passes the end of the window, but I'm not sure how to do it. Intuitively I'd wish for something like:

shifts.forEach(shift -> {
    datastream.windowAll(EventTimeWindow.fromTo(DAILY, shift.startTime, shift.endTime))
              .aggregate(myAggregateFunction);
});

I know that for use cases of arbitrary complexity, some people ditch the Windows API in favor of low-level process functions. There they compute the window "manually": they hold elements in the operator's managed state and, when given rules or conditions are met, extract results from a defined aggregate function or accumulator. In a process function it is also possible to flush any pending calculations by tapping into the onClose hook.

Would there be a way to get this concept of recurrent event time windows for certain hours of a day every day by extending any of the objects in the Windows API?

I've expanded my answer a little bit, but it's not clear where you are stuck. – David Anderson

1 Answer


If I understand correctly, there are two separate questions/issues here to resolve:

  1. How to handle not having uniform window boundaries.
  2. How to terminate the job without losing the results of the last window.

For (1), your approach of using GlobalWindows with a custom ShiftTrigger is one way to go. If you'd like to explore an alternative that uses a process function, I've written an example that you will find in the Flink docs.

For a more fluent API, you could create a custom WindowAssigner, which could then leverage the built-in EventTimeTrigger as its default trigger. To do this, you'll need to extend the abstract WindowAssigner class.
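The core of such an assigner is mapping an event timestamp to the boundaries of the shift that contains it. Here is a minimal sketch of that mapping in plain Java, without Flink dependencies. The Shift and ShiftWindows classes and the windowFor method are hypothetical names made up for illustration, and the sketch assumes shifts do not cross midnight (an end time of 00:00 is read as "end of the day").

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;

/** Hypothetical shift definition: a daily time range, start inclusive, end exclusive. */
final class Shift {
    final LocalTime start;
    final LocalTime end; // assumption: LocalTime.MIDNIGHT here means "end of the day"

    Shift(LocalTime start, LocalTime end) {
        this.start = start;
        this.end = end;
    }
}

final class ShiftWindows {
    /**
     * Returns {startMillis, endMillis} of the shift containing the timestamp,
     * or empty if the timestamp falls into a break between shifts.
     */
    static Optional<long[]> windowFor(long timestampMillis, List<Shift> shifts, ZoneId zone) {
        ZonedDateTime eventTime = Instant.ofEpochMilli(timestampMillis).atZone(zone);
        LocalDate day = eventTime.toLocalDate();
        for (Shift shift : shifts) {
            ZonedDateTime start = day.atTime(shift.start).atZone(zone);
            // A shift ending at 00:00 ends at the start of the following day.
            ZonedDateTime end = shift.end.equals(LocalTime.MIDNIGHT)
                    ? day.plusDays(1).atStartOfDay(zone)
                    : day.atTime(shift.end).atZone(zone);
            if (!eventTime.isBefore(start) && eventTime.isBefore(end)) {
                return Optional.of(new long[] {
                        start.toInstant().toEpochMilli(), end.toInstant().toEpochMilli()});
            }
        }
        return Optional.empty();
    }
}
```

Inside a custom assigner, assignWindows would presumably wrap the returned pair in a TimeWindow(start, end) and return it as a singleton collection, or return an empty collection for events that fall into a break. How the shift configuration reaches the assigner when it changes over time is left open here.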

For (2), so long as you are relying on event time processing, the last set of windows won't be triggered unless a Watermark large enough to close them arrives before the job is terminated. This normally requires that you have an event whose timestamp is sufficiently after the window's end that a Watermark large enough to trigger the window is created (and that the job stays running long enough for that to happen).

However, when Flink is aware that a streaming job is coming to a natural end, it will automatically inject a Watermark with its timestamp set to MAX_WATERMARK, which has the effect of triggering all event time timers, and closing all event time windows. This happens automatically for any bounded sources. With Kafka (for example), you can also arrange for this by having your deserializer return true from isEndOfStream.
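To see why that final watermark flushes everything, recall that an event-time window covering [start, end) has a max timestamp of end - 1 and fires once the watermark reaches it. The snippet below is a simplified model of that rule in plain Java, not Flink's actual code; EventTimeFiring and fires are hypothetical names. MAX_WATERMARK carries the timestamp Long.MAX_VALUE, so it satisfies the condition for every window.

```java
final class EventTimeFiring {
    /** Timestamp carried by Flink's MAX_WATERMARK. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /**
     * Simplified firing rule: a window [start, end) has max timestamp end - 1
     * and fires once the watermark is at or beyond that timestamp.
     */
    static boolean fires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }
}
```

With a window ending at midnight, a last event at 23:59 leaves the watermark short of the window's max timestamp, so nothing fires until either a later event advances the watermark or the final MAX_WATERMARK arrives.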

Another way to handle this is to avoid canceling such jobs when they are done, and instead use ./bin/flink stop --drain [-p savepointPath] <jobID> to stop the job cleanly (with a savepoint) while draining all remaining window results, which it does by injecting one last big watermark (MAX_WATERMARK).