2 votes

I'm struggling a bit to understand how Flink Triggers work. My datastream contains events with a sessionId, and I aggregate events based on that sessionId. Each session will contain a Started and an Ended event, but sometimes the Ended event is lost.

To handle this I've set up a Trigger that emits the aggregated session whenever the Ended event is processed. But if no events arrive for a session for 2 minutes, I want to emit whatever we have aggregated so far (the apps that send the events send heartbeats every minute, so if we don't get any events the session is considered lost).

I've set up the following trigger function:

public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
    private final long sessionTimeout;
    private long lastSetTimer;

    // Max session length set to 1 day
    public static final long MAX_SESSION_LENGTH = 1000L * 86400L;

    // End session events
    private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
            .add("Playback.Aborted")
            .add("Playback.Completed")
            .add("Playback.Error")
            .add("Playback.StartAirplay")
            .add("Playback.StartCasting")
            .build();

    public EventTimeProcessingTimeTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
        ctx.registerProcessingTimeTimer(lastSetTimer);

        if(endSession.contains(element.get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE_AND_PURGE :
                TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(lastSetTimer);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
    }
}

In order to set watermarks for the events I use the timestamps set by the apps, since the event time in the app might not match the wall clock on the server. I extract timestamps and watermarks like this:

DataStream<HashMap> playerEvents = env
                .addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
                .name("Read player events from Kafka")
                .uid("Read player events from Kafka")
                .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
                .name("Map Json to HashMap")
                .uid("Map Json to HashMap")
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
                {
                    @Override
                    public long extractTimestamp(HashMap element)
                    {
                        // CanonicalTime is the event time reported by the app
                        return (long) element.get("CanonicalTime");
                    }
                })
                .name("Add CanonicalTime as timestamp")
                .uid("Add CanonicalTime as timestamp");

Now what I find strange is that when I run the code in debug mode with a breakpoint in the clear function of the Trigger, clear gets called constantly, even though no FIRE_AND_PURGE point is ever reached in the Trigger. So it feels like I've completely misunderstood how the Trigger is supposed to work, and that my implementation is not at all doing what I think it's doing.

I guess my question is: when should clear be called by the Trigger? And is this the correct way to implement a combined EventTimeTrigger and ProcessingTimeTrigger?

Thankful for all the help I can get.

UPDATE 1: (2020-05-29)

To provide some more information about how things are set up: I configure my environment as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

So I use EventTime for the entire stream. I then create the windows like this:

DataStream<PlayerSession> playerSessions = sideEvents
                .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
                .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                .trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
                .aggregate(new SessionAggregator())
                .name("Aggregate events into sessions")
                .uid("Aggregate events into sessions");
It would be helpful to see more context for how the Trigger is being used -- I'd like to see whether you are setting the time characteristic, whether this is a keyed window or a windowall, how the window assigner is constructed, etc. – David Anderson

What kind of windows are these -- are you using Flink's session windows? – David Anderson

@DavidAnderson Thanks for taking a look. I've updated the original post with more information about how I set up the stream and how I use the windows. – Tim Josefsson

2 Answers

2 votes

This situation is complex. I hesitate to predict exactly what this code will do, but I can explain some of what’s going on.

Point 1: you have set the time characteristic to event time, arranged for timestamps and watermarks, and implemented an onEventTime callback in your Trigger. But nowhere are you creating an event time timer. Unless I've missed something, nothing is actually using event time or watermarks. You haven't implemented an event time trigger, and I would not expect that onEventTime will ever be called.
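For reference, a trigger that actually reacts to event time has to register an event time timer somewhere, typically in onElement. A minimal sketch of what that registration could look like (an illustration, not a drop-in fix for your trigger):

@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    // ask to be called back (via onEventTime) once the watermark
    // passes the end of the window
    ctx.registerEventTimeTimer(window.maxTimestamp());
    return TriggerResult.CONTINUE;
}

Without a registration like this, there is no timer for onEventTime to respond to.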

Point 2: Your trigger doesn't need to call clear. Flink takes care of calling clear on triggers as part of purging windows.

Point 3: Your trigger is trying to fire and purge the window repeatedly, which doesn't seem right. I say this because you are creating a new processing time timer for every element, and when each timer fires, you are firing and purging the window. You can fire the window as often as you like, but you can only purge the window once, after which it is gone.
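If the intent is to emit intermediate results while keeping the session open, the timer callback could fire without purging, along these lines (again a sketch, assuming you keep the per-element timers):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    // emit what has been aggregated so far, but keep the window
    // contents so later events can still be added to the session
    return TriggerResult.FIRE;
}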

Point 4: Session windows are a special kind of window, known as merging windows. When sessions merge (which happens all the time, as events arrive), their triggers are merged, and one of them gets cleared. This is why you see clear being called so frequently.

Suggestion: since you have once-a-minute keepalives, and intend to close sessions after 2 minutes of inactivity, it seems like you could set the session gap to be 2 minutes, and that would avoid a fair bit of what's making things so complex. Let the session windows do what they're designed to do.
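Based on the window definition in your update, that would look something like this (a sketch; EndOfSessionTrigger is a hypothetical name for the trigger outlined below):

DataStream<PlayerSession> playerSessions = sideEvents
        .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
        .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
        .trigger(new EndOfSessionTrigger())
        .aggregate(new SessionAggregator());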

Assuming that will work, you could simply extend Flink's ProcessingTimeTrigger and override its onElement method to do this:

@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

    if (endSession.contains(element.get(Field.EVENT_TYPE))) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    return super.onElement(element, timestamp, window, ctx);
}

In this fashion the window will be triggered after two minutes of inactivity, or by an explicit session-ending event.

You should be able to simply inherit the rest of ProcessingTimeTrigger's behavior.

If you want to use event time, then use EventTimeTrigger as the superclass, and you'll have to find a way to make sure that your watermarks make progress even when the stream becomes idle. See this answer for how to handle that.
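For example, in Flink 1.11+ the WatermarkStrategy API has idleness support built in. A sketch, assuming the same CanonicalTime field and 30-second out-of-orderness bound as in your extractor (Duration here is java.time.Duration):

.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<HashMap>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                .withTimestampAssigner((event, ts) -> (long) event.get("CanonicalTime"))
                .withIdleness(Duration.ofMinutes(1)))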

0 votes

I had the same problem. I set the time characteristic to processing time and registered the trigger like this:

.trigger(PurgingTrigger.of(TimerTrigger.of(Time.seconds(winSec))))

The trigger function is implemented as follows:

//override the ProcessingTimeTrigger behavior
public class TimerTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private TimerTrigger(long winInterValMills) { // window interval in milliseconds
        this.stateDesc = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = winInterValMills;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window, fire immediately
            return TriggerResult.FIRE;
        }
        long now = System.currentTimeMillis();
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(this.stateDesc);
        if (fireTimestamp.get() == null) {
            // no timer registered for this window yet, so schedule the first one
            long time = Math.max(timestamp, window.maxTimestamp()) + interval;
            if (now - window.maxTimestamp() > interval) { // the window is already late: fire at the next aligned interval
                time = (now - now % 1000) + interval - 1;
            }
            ctx.registerProcessingTimeTimer(time);
            fireTimestamp.add(time);
        }
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()){  
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(this.stateDesc);
        Long registered = fireTimestamp.get();
        if (registered != null && registered == time) {
            // this is the timer we registered: schedule the next firing and emit
            fireTimestamp.clear();
            long next = Math.max(window.maxTimestamp(), time);
            if (next == time) {
                next = time + this.interval;
            }
            fireTimestamp.add(next);
            ctx.registerProcessingTimeTimer(next);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(this.stateDesc);
        Long timestamp = fireTimestamp.get();
        if (timestamp != null) { // guard against clear being called before any timer was registered
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, OnMergeContext ctx) {
        ctx.mergePartitionedState(this.stateDesc);
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public String toString() {
        return "TimerTrigger(" + this.interval + ")";
    }

    public static <W extends Window> TimerTrigger<W> of(Time interval) {
        return new TimerTrigger<>(interval.toMilliseconds());
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}