I have following code to count words from socketTextStream. Both cumulate word counts and time windowed word counts are needed. The program has an issue that cumulateCounts is always the same as windowed counts. Why this issue occurs? What is the correct way to calculate cumulate counts base on windowed counts?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.window(Time.of(5, TimeUnit.SECONDS))
.groupBy(0).sum(1)
.flatten();
counts.print();
counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
String word = value.f0;
Integer delta_count = value.f1;
Integer count = cumulateCounts.get(word);
if (count == null)
count = 0;
count = count + delta_count;
cumulateCounts.put(word, count);
System.out.println("(" + word + "," + count.toString() + ")");
}
});