I am trying to convert my application from Flink stream processing (DataStream API) to Flink batch processing (DataSet API).
For the Flink data stream, I read strings from a pre-defined file containing multiple JSON objects and flatMap each JSON object into a Tuple3 (first element: one field from the JSON object, second element: another field from the JSON object, third element: the actual JSON object data).
DataStream<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
    @Override
    public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
        // mapper is a pre-configured Jackson ObjectMapper
        ObjectNode record = mapper.readValue(value, ObjectNode.class);
        JsonNode customer = record.get("customer");
        JsonNode deviceId = record.get("id");
        if (customer != null && deviceId != null) {
            out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
        }
    }
});
Then I key the stream by the first and second elements of the tuple and apply a time window.
WindowedStream<Tuple3<String, Integer, ObjectNode>, Tuple, TimeWindow> combinedData = transformedSource
    .keyBy(0, 1)
    .timeWindow(Time.seconds(5));
For Flink batch processing, how do I do a keyBy on a DataSet? Is there an equivalent of keyBy in the DataSet API?
DataSet<String> source = env.readTextFile("file:///path/to/file");
DataSet<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() {
    @Override
    public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception {
        // same parsing logic as in the streaming version
        ObjectNode record = mapper.readValue(value, ObjectNode.class);
        JsonNode customer = record.get("customer");
        JsonNode deviceId = record.get("id");
        if (customer != null && deviceId != null) {
            out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record));
        }
    }
});
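From reading the DataSet javadocs, I believe groupBy is the positional counterpart of keyBy, with the grouped reduce replacing the window operation. Here is a minimal self-contained sketch of what I have in mind; the sample tuples and the count-per-group reduce are just placeholders for my real data and aggregation, so please correct me if groupBy is not the right equivalent:

```java
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class GroupBySketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical sample data standing in for the parsed JSON tuples:
        // (customer, deviceId, raw record).
        DataSet<Tuple3<String, Integer, String>> data = env.fromElements(
                Tuple3.of("alice", 1, "r1"),
                Tuple3.of("alice", 1, "r2"),
                Tuple3.of("bob", 2, "r3"));

        // groupBy(0, 1) groups on (customer, deviceId), like keyBy(0, 1);
        // reduceGroup then sees all records of one group at once.
        DataSet<Tuple3<String, Integer, Long>> counts = data
                .groupBy(0, 1)
                .reduceGroup(new GroupReduceFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, Long>>() {
                    @Override
                    public void reduce(Iterable<Tuple3<String, Integer, String>> records,
                                       Collector<Tuple3<String, Integer, Long>> out) {
                        String customer = null;
                        Integer deviceId = null;
                        long count = 0;
                        for (Tuple3<String, Integer, String> r : records) {
                            customer = r.f0;
                            deviceId = r.f1;
                            count++;
                        }
                        // Emit one (customer, deviceId, count) tuple per group.
                        out.collect(Tuple3.of(customer, deviceId, count));
                    }
                });

        counts.print();
    }
}
```

My understanding is that there is no real equivalent of the 5-second time window in batch mode, since a DataSet is bounded; the grouped reduce simply processes each key's records in full.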