I am trying to use Jackson library to read a String from Kafka topic and perform a join from another stream.
Here is a sample code with two streams of data. I want to perform join operation on these to message stream.
Say for example, the incoming streams are:
messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}
Join criteria is messageStream1."A" = messageStream2."B". How do I implement this in Flink?
DataStream 1:
DataStream<String> messageStream1 = env.addSource(
new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));
messageStream1.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String value) throws Exception {
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
try {
JsonNode rootNode = mapper.readTree(value);
Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String,JsonNode> field = fieldsIterator.next();
System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
}
return rootNode;
}catch (java.io.IOException ex){
ex.printStackTrace();
return null;
}
}
});
DataStream 2:
DataStream<String> messageStream2 = env.addSource(
new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));
messageStream2.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String value) throws Exception {
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
try {
JsonNode rootNode = mapper.readTree(value);
Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String,JsonNode> field = fieldsIterator.next();
System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
}
return rootNode;
}catch (java.io.IOException ex){
ex.printStackTrace();
return null;
}
}
});