0
votes

I am trying to run a basic join over Flink by joining two DataStreams on local. The datatype of the source streams are same(Tuple4(String, String, Long, Long)). After running the below mentioned function multiple times, I received two different outputs on random basis (Stored in variable CollectTuple2Sink below, DEBUG logs for the same are mentioned below). I tried keeping parallelism 1 and max parallelism 1 but issue still persists.

//Basic Function
    public void runBasicJoin() throws Exception {

        TumblingEventTimeWindows tsAssigner;
        //tried with getExecutionEnvironment as well
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        env.setMaxParallelism(1);
        //class declared below
        CollectTuple2Sink.VALUES.clear();

        Tuple4<String, String, Long, Long> input1 =
                new Tuple4<String, String, Long, Long>("key1", "val1", 1L, t(1));
        Tuple4<String, String, Long, Long> input2 =
                new Tuple4<String, String, Long, Long>("key1", "val2", 12L, t(2));
        Tuple4<String, String, Long, Long> input3 =
                new Tuple4<String, String, Long, Long>("key1", "val3", 3L, t(3));
        Tuple4<String, String, Long, Long> input4 =
                new Tuple4<String, String, Long, Long>("key2", "val4", 18L, t(4));
        Tuple4<String, String, Long, Long> input5 =
                new Tuple4<String, String, Long, Long>("key1", "val5", 11L, t(6));
        Tuple4<String, String, Long, Long> input6 =
                new Tuple4<String, String, Long, Long>("key1", "val6", -121L, t(7));
        Tuple4<String, String, Long, Long> input7 =
                new Tuple4<String, String, Long, Long>("key2", "val7", -111L, t(8));
        Tuple4<String, String, Long, Long> input8 =
                new Tuple4<String, String, Long, Long>("key2", "val8", 111L, t(9));

        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream1 = env.addSource(new Tuple4Soruce(
                t(0), input1, input2, input3, input4,t(5),
                input5, input6, input7, input8,t(10)
        ));

        dataStream1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
                return tuple4.f3;
            }
        });

        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream2 = env.addSource(new Tuple4Soruce(
                t(0), input1, input3,input3,input4,input4,input4,t(5),
                 input5,input6, t(10),t(11)
        ));

        dataStream2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
                return tuple4.f3;
            }
        });

        tsAssigner = TumblingEventTimeWindows.of(Time.minutes(5));

        dataStream1.join(dataStream2)
                .where(new Tuple4KeySelector())
                .equalTo(new Tuple4KeySelector())
                .window(tsAssigner)
                .trigger(EventTimeTrigger.create())
                .evictor(CountEvictor.of(2))
                .apply(new Tuple4JoinFunction())
                .addSink(new CollectTuple2Sink());
        env.execute();
        System.out.println(CollectTuple2Sink.VALUES);

    }

    private static class CollectTuple2Sink
            implements SinkFunction<Tuple2<String, Long>> {

        public static final List<Tuple2<String, Long>> VALUES = new ArrayList<>();

        @Override
        public synchronized void invoke(Tuple2<String, Long> value)
                throws Exception {
            VALUES.add(value);
        }

    }
//join function ---> Takes the 2nd and 4th field of a tuple and convert tuple4 to tuple2
    private static class Tuple4JoinFunction implements JoinFunction<Tuple4<String, String, Long, Long>, Tuple4<String, String, Long, Long>, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> join(Tuple4<String, String, Long, Long> tuple4, Tuple4<String, String, Long, Long> tuple42) throws Exception {
            return new Tuple2<>(tuple4.f1, tuple4.f3);
        }
    }
//key selector --> select the 2nd value of tuple 4
    private static class Tuple4KeySelector implements KeySelector<Tuple4<String, String, Long, Long>, String> {
        @Override
        public String getKey(Tuple4<String, String, Long, Long> tuple4) throws Exception {
            return tuple4.f1;
        }
    }

//source function --> generates a sequence input for tuple4
    private static class Tuple4Soruce
            implements SourceFunction, ResultTypeQueryable<Tuple4<String, String, Long, Long>> {
        private volatile boolean running = true;
        private Object[] testStream;
        private TypeInformation<Tuple4<String, String, Long, Long>> typeInformation =
                TypeInformation.of(new TypeHint<Tuple4<String, String, Long, Long>>() {
                });


        Tuple4Soruce(Object... eventsOrWatermarks) {
            this.testStream = eventsOrWatermarks;
        }

        @Override
        public void run(SourceContext ctx) throws Exception {
            for (int i = 0; (i < testStream.length) && running; i++) {
                if (testStream[i] instanceof Tuple4) {
                    Tuple4<String, String, Long, Long> tuple =
                            (Tuple4<String, String, Long, Long>) testStream[i];
                    ctx.collectWithTimestamp(tuple, tuple.f3);
                } else if (testStream[i] instanceof Long) {
                    Long ts = (Long) testStream[i];
                    ctx.emitWatermark(new Watermark(ts));
                } else {
                    throw new RuntimeException(testStream[i].toString());
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public TypeInformation<Tuple4<String, String, Long, Long>> getProducedType() {
            return typeInformation;
        }

    }
//util function to generate time
    public long t(int n) {
        return new DateTime(2018, 1, 1, 0, 0).plusMinutes(n).getMillis();
    }

Logs for Run 1:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1914335182] with leader session id 2a8bf59e-01fa-4e67-892c-83b10dd65be1.
01/09/2020 00:50:16 Job execution switched to status RUNNING.
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:50:16 Job execution switched to status FINISHED.
[(val1,1514745060000), (val5,1514745360000), (val6,1514745420000)]

Logs for Run2:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1448653751] with leader session id 291df2cb-96fd-4e3c-b46c-911d2ca11905.
01/09/2020 00:49:42 Job execution switched to status RUNNING.
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:49:42 Job execution switched to status FINISHED.
[(val1,1514745060000), (val3,1514745180000), (val4,1514745240000), (val5,1514745360000), (val6,1514745420000)]

Source Functions and other definitions were inspired from this tutorial. Also explored multiple ways of running a basic job with and without evictors from Flink official docs. Tested multiple things without evictors and output was as expected for all runs. Once the evictors came into picture, things started to become indeterministic.

Flink Version 1.4.2

1

1 Answers

2
votes

You haven't shared all of the code, but from what I'm seeing my guess as to what is going on is that the results depend on the ingestion order -- this is the case with count-based windowing, for example -- and in such cases you cannot expect deterministic results.

A windowed join is reading from two input streams, and while the events will be processed in order within each stream, the two streams will race against each other in a non-deterministic and uncontrollable way. The results will be deterministic if and only if the window triggering and processing is based solely event time. If counting or processing time are involved, then you can not expect to produce deterministic results.