I am trying to run a basic join in Flink by joining two DataStreams locally. Both source streams have the same data type (Tuple4<String, String, Long, Long>). After running the function below multiple times, I get one of two different outputs at random (collected in CollectTuple2Sink.VALUES; DEBUG logs for both runs are included below). I tried setting both parallelism and max parallelism to 1, but the issue persists.
// Basic function
public void runBasicJoin() throws Exception {
    TumblingEventTimeWindows tsAssigner;
    // Tried with getExecutionEnvironment() as well
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);
    env.setMaxParallelism(1);

    // Sink class is declared below
    CollectTuple2Sink.VALUES.clear();

    Tuple4<String, String, Long, Long> input1 =
        new Tuple4<String, String, Long, Long>("key1", "val1", 1L, t(1));
    Tuple4<String, String, Long, Long> input2 =
        new Tuple4<String, String, Long, Long>("key1", "val2", 12L, t(2));
    Tuple4<String, String, Long, Long> input3 =
        new Tuple4<String, String, Long, Long>("key1", "val3", 3L, t(3));
    Tuple4<String, String, Long, Long> input4 =
        new Tuple4<String, String, Long, Long>("key2", "val4", 18L, t(4));
    Tuple4<String, String, Long, Long> input5 =
        new Tuple4<String, String, Long, Long>("key1", "val5", 11L, t(6));
    Tuple4<String, String, Long, Long> input6 =
        new Tuple4<String, String, Long, Long>("key1", "val6", -121L, t(7));
    Tuple4<String, String, Long, Long> input7 =
        new Tuple4<String, String, Long, Long>("key2", "val7", -111L, t(8));
    Tuple4<String, String, Long, Long> input8 =
        new Tuple4<String, String, Long, Long>("key2", "val8", 111L, t(9));

    // First source: elements interleaved with watermarks t(0), t(5), t(10)
    @SuppressWarnings("unchecked")
    DataStream<Tuple4<String, String, Long, Long>> dataStream1 = env.addSource(new Tuple4Soruce(
        t(0), input1, input2, input3, input4, t(5),
        input5, input6, input7, input8, t(10)
    ));
    // Note: the stream returned by this call is not assigned, so the join below reads dataStream1 itself
    dataStream1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
        @Override
        public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
            return tuple4.f3;
        }
    });

    // Second source: repeats input3 and input4, watermarks t(0), t(5), t(10), t(11)
    @SuppressWarnings("unchecked")
    DataStream<Tuple4<String, String, Long, Long>> dataStream2 = env.addSource(new Tuple4Soruce(
        t(0), input1, input3, input3, input4, input4, input4, t(5),
        input5, input6, t(10), t(11)
    ));
    // Note: as above, the returned stream is not assigned
    dataStream2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
        @Override
        public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
            return tuple4.f3;
        }
    });

    tsAssigner = TumblingEventTimeWindows.of(Time.minutes(5));
    dataStream1.join(dataStream2)
        .where(new Tuple4KeySelector())
        .equalTo(new Tuple4KeySelector())
        .window(tsAssigner)
        .trigger(EventTimeTrigger.create())
        .evictor(CountEvictor.of(2))
        .apply(new Tuple4JoinFunction())
        .addSink(new CollectTuple2Sink());

    env.execute();
    System.out.println(CollectTuple2Sink.VALUES);
}
// Sink: collects every joined result into a static list
private static class CollectTuple2Sink
        implements SinkFunction<Tuple2<String, Long>> {
    public static final List<Tuple2<String, Long>> VALUES = new ArrayList<>();

    @Override
    public synchronized void invoke(Tuple2<String, Long> value)
            throws Exception {
        VALUES.add(value);
    }
}
// Join function: takes the 2nd and 4th fields of the first tuple and converts the Tuple4 to a Tuple2
private static class Tuple4JoinFunction implements JoinFunction<Tuple4<String, String, Long, Long>, Tuple4<String, String, Long, Long>, Tuple2<String, Long>> {
    @Override
    public Tuple2<String, Long> join(Tuple4<String, String, Long, Long> tuple4, Tuple4<String, String, Long, Long> tuple42) throws Exception {
        return new Tuple2<>(tuple4.f1, tuple4.f3);
    }
}
// Key selector: selects the 2nd field of the Tuple4
private static class Tuple4KeySelector implements KeySelector<Tuple4<String, String, Long, Long>, String> {
    @Override
    public String getKey(Tuple4<String, String, Long, Long> tuple4) throws Exception {
        return tuple4.f1;
    }
}
// Source function: emits the given sequence of Tuple4 elements and watermarks (Long values)
private static class Tuple4Soruce
        implements SourceFunction<Tuple4<String, String, Long, Long>>,
        ResultTypeQueryable<Tuple4<String, String, Long, Long>> {
    private volatile boolean running = true;
    private Object[] testStream;
    private TypeInformation<Tuple4<String, String, Long, Long>> typeInformation =
        TypeInformation.of(new TypeHint<Tuple4<String, String, Long, Long>>() {});

    Tuple4Soruce(Object... eventsOrWatermarks) {
        this.testStream = eventsOrWatermarks;
    }

    @Override
    public void run(SourceContext<Tuple4<String, String, Long, Long>> ctx) throws Exception {
        for (int i = 0; (i < testStream.length) && running; i++) {
            if (testStream[i] instanceof Tuple4) {
                // Element: emit it with its 4th field as the event timestamp
                @SuppressWarnings("unchecked")
                Tuple4<String, String, Long, Long> tuple =
                    (Tuple4<String, String, Long, Long>) testStream[i];
                ctx.collectWithTimestamp(tuple, tuple.f3);
            } else if (testStream[i] instanceof Long) {
                // Long: emit it as a watermark
                Long ts = (Long) testStream[i];
                ctx.emitWatermark(new Watermark(ts));
            } else {
                throw new RuntimeException(testStream[i].toString());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<Tuple4<String, String, Long, Long>> getProducedType() {
        return typeInformation;
    }
}
// Utility function: timestamp in milliseconds for 2018-01-01 00:00 plus n minutes
public long t(int n) {
    return new DateTime(2018, 1, 1, 0, 0).plusMinutes(n).getMillis();
}
Logs for Run 1:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1914335182] with leader session id 2a8bf59e-01fa-4e67-892c-83b10dd65be1.
01/09/2020 00:50:16 Job execution switched to status RUNNING.
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED
01/09/2020 00:50:16 Job execution switched to status FINISHED.
[(val1,1514745060000), (val5,1514745360000), (val6,1514745420000)]
Logs for Run 2:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1448653751] with leader session id 291df2cb-96fd-4e3c-b46c-911d2ca11905.
01/09/2020 00:49:42 Job execution switched to status RUNNING.
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED
01/09/2020 00:49:42 Job execution switched to status FINISHED.
[(val1,1514745060000), (val3,1514745180000), (val4,1514745240000), (val5,1514745360000), (val6,1514745420000)]
The source function and other definitions were inspired by this tutorial. I also explored several ways of running a basic job, with and without evictors, from the official Flink docs. Without evictors, the output was as expected on every run. Once evictors came into the picture, the results became non-deterministic.
Flink Version 1.4.2
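To illustrate how a count-based evictor could make the result depend on arrival order, here is a minimal plain-Java sketch with no Flink dependency. It assumes, without this having been confirmed against the 1.4.2 internals, that the join keeps elements from both inputs in one per-key window buffer and that CountEvictor.of(2) retains only the last two elements of that buffer. The class EvictorOrderSketch and its Tagged, keepLast, and join members are hypothetical names made up for this illustration; they are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EvictorOrderSketch {

    // Hypothetical stand-in for one buffered element; side is 1 for dataStream1, 2 for dataStream2.
    public static final class Tagged {
        final int side;
        final String value;

        public Tagged(int side, String value) {
            this.side = side;
            this.value = value;
        }
    }

    // Assumed evictor behavior: drop elements from the front until at most maxCount remain.
    public static List<Tagged> keepLast(List<Tagged> buffer, int maxCount) {
        int from = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }

    // Pair every remaining side-1 element with every remaining side-2 element,
    // emitting the side-1 value once per pair.
    public static List<String> join(List<Tagged> buffer) {
        List<String> out = new ArrayList<>();
        for (Tagged left : buffer) {
            if (left.side != 1) {
                continue;
            }
            for (Tagged right : buffer) {
                if (right.side == 2) {
                    out.add(left.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Key "val3": one element from dataStream1, two from dataStream2.
        List<Tagged> stream1First = Arrays.asList(
            new Tagged(1, "val3"), new Tagged(2, "val3"), new Tagged(2, "val3"));
        List<Tagged> stream1Last = Arrays.asList(
            new Tagged(2, "val3"), new Tagged(2, "val3"), new Tagged(1, "val3"));

        // The dataStream1 element arrived first, so it is evicted and nothing joins.
        System.out.println(join(keepLast(stream1First, 2))); // prints []
        // The dataStream1 element arrived last, so it survives and joins once.
        System.out.println(join(keepLast(stream1Last, 2)));  // prints [val3]
    }
}
```

Under these assumptions, key val3 (one element from dataStream1, two from dataStream2) produces a result only when the dataStream1 element is not the first to arrive in the buffer. That would be consistent with val3 appearing in Run 2 but not in Run 1, while keys with exactly one element per stream (val1, val5, val6) appear in both runs.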