I just ran into a very strange problem: I cannot get any results from a stream window join when using event time with a timestamp and watermark assigner.
I am using Kafka as my data stream source and tried both AscendingTimestampExtractor and a custom assigner that implements AssignerWithPeriodicWatermarks, as described in the Flink documentation. In my tests, no watermark is emitted and no joined result is produced. If I switch to processing time and TumblingProcessingTimeWindows without any timestamp assigner, I get correct results.
My code for the custom timestamp and watermark assigner looks like this:
FlinkKafkaConsumer09<String> myConsumer1 =
    new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
FlinkKafkaConsumer09<String> myConsumer2 =
    new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
...
public static class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<String> {
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        long timestamp = myFunctionToGetMillisFromString(element);
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - 1L);
    }
}
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});
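To make my expectation explicit, here is a minimal plain-Java sketch (no Flink dependency) of how I understand the assigner above to interact with a tumbling event-time window. The class and method names are hypothetical and exist only for illustration. It assumes windows without an offset, non-negative timestamps, and that a window fires once the watermark reaches the window end minus one millisecond. It only approximates what the runtime does.

```java
// Illustrative sketch only: these names are hypothetical and not part of Flink.
final class WindowFiringSketch {

    // Start of the tumbling window that contains `timestamp`
    // (assumes no window offset and a non-negative timestamp).
    public static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    // Same bookkeeping as MyTimestampsAndWatermarks: the watermark
    // trails the largest timestamp seen so far by one millisecond.
    public static long watermarkAfter(long[] timestamps) {
        long currentMaxTimestamp = 0L;
        for (long t : timestamps) {
            currentMaxTimestamp = Math.max(t, currentMaxTimestamp);
        }
        return currentMaxTimestamp - 1L;
    }

    // Assumption: an event-time window may fire once the watermark
    // has reached the last timestamp that belongs to it (end - 1).
    public static boolean windowCanFire(long windowStart, long sizeMillis, long watermark) {
        return watermark >= windowStart + sizeMillis - 1L;
    }

    public static void main(String[] args) {
        long size = 5_000L; // a 5-second window, chosen arbitrarily
        long[] seen = {10_200L, 12_500L, 14_900L};
        long start = windowStart(seen[0], size);
        long watermark = watermarkAfter(seen);
        System.out.println("window start = " + start);
        System.out.println("watermark    = " + watermark);
        System.out.println("can fire     = " + windowCanFire(start, size, watermark));
    }
}
```

If this model is right, a window is only evaluated after an element with a timestamp at or beyond the window end has been seen, so a topic that stops receiving data would leave its last window open.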
And my code using AscendingTimestampExtractor looks like this:
FlinkKafkaConsumer09<String> myConsumer1 =
    new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
FlinkKafkaConsumer09<String> myConsumer2 =
    new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});
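One thing I am unsure about is how the two inputs interact. As far as I understand, an operator with two inputs tracks event time as the minimum of the watermarks it has received from each input, starting from Long.MIN_VALUE. The plain-Java sketch below (hypothetical names, no Flink dependency, only an approximation of the runtime) shows what that would mean for the join: if either topic never advances its watermark, no window can fire, however far the other one has progressed.

```java
// Illustrative sketch only: these names are hypothetical and not part of Flink.
final class JoinWatermarkSketch {

    // Assumption: a two-input operator's event-time clock is the
    // smaller of the latest watermarks seen on its two inputs.
    public static long combinedWatermark(long watermark1, long watermark2) {
        return Math.min(watermark1, watermark2);
    }

    // Assumption: a window [start, start + size) may fire once the
    // combined watermark has reached start + size - 1.
    public static boolean windowCanFire(long windowStart, long sizeMillis, long watermark) {
        return watermark >= windowStart + sizeMillis - 1L;
    }

    public static void main(String[] args) {
        long size = 5_000L;        // arbitrary 5-second window
        long windowStart = 10_000L;

        // Input 1 has progressed well past the window end,
        // input 2 has not produced any watermark yet.
        long stalled = combinedWatermark(20_000L, Long.MIN_VALUE);
        System.out.println("one input stalled, can fire = "
            + windowCanFire(windowStart, size, stalled));

        // Both inputs have passed the window end.
        long both = combinedWatermark(20_000L, 15_500L);
        System.out.println("both inputs advanced, can fire = "
            + windowCanFire(windowStart, size, both));
    }
}
```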
Thanks for any help!