0
votes

I have a topic on a Kafka server. My program reads this topic as a stream, assigns event timestamps, and then applies a window operation to the stream. However, the program doesn't work. After debugging, it seems that the processWatermark method of WindowOperator is never executed. Here is my code:

    DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).assignTimestamps(timestampExtractor);

    advertisement
            .keyBy(keySelector)
            .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
            .apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
                private static final long serialVersionUID = 5151607280638477891L;
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
                    out.collect(Iterables.size(values));
                }
            }).print();

Why does this happen? If I add "keyBy(keySelector)" before "assignTimestamps(timestampExtractor)", the program works. Could anyone explain the reason?

1
How many partitions does your topic "advertisement" have? What's the parallelism of the Flink program? - Robert Metzger
The topic has only one partition and I don't set parallelism in the program. - Jun
I'm not sure, but I think there is a known bug in Flink with watermarks and Kafka sources when the parallelism is higher than the number of partitions. Can you set the parallelism on the StreamExecutionEnvironment to 1 (just to see if it's working)? - Robert Metzger
Thanks for your help. It is indeed caused by the bug you mentioned. Could you post this as an answer so I can mark it as the correct answer? Thanks - Jun

1 Answer

2
votes

You are affected by a known bug in Flink, FLINK-3121: Watermark forwarding does not work for sources not producing any data.

The problem is that there are more FlinkKafkaConsumer instances running (most likely as many as you have CPU cores, say 4) than you have partitions (1). Only one of the Kafka consumers is emitting watermarks; the other consumers are idling.

The window operator is not aware of that and keeps waiting for watermarks to arrive from all consumers. That's why the windows never trigger.
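
To make the reasoning concrete, here is a minimal plain-Java sketch of the behaviour, not Flink's actual implementation. It assumes the downstream operator only advances its own watermark to the minimum of the watermarks seen on all input channels. The names WatermarkDemo and MinWatermarkTracker are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;

public class WatermarkDemo {

    /** Hypothetical, simplified stand-in for an operator that tracks one watermark per input channel. */
    static final class MinWatermarkTracker {
        private final long[] channelWatermarks;
        private long currentWatermark = Long.MIN_VALUE;

        MinWatermarkTracker(int numChannels) {
            channelWatermarks = new long[numChannels];
            // A channel that has never sent a watermark counts as "minus infinity".
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        /** Records a watermark from one channel; returns true if the combined watermark advanced. */
        boolean onWatermark(int channel, long timestamp) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > currentWatermark) {
                currentWatermark = min;
                return true;
            }
            return false;
        }

        long currentWatermark() {
            return currentWatermark;
        }
    }

    public static void main(String[] args) {
        // Four source subtasks, but only channel 0 reads the single partition.
        MinWatermarkTracker four = new MinWatermarkTracker(4);
        System.out.println("parallelism 4 advanced: " + four.onWatermark(0, 10_000L));

        // With a single source subtask there is no idle channel holding the minimum back.
        MinWatermarkTracker one = new MinWatermarkTracker(1);
        System.out.println("parallelism 1 advanced: " + one.onWatermark(0, 10_000L));
    }
}
```

In this model the four-channel case never advances, because three channels stay at their initial value, which matches the windows never triggering. The one-channel case advances right away, which is consistent with the program working once the parallelism on the StreamExecutionEnvironment is set to 1.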