
I just ran into a very strange problem: I cannot get any results from a stream window join when using EventTime with a timestamp and watermark assigner.

I am using Kafka as my data stream source. I have tried both AscendingTimestampExtractor and a custom assigner implementing AssignerWithPeriodicWatermarks, as described in the Flink documentation. In my tests, no watermark is emitted and no joined results are generated. If I switch to ProcessingTime and TumblingProcessingTimeWindows without any timestamp assigners, I get correct results.

My code for the custom timestamp and watermark assigner looks like this:

FlinkKafkaConsumer09<String> myConsumer1 =
                new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

FlinkKafkaConsumer09<String> myConsumer2 =
                new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
...
public static class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<String> {
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        long timestamp = myFunctionToGetMillisFromString(element);
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - 1L);
    }
}
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});

And my code with AscendingTimestampExtractor looks like this:

FlinkKafkaConsumer09<String> myConsumer1 =
                new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});

FlinkKafkaConsumer09<String> myConsumer2 =
                new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});

Thanks for your help!

I don't see what the problem is, but here are a couple of suggestions: (1) in general you'd be better off extending BoundedOutOfOrdernessTimestampExtractor rather than partially reimplementing it; and (2) you could use the debugger in your IDE to see whether the stream has any watermarks and whether the window is being triggered, and if not, why not. – David Anderson
This looks like a problem with watermarks not moving forward, for whatever reason. As mentioned by @alpinegizmo, try extending BoundedOutOfOrdernessTimestampExtractor (or at least manually check whether your events are ordered properly). Also, try to advance your watermark according to how you want it to behave. – Biplob Biswas
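Both comments point to BoundedOutOfOrdernessTimestampExtractor. Below is a plain-Java model, with no Flink dependency, of the periodic-watermark logic that class roughly implements: it tracks the largest timestamp seen so far and emits that value minus a fixed out-of-orderness bound, never letting the watermark move backwards. The class name `BoundedOutOfOrdernessModel` and the 2000 ms bound are hypothetical, chosen only for illustration; in real Flink code you would extend the library class and implement just its `extractTimestamp(T element)` method.

```java
// Plain-Java model (no Flink dependency) of the periodic watermark logic that
// Flink's BoundedOutOfOrdernessTimestampExtractor roughly implements.
// The class name and the 2000 ms bound are hypothetical, for illustration only.
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMillis;
    // Highest timestamp seen so far; starts just above Long.MIN_VALUE so the
    // subtraction in getCurrentWatermark() cannot underflow before any data.
    private long currentMaxTimestamp;
    // Watermarks must never move backwards, so remember the last one emitted.
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Called once per element: return its timestamp and track the maximum.
    long extractTimestamp(long elementTimestamp) {
        if (elementTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = elementTimestamp;
        }
        return elementTimestamp;
    }

    // Called periodically: the watermark trails the maximum timestamp by the bound.
    long getCurrentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMillis;
        if (candidate >= lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel model = new BoundedOutOfOrdernessModel(2000L);
        // Before any element: Long.MIN_VALUE, i.e. no event-time progress yet.
        System.out.println(model.getCurrentWatermark());
        // 3000 arrives out of order after 5000; the watermark does not regress.
        for (long ts : new long[] {1000L, 5000L, 3000L, 9000L}) {
            model.extractTimestamp(ts);
            System.out.println("after " + ts + " -> watermark " + model.getCurrentWatermark());
        }
    }
}
```

Running `main` prints `-9223372036854775808` (no progress yet), then the watermarks -1000, 3000, 3000 and 7000. Starting `currentMaxTimestamp` at `Long.MIN_VALUE + maxOutOfOrdernessMillis` is what keeps the first subtraction from wrapping around.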
Thanks for the reply, @alpinegizmo. I tested extending BoundedOutOfOrdernessTimestampExtractor, both on the Kafka source and just before the window join, and there were no watermarks at all. I also tried logging the timestamps to the task manager logs using slf4j, and the number of timestamps is correct. I can't find a way to print out the watermarks for now. The data I send to Flink contains timestamps with different values. – Channing Zong
The Flink web UI can display watermarks, which can help with debugging. – David Anderson
@alpinegizmo, yes, I can see the watermarks changing in the web UI when I use IngestionTime and TumblingEventTimeWindows in window joins, but not when I use EventTime. The web UI always shows "No Watermark". – Channing Zong
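The "No Watermark" symptom fits how event time advances downstream of a join. In Flink, an operator's watermark is the minimum of the watermarks arriving on all of its input channels, which for a window join includes both joined streams and every parallel source instance; an event-time tumbling window fires once that minimum reaches the window's end minus one. So a single channel that never emits a watermark holds back every window. The sketch below is a simplified plain-Java model of that rule (it leaves out Flink's handling of idle channels); `MinWatermarkModel` and its methods are hypothetical names, not Flink API.

```java
import java.util.Arrays;

// Simplified plain-Java model (no Flink dependency) of how a window operator
// derives its event time from its upstream channels. MinWatermarkModel and its
// methods are hypothetical names, not Flink API; idle-channel handling is omitted.
class MinWatermarkModel {
    // Latest watermark seen on each upstream channel; Long.MIN_VALUE means
    // the channel has not emitted any watermark yet ("No Watermark").
    private final long[] channelWatermarks;

    MinWatermarkModel(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel; per-channel watermarks only move forward.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator's event time is the minimum over all channels.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // An event-time tumbling window [start, end) fires once the watermark reaches end - 1.
    boolean windowFires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        MinWatermarkModel join = new MinWatermarkModel(2); // channel 0: stream1, channel 1: stream2
        long windowEnd = 5000L;                            // window [0, 5000)
        join.onWatermark(0, 10_000L);
        System.out.println(join.windowFires(windowEnd)); // false: channel 1 has no watermark
        join.onWatermark(1, 4998L);
        System.out.println(join.windowFires(windowEnd)); // false: minimum is 4998
        join.onWatermark(1, 4999L);
        System.out.println(join.windowFires(windowEnd)); // true: minimum reached 4999
    }
}
```

Even though channel 0 is far ahead at 10000, nothing fires until the slowest channel catches up, which is why one silent input is enough to keep a join from producing output.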

2 Answers


I had the same problem; it's a rather silly error. I found the solution here:

When you write:

myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

It returns a new data stream rather than modifying the existing one, and you didn't store the result in a variable. So the bottom line is:

Store the result in a new variable and apply the join to that stream, which has the timestamps and watermarks assigned.
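The pitfall this answer describes is a general one: ignoring the return value of a method that returns a new object instead of modifying its receiver. In Flink's DataStream API, `DataStream.assignTimestampsAndWatermarks(...)` returns a new stream, so the join has to be built on that returned stream. The sketch below models this with a hypothetical `ModelStream` class; it is illustrative only and not Flink code.

```java
// Hypothetical stand-in for a DataStream-like API in which
// assignTimestampsAndWatermarks() returns a NEW stream and leaves the
// receiver unchanged. ModelStream is illustrative only, not Flink code.
final class ModelStream {
    private final boolean hasWatermarks;

    ModelStream(boolean hasWatermarks) {
        this.hasWatermarks = hasWatermarks;
    }

    // Returns a new stream carrying timestamps/watermarks; `this` is untouched.
    ModelStream assignTimestampsAndWatermarks() {
        return new ModelStream(true);
    }

    boolean hasWatermarks() {
        return hasWatermarks;
    }

    public static void main(String[] args) {
        ModelStream source = new ModelStream(false);

        // Wrong: the returned stream is discarded, so `source` still has no watermarks.
        source.assignTimestampsAndWatermarks();
        System.out.println(source.hasWatermarks()); // false

        // Right: keep the returned stream and build the join on it.
        ModelStream withWatermarks = source.assignTimestampsAndWatermarks();
        System.out.println(withWatermarks.hasWatermarks()); // true
    }
}
```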


myConsumer3 = myConsumer1.assign***
myConsumer4 = myConsumer2.assign***

Then use myConsumer3/myConsumer4, and it should work.