I'm reading simple JSON strings as input and keying the stream on two fields, A and B. However, keyBy appears to put records with different values of B into the same keyed stream, but only for particular combinations of A and B.
An example input record:
{
  "A": "352580084349898",
  "B": "1546559127",
  "C": "A"
}
This is the core logic of my Flink code:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.json.JSONObject; // assuming the org.json library

// Parse each incoming JSON string into a GenericDataObject.
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
        .map(new MapFunction<String, GenericDataObject>() {
            @Override
            public GenericDataObject map(String s) throws Exception {
                JSONObject jsonObject = new JSONObject(s);
                GenericDataObject genericDataObject = new GenericDataObject();
                genericDataObject.setA(jsonObject.getString("A"));
                genericDataObject.setB(jsonObject.getString("B"));
                genericDataObject.setC(jsonObject.getString("C"));
                return genericDataObject;
            }
        });

// Key by the composite (A, B) and pass each record through unchanged.
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
        .keyBy("A", "B")
        .map(new MapFunction<GenericDataObject, GenericDataObject>() {
            @Override
            public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                return genericDataObject;
            }
        });

testStream.print();
GenericDataObject is a POJO with three fields: A, B and C.
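The class itself isn't shown here, so the following is only a hypothetical reconstruction of what GenericDataObject might look like. It assumes a public class with a public no-argument constructor and getters/setters named after the fields A, B and C (Flink's POJO rules require this for field expressions such as keyBy("A", "B") to resolve), plus a toString() that reproduces the format seen in the console output.

```java
// Hypothetical reconstruction of the POJO; the original class is not shown.
public class GenericDataObject {

    // Field names must match the expressions passed to keyBy("A", "B").
    private String A;
    private String B;
    private String C;

    // Flink's POJO rules require a public no-argument constructor.
    public GenericDataObject() {}

    public String getA() { return A; }
    public void setA(String a) { this.A = a; }

    public String getB() { return B; }
    public void setB(String b) { this.B = b; }

    public String getC() { return C; }
    public void setC(String c) { this.C = c; }

    // Produces the format seen in the console output, e.g.
    // GenericDataObject{A='352580084349898', B='1546559127', C='A'}
    @Override
    public String toString() {
        return "GenericDataObject{A='" + A + "', B='" + B + "', C='" + C + "'}";
    }
}
```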
This is the console output for records with different values of field B:
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
Notice the first two output lines: even though they have different values of B, both are printed with the prefix 5>, so they seem to end up in the same keyed stream. I must be doing something fundamentally wrong here. Can someone please point me in the right direction?