0
votes

There are a lot of late events in my Flink job, so I set allowedLateness() to 10 minutes (I'm using TumblingEventTimeWindows, and a complex AggregateFunction runs on every window).

It seems the aggregation runs again on every late event, but I'd like it to fire less frequently.

  • Is there a trigger that fires only once per minute?
  • Do triggers affect late events?
  • Are there triggers that affect only late events?

3 Answers

1
votes

You could implement a custom Trigger with whatever behavior you desire.

If you look at the implementation of EventTimeTrigger, the default trigger for tumbling event time windows,

public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

you'll see that whenever an event is assigned to the window after the stream's watermark has reached or surpassed the window's end, then the trigger returns FIRE. This is why every late event causes another firing.

An alternative would be to have no allowed lateness, but instead collect the late events into their own stream (using a side output), and then process the late events independently.
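
To make the cases concrete (on time, late but within the allowed lateness, and too late), here is a minimal sketch of the checks involved. It is a dependency-free model, not Flink code: the class LatenessModel and its method names are hypothetical, and the window arithmetic assumes tumbling windows with zero offset and non-negative timestamps. In a real job, the "too late" events are the ones that sideOutputLateData(...) on the windowed stream would route to a side output.

```java
// Hypothetical model of how a tumbling event-time window treats an element.
// Not Flink code; assumes zero window offset and non-negative timestamps.
final class LatenessModel {
    enum Outcome { ON_TIME, LATE_FIRES_AGAIN, TOO_LATE }

    // Last timestamp that belongs to the tumbling window containing `timestamp`.
    static long windowMaxTimestamp(long timestamp, long windowSizeMs) {
        long start = timestamp - (timestamp % windowSizeMs);
        return start + windowSizeMs - 1;
    }

    static Outcome classify(long timestamp, long watermark,
                            long windowSizeMs, long allowedLatenessMs) {
        long maxTs = windowMaxTimestamp(timestamp, windowSizeMs);
        if (maxTs + allowedLatenessMs <= watermark) {
            return Outcome.TOO_LATE;          // dropped, or sent to a side output
        }
        if (maxTs <= watermark) {
            return Outcome.LATE_FIRES_AGAIN;  // same check as in EventTimeTrigger.onElement
        }
        return Outcome.ON_TIME;               // window fires later, when the watermark reaches maxTs
    }
}
```

With allowed lateness set to zero, the middle case disappears: every element behind the watermark is "too late" and can be handled separately, at whatever rate you like.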

0
votes

Just to clarify, the late events I mention below are those that still fall within the allowed lateness you set.

Is there a trigger that fires only once per minute?

No. However, you can write your own Trigger; try using event-time timers to achieve that.
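
One way to read this suggestion: when a late element arrives, register an event-time timer at the next minute boundary instead of returning FIRE, and fire when that timer goes off. The sketch below models that decision logic in plain Java so it can be run without Flink. ThrottledLateFiring and its methods are hypothetical names, and the set of timers stands in for ctx.registerEventTimeTimer(...), which likewise keeps only one timer per timestamp. A real implementation would extend Trigger and return TriggerResult values from onElement and onEventTime.

```java
import java.util.TreeSet;

// Hypothetical, Flink-free model of a trigger that batches late firings per minute.
// Assumes non-negative timestamps and watermarks.
final class ThrottledLateFiring {
    static final long INTERVAL_MS = 60_000L;

    // Stands in for the timers registered via ctx.registerEventTimeTimer(...).
    private final TreeSet<Long> timers = new TreeSet<>();
    private int firings = 0;

    // Called for each element; returns true if the window should fire right now.
    boolean onElement(long windowMaxTimestamp, long watermark) {
        if (windowMaxTimestamp <= watermark) {
            // Late element: defer to the next minute boundary after the watermark.
            long next = watermark - (watermark % INTERVAL_MS) + INTERVAL_MS;
            timers.add(next);
        } else {
            // On-time element: fire once when the watermark reaches the window end.
            timers.add(windowMaxTimestamp);
        }
        return false; // never fire directly from onElement
    }

    // Called when the watermark advances; fires once per timer that is now due.
    int advanceWatermark(long watermark) {
        int fired = 0;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            fired++;
        }
        firings += fired;
        return fired;
    }

    int totalFirings() { return firings; }
}
```

Because several late elements that arrive within the same minute map to the same timer timestamp, they cause a single firing rather than one each. A real trigger would also need to delete any pending timers in clear().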

Do triggers affect late events?

Yes. Late events are passed to the trigger through its onElement method, just like on-time events.

Are there triggers that affect only late events?

You can detect late events in a custom trigger like this:

if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    // the watermark has already passed the end of the window, so this element is late
    return TriggerResult.FIRE;
}
0
votes

In my opinion, the best approach is to implement a process function with a custom watermark.