I am trying to convert my application from Flink stream processing to Flink batch processing.

For the Flink DataStream version, I read strings from a predefined file that contains multiple JSON objects. I then use a flatMap to turn each JSON object into a Tuple3 and emit it to the collector. The first element is one field of the JSON object ("customer"), the second is another field ("id"), and the third is the full JSON object.

DataStream<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
                @Override
                public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
                    // mapper is assumed to be a Jackson ObjectMapper defined elsewhere
                    ObjectNode record = mapper.readValue(value, ObjectNode.class);
                    JsonNode customer = record.get("customer");
                    JsonNode deviceId = record.get("id");
                    // Emit (customer, id, full record) only when both fields are present
                    if (customer != null && deviceId != null) {
                        out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
                    }
                }
            });

Then I key the stream by the first and second elements of the tuple and apply a 5-second time window:

WindowedStream<Tuple3<String, Integer, ObjectNode>, Tuple, TimeWindow> combinedData = transformedSource
            .keyBy(0, 1)
            .timeWindow(Time.seconds(5));

For Flink batch processing, how do I do a keyBy on a DataSet? Is there an equivalent of keyBy in the DataSet API? This is the batch version so far:

DataSet<String> source = env.readTextFile("file:///path/to/file");


DataSet<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
                @Override
                public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
                    ObjectNode record = mapper.readValue(value, ObjectNode.class);
                    JsonNode customer = record.get("customer");
                    JsonNode deviceId = record.get("id");
                    if (customer != null && deviceId != null) {
                        out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
                    }
                }
            });

Answer


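groupBy is the DataSet API's equivalent of keyBy. For example, transformedSource.groupBy(0, 1) groups the tuples by their first and second fields and returns an UnsortedGrouping, which you can then process with reduce, reduceGroup, or an aggregation. Note that the DataSet API has no timeWindow: each group spans the whole bounded input rather than a 5-second window.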
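To make the grouping semantics concrete, here is a minimal plain-Java sketch (no Flink dependency) of what grouping on fields 0 and 1 produces: every record with the same (customer, id) pair ends up in the same group. The Rec class stands in for Tuple3<String, Integer, ObjectNode> and keeps the JSON as a raw String. Rec, groupByCustomerAndDevice, and the sample records are all hypothetical and exist only for illustration. In Flink itself you would write transformedSource.groupBy(0, 1).reduceGroup(...) instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink code) of the partitioning that
// DataSet.groupBy(0, 1) performs: equal (field0, field1) values share a group.
public class GroupBySketch {

    // Hypothetical stand-in for Tuple3<String, Integer, ObjectNode>;
    // the JSON payload is kept as a String so no Jackson dependency is needed.
    static final class Rec {
        final String customer;
        final int deviceId;
        final String json;

        Rec(String customer, int deviceId, String json) {
            this.customer = customer;
            this.deviceId = deviceId;
            this.json = json;
        }
    }

    // Groups records by the composite key (customer, deviceId).
    // LinkedHashMap keeps groups in the order their keys are first seen.
    static Map<List<Object>, List<Rec>> groupByCustomerAndDevice(List<Rec> records) {
        Map<List<Object>, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            // List.equals/hashCode compare element-wise, so this acts as a composite key
            List<Object> key = Arrays.asList(r.customer, r.deviceId);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
            new Rec("acme", 1, "{\"customer\":\"acme\",\"id\":1,\"v\":10}"),
            new Rec("acme", 2, "{\"customer\":\"acme\",\"id\":2,\"v\":20}"),
            new Rec("acme", 1, "{\"customer\":\"acme\",\"id\":1,\"v\":30}"));

        // One line per group, similar to a reduceGroup that emits a count per key
        for (Map.Entry<List<Object>, List<Rec>> e : groupByCustomerAndDevice(input).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " record(s)");
        }
    }
}
```

Running main prints "[acme, 1] -> 2 record(s)" followed by "[acme, 2] -> 1 record(s)". Unlike the streaming job, there is no time window here: each group collects every matching record in the input.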