I'm pretty new to Flink. I have this code that maps, groups, and sums input JSON.
It's very similar to the word count example.
I expected the output to be (vacant,1) (occupied,2).
But for some reason I'm getting (occupied,1) (vacant,1) (occupied,2).
public static void main(String[] args) throws Exception {
    // Sample input: one JSON object mapping port names to their status.
    String s = "{\n" +
            " \"Port_128\": \"occupied\",\n" +
            " \"Port_129\": \"occupied\",\n" +
            " \"Port_120\": \"vacant\"\n" +
            "\n" +
            "}";
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> in = env.fromElements(s);
    SingleOutputStreamOperator<Tuple2<String, Integer>> t =
            in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
                        throws Exception {
                    ObjectMapper mapper = new ObjectMapper();
                    JsonNode node = mapper.readTree(s);
                    // Emit (status, 1) for every value in the JSON object.
                    node.elements().forEachRemaining(v -> {
                        collector.collect(new Tuple2<>(v.textValue(), 1));
                    });
                }
            }).keyBy(0).sum(1); // group by status (field 0), sum the counts (field 1)
    t.print();
    env.execute();
}
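To make the difference between the two outputs concrete, here is a plain-Java sketch with no Flink involved. The class and method names are made up for illustration. It contrasts the final per-key counts I expected with a per-record running count, which is what the output I'm seeing looks like. It only models the arithmetic; in the real job the order of records for different keys can differ, presumably because the print sink runs in parallel.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of Flink: it only mimics the two behaviours.
public class RunningCountSketch {

    // Emits one updated (key,count) record for every input value,
    // the way a rolling sum on a keyed stream would.
    static List<String> runningCounts(List<String> values) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String v : values) {
            int updated = counts.merge(v, 1, Integer::sum);
            emitted.add("(" + v + "," + updated + ")");
        }
        return emitted;
    }

    // Emits only the final count per key, which is what I expected to see.
    static Map<String, Integer> finalCounts(List<String> values) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same statuses as in the JSON above, in document order.
        List<String> values = List.of("occupied", "occupied", "vacant");
        System.out.println(runningCounts(values)); // [(occupied,1), (occupied,2), (vacant,1)]
        System.out.println(finalCounts(values));   // {occupied=2, vacant=1}
    }
}
```

The running version produces the same three records I get from the job, just in a different order, so it looks like the sum is being emitted after every element rather than once at the end.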