1 vote

This is a follow-up question to: Trigger when State expires

I'm storing the state of each incoming element in the stream, and after the timer goes off I remove the state. This is so that I can prevent duplicates from being processed until the element has timed out, after which I can process the same element again.

I've written the following code to test timers, but it seems that the timer is only triggered after all 3 elements have gone through the first ProcessFunction.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    streamEnv.setParallelism(12);

    List<Tuple2<String, String>> inputList = new ArrayList<>();
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));

    streamEnv.fromCollection(inputList).keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Integer> occur;

                @Override
                public void open(Configuration parameters) throws Exception {
                    occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    if (occur.value() < 2) {
                        occur.update(occur.value() + 1);
                        out.collect(value);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                    }
                    else {
                        Thread.sleep(10000);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                        out.collect(value);
                    }
                }
            })
            .keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Tuple2<String, String>> storedTuple;

                @Override
                public void open(Configuration parameters) throws Exception {
                    storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple",
                            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    Tuple2<String, String> stored = storedTuple.value();
                    if (stored == null) {
                        LOGGER.info("[TEST] Storing Tuple {}", value);
                        storedTuple.update(value);
                        out.collect(value);
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 6000);
                    }
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    LOGGER.info("[TEST] Removing Tuple {}", storedTuple.value());
                    storedTuple.clear();
                }
            })
            .addSink(new CollectSink());

    streamEnv.execute("Testing");
    for (Tuple2<String, String> tup: CollectSink.values) {
        System.out.println(tup);
    }

}

private static class CollectSink implements SinkFunction<Tuple2<String, String>> {

    static final List<Tuple2<String, String>> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Tuple2<String, String> value) throws Exception {
        values.add(value);
    }
}

I have an input list with 3 duplicate elements. In the first ProcessFunction I send the first two elements as is but delay the 3rd element by 10 seconds.

In the second ProcessFunction it filters each element based on whether state is stored for it or not. As expected, the first element gets stored and sent onwards, and the second element isn't, as the state already exists. For the first element, apart from sending it on, I also set a timer for 6 seconds so that the state is cleared after the timer fires.

Now the third element is sent on after 10 seconds, which means the 6-second timer should already have cleared the state. However, the third element is processed before the timer is triggered. I can also see that the output contains only 1 copy of the Tuple even though I'm expecting 2 copies.

I've added some logging to give a better idea of the execution times.

[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,943] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Storing Tuple (Test,test)
[2019-02-19 14:11:58,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:58,896] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Removing Tuple (Test,test)

You can see that the first two tuples are emitted together as expected, followed by a 10-second delay, after which the 3rd tuple is emitted. The Removing Tuple log only appears after those 10 seconds, even though the timer was registered to fire 6 seconds after the first tuple came in.


2 Answers

7 votes

The event-time timer won't fire until a Watermark greater than the time specified in the timer is processed. Such a watermark can't occur until after the third event has been processed. Furthermore, with ingestion time, watermarks are generated using a periodic watermark generator, and by default are inserted into the stream every 200 msec.
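A minimal way to see this ordering is to model the semantics in plain Java. The sketch below is a toy model, not Flink's actual implementation; the class and method names are invented for illustration. It captures the one rule that matters here: an event-time timer fires only when a watermark at or past its timestamp arrives, no matter how much wall-clock time has elapsed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Flink itself) of event-time timer semantics: a registered
// timer fires only once a watermark >= its timestamp arrives, regardless of
// how much real time has passed.
class EventTimeTimerModel {
    private final List<Long> pending = new ArrayList<>();
    final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long ts) {
        pending.add(ts);
    }

    // A watermark arriving at time `watermark` fires every pending timer
    // at or before it, mirroring how timers are checked against watermarks.
    void advanceWatermark(long watermark) {
        pending.removeIf(ts -> {
            if (ts <= watermark) {
                fired.add(ts);
                return true;
            }
            return false;
        });
    }
}
```

In the question's job, the timer registered for `ctx.timestamp() + 6000` cannot fire until a watermark past that point is processed, and that watermark doesn't arrive until after the delayed third element has gone through.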

0 votes

NOTE: Before Flink 1.4.0, when called from a processing-time timer, the ProcessFunction.onTimer() method set the current processing time as the event-time timestamp. This behavior is very subtle and might not be noticed by users. It's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic that depends on this wrong timestamp is highly likely to be unintentionally faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs that were using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
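The note above concerns processing-time timers, which are driven by the wall clock rather than by watermarks. As a rough plain-Java analogy (a sketch using the standard library, not Flink's API; the class and method names are invented), a processing-time timer behaves like a task on a ScheduledExecutorService: it fires after a real-time delay, independent of any stream progress.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Rough analogy for a processing-time timer: the callback runs after a
// wall-clock delay, with no dependency on watermarks or stream progress.
class ProcessingTimeDemo {
    static String fireAfter(long delayMs) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        try {
            // Blocks until delayMs of real time has elapsed and the task ran.
            return exec.schedule(() -> "fired", delayMs, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            exec.shutdown();
        }
    }
}
```

This is why the indeterminism the note describes matters: the firing moment depends on the machine's clock and scheduling, not on anything recorded in the stream.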