I am using Flink and have a stream of JSON strings arriving in my system with dynamically changing fields and nested fields. So I can't mock and convert this incoming JSON as a static POJO and I have to rely on a Map instead.
My first transformation is to convert the JSON string stream into a Map object stream using GSON parsing and then I wrap the map in a DTO called Data.
(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);
Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
Problem arises when right after this transformation processing, I attempt to feed the resultant stream into my custom Flink sink. The invoke function does not get called in the sink. The sink works however, if I change from this Map containing DTO to a primitive or a regular DTO with no Map.
My DTO looks like this:
public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal
// getters and setters
// constructors, empty and with fields
I have tried the two following solutions:
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class;
env.getConfig().disableGenericTypes();
Any expert advise I could use in this situation?
FilterFunction
that just logs something about the map? Does anything get logged? – kkrugler