2
votes

My Flink pipeline currently uses a POJO that contains some Lists and Maps (of Strings), along the lines of:

public class MyPojo {
    private List<String> myList = new ArrayList<>();
    private OtherPojo otherPojo = new OtherPojo();

    // getters + setters...
}

public class OtherPojo {
    private Map<String, String> myMap = new HashMap<>();

    // getters + setters...
}

For performance reasons, I want to avoid Kryo serialization, so I disabled the generic fallback with env.getConfig().disableGenericTypes(); as described in the Flink documentation.

Now, Flink complains about the lists:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    ...

What is the preferred way of serializing such simple lists and maps in Flink? Internally, these are currently ArrayList and HashMap, but other implementations would also be fine. There seems to be a class org.apache.flink.api.common.typeutils.base.ListSerializer in Flink, but I do not know how to use it.

2
Could you share your pojo? - Arvid Heise
@ArvidHeise I added the pojo to the question. - Alex Krauss

2 Answers

6
votes

Marius already explained the cause beautifully, although I don't see why Flink does not support your use case out of the box. Nevertheless, I'll add the solution that works right now.

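import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// create type info: declare the list and map fields explicitly so they are not treated as generic types
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class,
    ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
    ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));

// test it
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");

// env is the StreamExecutionEnvironment on which generic types were disabled
final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);

// round trip: read the bytes back into a new instance
DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
// only meaningful if MyPojo and OtherPojo implement equals(), and assertions are enabled (-ea)
assert myPojo.equals(clone);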

Note that the terrible access pattern in the test code is just for a quick and dirty demonstration.
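
To use this type information in an actual job, it has to be handed to Flink wherever the POJO type enters the pipeline or is produced by an operator; otherwise the type extractor analyzes the class on its own and treats the List field as a generic type again. The following is only a minimal sketch, assuming a Flink 1.x DataStream job. The class name ExplicitTypeInfoJob, the helper myPojoTypeInfo() and the identity map are made up for illustration, and a plain HashMap stands in for Guava's ImmutableMap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical job class, used only for this illustration.
public class ExplicitTypeInfoJob {

    public static class OtherPojo {
        private Map<String, String> myMap = new HashMap<>();
        public Map<String, String> getMyMap() { return myMap; }
        public void setMyMap(Map<String, String> myMap) { this.myMap = myMap; }
    }

    public static class MyPojo {
        private List<String> myList = new ArrayList<>();
        private OtherPojo otherPojo = new OtherPojo();
        public List<String> getMyList() { return myList; }
        public void setMyList(List<String> myList) { this.myList = myList; }
        public OtherPojo getOtherPojo() { return otherPojo; }
        public void setOtherPojo(OtherPojo otherPojo) { this.otherPojo = otherPojo; }
    }

    // Builds the same explicit type information as in the answer above.
    static TypeInformation<MyPojo> myPojoTypeInfo() {
        Map<String, TypeInformation<?>> otherFields = new HashMap<>();
        otherFields.put("myMap", Types.MAP(Types.STRING, Types.STRING));
        TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class, otherFields);

        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("myList", Types.LIST(Types.STRING));
        fields.put("otherPojo", otherPojoInfo);
        return Types.POJO(MyPojo.class, fields);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();

        TypeInformation<MyPojo> info = myPojoTypeInfo();

        env.fromCollection(Collections.singletonList(new MyPojo()), info) // source with explicit type info
            .map(p -> p)
            .returns(info) // override the type that would otherwise be extracted for the map output
            .print();

        env.execute("explicit-type-info");
    }
}
```

Every operator that emits MyPojo needs the same treatment, so it is convenient to keep the TypeInformation in one place and reuse it.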

4
votes

If you do:

env.getConfig().disableGenericTypes();

Flink raises an exception whenever it encounters a data type that would go through Kryo.

So in that case you have to provide the serializer for that type yourself. The TypeSerializer can be created from the type information: simply call typeInfo.createSerializer(config) on the TypeInformation object.

For generic types, you need to “capture” the generic type information via a TypeHint; in your case, for a list:

TypeInformation<List<Object>> info = TypeInformation.of(new TypeHint<List<Object>>(){});
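
The trailing {} matters: it creates an anonymous subclass, and unlike a plain object, a subclass keeps its generic superclass in the class metadata, so the type argument survives erasure. The following is only a sketch of that general Java idiom using the standard library; Hint and TypeTokenSketch are hypothetical names and this is not Flink's TypeHint implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenSketch {

    /** Hypothetical stand-in for a type hint; not Flink's TypeHint class. */
    public abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            // getClass() is the anonymous subclass; its generic superclass is e.g. Hint<List<String>>.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }

        public Type getCaptured() {
            return captured;
        }
    }

    public static void main(String[] args) {
        // The {} creates the anonymous subclass that carries List<String> at runtime.
        Type type = new Hint<List<String>>() {}.getCaptured();
        System.out.println(type.getTypeName());
    }
}
```

The captured Type still contains the element type, which a type-extraction step can inspect at runtime.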

See also the ListTypeInfo class.

More details here.