
I have a class that fulfills the requirements to be treated as a POJO, and it is the main transport class in my streaming job (it only contains primitives and a Map<String, String>). I added a new String field with the corresponding getter and setter, but if I stop the job that uses the previous class with a savepoint and then try to restart from that savepoint with the new class, I get this exception:

 java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3b2dbb810ac7d55949cb205a3075facc_(8/8) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
        ... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 8 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
...
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        ... 12 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.get(ArrayList.java:433)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 23 common frames omitted

For some reason, Flink is falling back to Kryo.
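To see why a Kryo-serialized savepoint can break when a field is added, here is a toy model in plain Java. Kryo's default FieldSerializer writes field values without field names, so the reading class has to have exactly the same field list as the writing class. The class `PositionalEncoding` is made up for illustration, and placing the new field alphabetically between the existing ones is an assumption of the model, not a description of Kryo's internals. When the layouts disagree, the reader decodes unrelated bytes as the next field, which is consistent with the nonsensical reference index (172199998) in the trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a schema-less encoding (NOT Kryo's actual implementation):
// only field values are written, in a fixed field order, with no field names.
public class PositionalEncoding {

    public static byte[] write(Map<String, String> obj, List<String> fieldOrder) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String field : fieldOrder) {
                out.writeUTF(obj.get(field)); // value only; the field name is not recorded
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static Map<String, String> read(byte[] data, List<String> fieldOrder) {
        Map<String, String> obj = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (String field : fieldOrder) {
                // The reader trusts its own field list; nothing in the bytes says which field comes next.
                obj.put(field, in.readUTF());
            }
        } catch (EOFException e) {
            throw new IllegalStateException("ran out of data; fields read so far: " + obj, e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return obj;
    }

    public static void main(String[] args) {
        Map<String, String> old = new LinkedHashMap<>();
        old.put("id", "42");
        old.put("name", "alice");
        // "Savepoint" written by the old class, which has two fields.
        byte[] saved = write(old, Arrays.asList("id", "name"));

        // Same field list: prints {id=42, name=alice}
        System.out.println(read(saved, Arrays.asList("id", "name")));

        // New class: the added field sits between the existing ones, so "alice" lands in "label".
        try {
            read(saved, Arrays.asList("id", "label", "name"));
        } catch (IllegalStateException e) {
            // prints: ran out of data; fields read so far: {id=42, label=alice}
            System.out.println(e.getMessage());
        }
    }
}
```

Kryo's own documentation points to CompatibleFieldSerializer or TaggedFieldSerializer for classes whose fields change, precisely because the default serializer has no per-field metadata.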

I am using Flink 1.9.3, and according to the documentation this change should be supported.
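For contrast, the Flink documentation on state schema evolution says POJO types may add and remove fields, with new fields initialized to the Java default for their type. The toy model below shows why that is possible when the writer's field names are kept alongside the data: the reader can match values by name. `NameKeyedEncoding` is a hypothetical illustration, not Flink's PojoSerializer; for simplicity it stores the names with each record, whereas Flink roughly keeps the writer's layout in the serializer snapshot stored in the savepoint.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Flink's PojoSerializer): the writer records its field names,
// and the reader matches values by name instead of by position.
public class NameKeyedEncoding {

    public static byte[] write(Map<String, String> obj, List<String> writerFields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(writerFields.size());
            for (String field : writerFields) {
                out.writeUTF(field);          // the writer's "schema"
            }
            for (String field : writerFields) {
                out.writeUTF(obj.get(field)); // the values, in the writer's order
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static Map<String, String> read(byte[] data, List<String> readerFields) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            List<String> writerFields = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                writerFields.add(in.readUTF());
            }
            Map<String, String> written = new HashMap<>();
            for (String field : writerFields) {
                written.put(field, in.readUTF());
            }
            Map<String, String> obj = new LinkedHashMap<>();
            for (String field : readerFields) {
                // Added fields get null (the Java default); fields the reader no longer has are dropped.
                obj.put(field, written.get(field));
            }
            return obj;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> old = new LinkedHashMap<>();
        old.put("id", "42");
        old.put("name", "alice");
        byte[] saved = write(old, Arrays.asList("id", "name"));
        // prints {id=42, label=null, name=alice}
        System.out.println(read(saved, Arrays.asList("id", "label", "name")));
    }
}
```

This only helps if the object is actually serialized by a name-aware serializer; if the whole object goes through Kryo, as the `KryoSerializer.deserialize` frame in the trace suggests, the positional behavior applies instead.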


Based on David's answer, I'm trying to see if I can migrate my state on the fly before adding the new field to the class. I've added the @TypeInfo annotation and its factory to MyPojo, and I'm trying to migrate state like this:

lsd = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);

// migration: copy the legacy list state into the new descriptor, then clear the old state

TypeToken<LabeledClassWithTimestamp<String>> typeToken = new TypeToken<LabeledClassWithTimestamp<String>>() {};
ListStateDescriptor<LabeledClassWithTimestamp<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        new KryoSerializer<>((Class<LabeledClassWithTimestamp<String>>) typeToken.getRawType(), runtimeContext.getExecutionConfig())
);

ListState<LabeledClassWithTimestamp<String>> legacyState = runtimeContext.getListState(legacyLSD);
try {
    List<MyPojo> newState = new ArrayList<>();
    legacyState.get().forEach(o -> newState.add((MyPojo) o));

    if (!newState.isEmpty()) {
        runtimeContext.getListState(lsd).update(newState);
        legacyState.clear();
    }
} catch (Exception e) {
    LOG.error("Could not migrate state:", e);
}

However, if I restore the job from a previous savepoint using the new jar, Flink throws a StateMigrationException in a different operator:

2020-11-08T12:57:59.369Z INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph:1511 [flink-akka.actor.default-dispatcher-17] window-operator (1/8) (uid) switched from RUNNING to FAILED. org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.

That operator's state only holds integers:

public class SlidingWindowProcessFunction extends ProcessWindowFunction<MyPojo, MyOutput, String, TimeWindow> {
    private static final long serialVersionUID = 1L;
    
    private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
            "window-state", Integer.class);
    
    ...
    
    @Override
    public void process(String key, Context context, Iterable<MyPojo> iterable, Collector<MyOutput> collector) {
        ...
        for (Integer hash : context.windowState().getListState(LSD).get()) {
            alreadyProcessedHashes.add(hash);
        }
        ...
    }
}

Is MyPojo's serializer relevant for this other operator's state even though the POJO class is not used directly in its managed state?

Just to confirm: logs from the workflow with the previous version of your POJO do NOT contain any INFO messages that look like Class <your class> cannot be used as a POJO type...? (kkrugler)
@kkrugler That's right, I did verify that. (Alexis)

2 Answers


Flink will accept a class as a valid POJO type even if it contains a field (such as a List or Map) that it cannot serialize without falling back to Kryo. In such cases the INFO log message Class <your class> cannot be used as a POJO type ... will not appear, but the class will not be fully supported for state migration.

Flink can handle List and Map types in POJO fields, but doesn't do so automatically (to avoid breaking backwards compatibility).

You can get this working cleanly by annotating your class with @TypeInfo and implementing a TypeInfoFactory<T> for it that specifies the appropriate org.apache.flink.api.common.typeinfo.Types entry for each field, including org.apache.flink.api.common.typeinfo.Types#MAP for the map field.

That might look something like this:

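import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Tells Flink to use the factory below instead of its default type extraction.
@TypeInfo(MyPojo.MyPojoTypeInfoFactory.class)
public class MyPojo {
  private String data;
  private HashMap<String, String> attributes;

  // getters and setters omitted for brevity

  public static class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {
    @Override
    public TypeInformation<MyPojo> createTypeInfo(
        Type t, Map<String, TypeInformation<?>> genericParameters) {
      // Declare every field explicitly so the map is handled by Flink's MAP type, not Kryo.
      Map<String, TypeInformation<?>> fields = new HashMap<>();
      fields.put("data", Types.STRING);
      fields.put("attributes", Types.MAP(Types.STRING, Types.STRING));
      return Types.POJO(MyPojo.class, fields);
    }
  }
}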

Note that a Types.MAP field must not be null. Null keys are not allowed in the map, but null values are okay.
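One way to respect those constraints is to normalize the map in the POJO's setter. The helper below is a hypothetical sketch (not part of Flink): it turns a null map into an empty one, keeps null values, and rejects null keys early, so the problem surfaces where the data is created rather than during serialization.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Flink API): returns a copy that never is null
// and never contains a null key; null values are kept.
public final class MapFieldGuard {
    private MapFieldGuard() {}

    public static HashMap<String, String> sanitize(Map<String, String> input) {
        HashMap<String, String> copy = new HashMap<>();
        if (input == null) {
            return copy; // store an empty map instead of null
        }
        for (Map.Entry<String, String> entry : input.entrySet()) {
            if (entry.getKey() == null) {
                throw new IllegalArgumentException("null map keys are not supported");
            }
            copy.put(entry.getKey(), entry.getValue()); // null values are fine
        }
        return copy;
    }
}
```

In MyPojo, the setter could then be written as `this.attributes = MapFieldGuard.sanitize(attributes);`. Whether to reject or silently drop null keys is a design choice; failing fast makes bad input visible.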


I haven't been able to work out what's going on internally in Flink, but I found a way to achieve the migration in two upgrades, although I don't fully understand why it works.

In the first upgrade I don't add any new field to the POJO class; I only add the type information David suggested. The critical point in my case was that, because my initial job could no longer be modified and its POJO didn't have the annotations, the type information for the Map in the TypeInfoFactory had to point to Kryo:

put("mapField", Types.GENERIC(Map.class));

I then added a new state descriptor without modifying the old one (my old state was defined in terms of an interface):

ListStateDescriptor<InterfaceWithGeneric<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        TypeInformation.of(new TypeHint<InterfaceWithGeneric<String>>() {})
);

ListStateDescriptor<MyPojo> newLSD = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);

With that I can read from the old descriptor and initialize the new one as needed.
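The "read from the old descriptor, initialize the new one" step can be sketched with plain collections so it runs without a Flink cluster. In a real job, `legacy` would be what the legacy ListState's get() returns, `current` the contents of the new state, and the result would be passed to update() on the new ListState. `StateMigration`, `migrate`, and the conversion function are hypothetical names; using an explicit conversion instead of a cast such as `(MyPojo) o` makes the old-to-new mapping visible and avoids a ClassCastException if the legacy objects are not MyPojo instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a one-time state migration step (not a Flink API).
public final class StateMigration {
    private StateMigration() {}

    public static <L, N> List<N> migrate(Iterable<L> legacy, List<N> current, Function<L, N> convert) {
        if (!current.isEmpty()) {
            return current; // already migrated: leave the new state untouched
        }
        List<N> migrated = new ArrayList<>();
        if (legacy != null) { // defensive: treat missing legacy state as empty
            for (L item : legacy) {
                migrated.add(convert.apply(item)); // explicit conversion instead of a blind cast
            }
        }
        return migrated;
    }
}
```

Only clearing the legacy state after the new state has been written, as in the question's code, keeps the migration safe to retry if the job fails in between.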

In the second upgrade I can remove the old descriptor and add the new field to the POJO, updating the TypeInfoFactory as well. Serialization of the existing map will have to continue using Kryo, as I couldn't find a way to modify that.

Adding the annotations, the new field, and the new descriptor in a single upgrade didn't work for me. I also couldn't reuse the old descriptor: the first upgrade would work fine, but adding a new field in the second upgrade threw exceptions again. I don't know why some of the exceptions referenced completely unrelated operators, but it seems to be erroneous reporting from the state restore backend.