I am having a hard time understanding Flink's windowing principles and would be grateful if you could point me in the right direction.
My goal is to count the number of recurring events in a time interval and generate an alert event if that count is greater than a threshold.
As I understand it, windowing is a good match for this scenario.
An additional requirement is to generate an early alert when the recurring event count in a window reaches 2 (i.e. the alert should be generated without waiting for the window to end).
I thought that a process window function that generates alert events could be used to aggregate the windowed events, and that a custom trigger could be used to emit early results from the window based on the recurring event count (before the watermark reaches the window’s end timestamp).
I am using event-time semantics and have some problems and questions about the custom trigger.
You can find the actual implementation in the gist: https://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36
I am using keyed state (encounteredElementsCountState) to keep track of the element count in the window.
Upon receiving the first element, I register an EventTimeTimer for the window end. This is supposed to trigger FIRE_AND_PURGE when the window closes, and it works as expected.
If the count exceeds the threshold, I try to trigger an early fire. This also seems to be successful: the process window function is called immediately after this firing.
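To make the intended behaviour concrete, here is a minimal plain-Java sketch of the decisions my trigger is supposed to make. It is only a model and deliberately has no Flink dependency: the Result enum is a local stand-in that reuses the constant names of Flink's TriggerResult, and EarlyFireModel, threshold and windowEnd are hypothetical names used for illustration. It also assumes the early fire should happen exactly once, when the count reaches the threshold. The actual implementation is the one in the gist.

```java
/** Plain-Java model of the trigger decisions described above (no Flink classes involved). */
final class EarlyFireModel {

    /** Local stand-in for Flink's TriggerResult; only the constant names are borrowed. */
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final int threshold;   // hypothetical early-alert threshold (2 in my use case)
    private final long windowEnd;  // timestamp at which the window-end timer fires
    private long count = 0;        // models encounteredElementsCountState
    private boolean timerRegistered = false;

    EarlyFireModel(int threshold, long windowEnd) {
        this.threshold = threshold;
        this.windowEnd = windowEnd;
    }

    /** Called once per element assigned to the window. */
    Result onElement() {
        if (!timerRegistered) {
            // First element: this is where the event-time timer for the window end is registered.
            timerRegistered = true;
        }
        count++;
        // Assumption: fire early only at the moment the count reaches the threshold.
        return count == threshold ? Result.FIRE : Result.CONTINUE;
    }

    /** Called when an event-time timer fires. */
    Result onEventTime(long time) {
        // Only the window-end timer closes the window.
        return (timerRegistered && time == windowEnd) ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    public static void main(String[] args) {
        EarlyFireModel trigger = new EarlyFireModel(2, 59_999L);
        System.out.println(trigger.onElement());          // first element
        System.out.println(trigger.onElement());          // second element reaches the threshold
        System.out.println(trigger.onElement());          // later elements do not fire again
        System.out.println(trigger.onEventTime(59_999L)); // window-end timer
    }
}
```

With this model each element should reach onElement exactly once, which is why seeing previously collected elements again surprised me.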
The problem is that I had to insert the check below into the code without understanding the reason, because the previously collected elements were supplied to the onElement method again:
if (ctx.getCurrentWatermark() < 0) {
    logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
    return TriggerResult.CONTINUE;
}
I could not figure out the reason. What I see is that when this happens, the watermark value (ctx.getCurrentWatermark()) is Long.MIN_VALUE, which led to the above check. How can this happen?
This check seems to prevent duplicate early alerts, but I do not know why this happens or whether this workaround is appropriate.
Could you please advise why the same elements are processed twice in the window?
Another question is about the keyed state usage. Does this implementation leak any state after the window is disposed? I am trying to clear all used state in the clear method of the trigger, but would that be enough?
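To illustrate what I mean by clearing everything, here is a rough plain-Java model of the cleanup I have in mind. It is a sketch under assumptions, not Flink code: the map and set stand in for the trigger's count state and registered timers, which as far as I understand are scoped per key and per window, and TriggerCleanupModel, scope and liveEntries are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of per-key, per-window trigger state and its cleanup (no Flink classes). */
final class TriggerCleanupModel {

    // Stand-ins for the count state and the window-end timers, keyed by (key, window).
    private final Map<String, Long> countState = new HashMap<>();
    private final Set<String> timers = new HashSet<>();

    // Hypothetical helper: builds one identifier per key and window.
    private static String scope(String key, long windowEnd) {
        return key + "@" + windowEnd;
    }

    /** Models onElement: register the window-end timer and increment the count. */
    void onElement(String key, long windowEnd) {
        String s = scope(key, windowEnd);
        timers.add(s);                      // set semantics: registering twice has no extra effect
        countState.merge(s, 1L, Long::sum); // increment the per-window counter
    }

    /** Models clear: drop the counter and the timer for exactly this key and window. */
    void clear(String key, long windowEnd) {
        String s = scope(key, windowEnd);
        countState.remove(s);
        timers.remove(s);
    }

    /** Number of state entries and timers still held. */
    int liveEntries() {
        return countState.size() + timers.size();
    }

    public static void main(String[] args) {
        TriggerCleanupModel model = new TriggerCleanupModel();
        model.onElement("device-1", 60_000L);
        model.onElement("device-1", 60_000L);
        model.onElement("device-2", 60_000L);
        model.clear("device-1", 60_000L);
        System.out.println(model.liveEntries()); // entries left for device-2 only
        model.clear("device-2", 60_000L);
        System.out.println(model.liveEntries()); // nothing left once every window is cleared
    }
}
```

In the real trigger the equivalent would be clearing encounteredElementsCountState and deleting the window-end timer inside clear; my question is whether anything beyond that can remain.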
Regards.