There is a topic on a Kafka server. Our program reads this topic as a stream, assigns event timestamps, and then applies a window operation to the stream. But the program doesn't work. After debugging, it seems that the processWatermark method of WindowOperator is never executed. Here is my code:
DataStream<Tuple2<String, Long>> advertisement = env
    .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
    .map(new MapFunction<String, Tuple2<String, Long>>() {
        private static final long serialVersionUID = -6564495005753073342L;

        @Override
        public Tuple2<String, Long> map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
        }
    })
    .assignTimestamps(timestampExtractor);
advertisement
    .keyBy(keySelector)
    .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
    .apply(new WindowFunction<Tuple2<String, Long>, Integer, String, TimeWindow>() {
        private static final long serialVersionUID = 5151607280638477891L;

        @Override
        public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
            out.collect(Iterables.size(values));
        }
    })
    .print();
Why does this happen? If I add "keyBy(keySelector)" before "assignTimestamps(timestampExtractor)", the program works. Could anyone explain the reason?
StreamExecutionEnvironment to 1 (just to see if it's working). - Robert Metzger
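One possible reason the parallelism matters, offered as a hypothesis and not confirmed in this thread: an operator with several parallel inputs generally advances its event-time clock to the minimum watermark across those inputs, so a single parallel instance that never receives data (for example, when the topic has fewer partitions than the job's parallelism) could hold every window back. The plain-Java sketch below only models that rule and does not use Flink. The class `WatermarkSketch`, its methods, and the firing condition `watermark >= windowEnd - 1` are illustrative assumptions, not Flink API.

```java
import java.util.Arrays;

public class WatermarkSketch {

    // Assumed rule: the downstream event-time clock is the minimum of the
    // latest watermarks seen on each input channel. A channel that has not
    // emitted any watermark yet is modeled as Long.MIN_VALUE.
    public static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Assumed firing condition for a window ending at windowEnd (exclusive):
    // the combined watermark must have reached the window's last timestamp.
    public static boolean windowCanFire(long windowEnd, long[] channelWatermarks) {
        return combinedWatermark(channelWatermarks) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // end of the first 10-second tumbling window

        // Two upstream instances, one of which never received a record.
        long[] oneIdle = {12_000L, Long.MIN_VALUE};
        // Two upstream instances that both saw data past the window end.
        long[] allActive = {12_000L, 11_000L};

        System.out.println("one idle input:  " + windowCanFire(windowEnd, oneIdle));
        System.out.println("all inputs busy: " + windowCanFire(windowEnd, allActive));
    }
}
```

If this is what happens here, calling `env.setParallelism(1)` on the StreamExecutionEnvironment leaves only one input channel, so no idle instance remains to hold the watermark back. That would be consistent with the suggestion to try a parallelism of 1 as a test.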