I have a class that fulfills the requirements to be treated as a POJO, and it is the main transport class in my streaming job (it only contains primitives and a `Map<String, String>`).
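For reference, a class of the kind described might look like the following minimal sketch. The question does not show the class, so all field names here are hypothetical. The holder class `TransportTypes` exists only so the sketch compiles as a single file. The shape follows the POJO rules in the Flink documentation:

- a public class (static if nested);
- a public no-argument constructor;
- every field either public or exposed through a getter and setter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder so the sketch fits in one file; in a real job MyPojo
// would normally be a top-level public class in its own MyPojo.java.
final class TransportTypes {

    // Follows the documented POJO rules: public (and static, since it is nested),
    // public no-arg constructor, private fields exposed via getter/setter pairs.
    public static class MyPojo {
        private long timestamp;                               // primitive (name assumed)
        private int count;                                    // primitive (name assumed)
        private Map<String, String> labels = new HashMap<>(); // the Map field

        public MyPojo() {}

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }

        public Map<String, String> getLabels() { return labels; }
        public void setLabels(Map<String, String> labels) { this.labels = labels; }
    }
}
```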
I have added a new `String` field and the corresponding getter and setter. However, I stop the job that uses the previous version of the class with a savepoint. When I then try to restart from that savepoint with the new class, I get this exception:
```
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3b2dbb810ac7d55949cb205a3075facc_(8/8) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
	... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 8 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
	...
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
	at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
	at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
	at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 12 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Index: 172199998, Size: 13
	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
	at java.util.ArrayList.get(ArrayList.java:433)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
	at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
	... 23 common frames omitted
```
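The innermost frames in the trace come from Kryo's `FieldSerializer`, which writes an object's fields in a fixed order without field names or tags. Kryo's documentation notes that it does not support adding fields without invalidating previously serialized bytes; the `CompatibleFieldSerializer` and `TaggedFieldSerializer` variants exist for that case. The toy below uses plain `java.io` rather than Kryo, and its class and method names are hypothetical. It sketches how a positional layout misreads old bytes once a field is inserted: the new reader consumes part of one value as another and then runs past the end of the data. A misaligned read of this kind is one plausible way to end up with a bogus reference index like the one above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Toy positional format (plain java.io, not Kryo): fields are written in a
// fixed order with no names or tags, so the reader must assume the same layout.
final class PositionalLayoutDemo {

    // "Old" class layout: int count, then long timestamp (12 bytes total).
    static byte[] writeOld(int count, long timestamp) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(count);
            out.writeLong(timestamp);
        }
        return bytes.toByteArray();
    }

    // Reader that matches the old writer, so the value round-trips.
    static long readOldTimestamp(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readInt();
            return in.readLong();
        }
    }

    // Reader for a "new" layout with a String field between the two old ones.
    // It misinterprets the old bytes; returns null if the input runs out.
    static String readWithNewLayout(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            String source = in.readUTF();   // consumes bytes that belong to the old timestamp
            long timestamp = in.readLong(); // fewer than 8 bytes are left at this point
            return count + "/" + source + "/" + timestamp;
        } catch (EOFException e) {
            return null;
        }
    }
}
```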
For some reason, Flink is falling back to Kryo. I am using Flink 1.9.3, and according to the documentation, adding a field to a POJO is a supported form of state schema evolution.
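When Flink's type analysis rejects a class as a POJO, it logs a message containing "cannot be used as a POJO type" and handles the class as a generic type serialized with Kryo. Calling `getConfig().disableGenericTypes()` on the execution environment makes the job fail instead of silently falling back.

As an offline sanity check, the sketch below approximates the documented POJO rules with plain reflection. `PojoRuleCheck` and its sample classes are hypothetical. It is not Flink's `TypeExtractor` and may disagree with it in edge cases; for example, it does not check whether each field's type has a registered serializer.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Rough, hypothetical approximation of Flink's documented POJO rules.
// It is NOT Flink's TypeExtractor and may disagree with it in edge cases.
final class PojoRuleCheck {

    // Returns human-readable problems; an empty list means this check passed.
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) {
            problems.add("class is a non-static inner class");
        }
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            if (!Modifier.isPublic(ctor.getModifiers())) {
                problems.add("no-arg constructor is not public");
            }
        } catch (NoSuchMethodException e) {
            problems.add("no no-arg constructor");
        }
        // Fields of the class and all of its superclasses are checked.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                    continue;
                }
                if (Modifier.isPublic(fm)) {
                    if (Modifier.isFinal(fm)) {
                        problems.add("public field '" + f.getName() + "' is final");
                    }
                } else if (!hasGetterAndSetter(clazz, f)) {
                    problems.add("field '" + f.getName() + "' lacks a public getter/setter pair");
                }
            }
        }
        return problems;
    }

    // Java-beans style lookup: getX()/isX() (boolean only) and setX(type).
    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {
            if (m.getParameterCount() == 0 && m.getReturnType().equals(f.getType())
                    && (m.getName().equals("get" + cap)
                        || (f.getType() == boolean.class && m.getName().equals("is" + cap)))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && m.getParameterTypes()[0].equals(f.getType())
                    && m.getName().equals("set" + cap)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    // Sample classes used to exercise the check.
    public static class GoodPojo {
        private String source;
        public GoodPojo() {}
        public String getSource() { return source; }
        public void setSource(String source) { this.source = source; }
    }

    public static class MissingSetter {
        private String source;
        public String getSource() { return source; }
    }
}
```

Passing the real class to `violations(...)` would list whatever this approximation flags, such as a field whose getter or setter name does not match.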
Based on David's answer, I'm trying to see if I can migrate my state on the fly before adding the new field to the class.
I've added the `@TypeInfo` annotation and its factory to `MyPojo`, and I'm trying to migrate the state like this:
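```java
// TypeToken is assumed to be Guava's; the original does not say which one.
import com.google.common.reflect.TypeToken;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import java.util.ArrayList;
import java.util.List;

// New state, registered under a new name
lsd = new ListStateDescriptor<>(
        "newName",
        MyPojo.class
);

// migration: read the legacy state with an explicit KryoSerializer
TypeToken<LabeledClassWithTimestamp<String>> typeToken = new TypeToken<LabeledClassWithTimestamp<String>>() {};
ListStateDescriptor<LabeledClassWithTimestamp<String>> legacyLSD = new ListStateDescriptor<>(
        "oldName",
        new KryoSerializer<>((Class<LabeledClassWithTimestamp<String>>) typeToken.getRawType(), runtimeContext.getExecutionConfig())
);
ListState<LabeledClassWithTimestamp<String>> legacyState = runtimeContext.getListState(legacyLSD);
try {
    List<MyPojo> newState = new ArrayList<>();
    Iterable<LabeledClassWithTimestamp<String>> legacyValues = legacyState.get();
    // get() may return null rather than an empty Iterable when the state is empty
    if (legacyValues != null) {
        legacyValues.forEach(o -> newState.add((MyPojo) o));
    }
    if (!newState.isEmpty()) {
        runtimeContext.getListState(lsd).update(newState);
        legacyState.clear();
    }
} catch (Exception e) {
    LOG.error("Could not migrate state:", e);
}
```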
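Without the Flink types, the migration step in the snippet above has four parts:

1. Read every legacy entry.
2. Convert each entry.
3. Write the new state, but only if there is something to write.
4. Clear the legacy state.

The sketch below shows that sequence with plain `List`s standing in for `ListState`. `StateMigrationSketch` and `migrate` are hypothetical names. The `convert` function replaces the `(MyPojo) o` cast, whose validity depends on how `MyPojo` relates to `LabeledClassWithTimestamp`, and the question does not show that relationship.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: plain Lists stand in for Flink's ListState so the
// migrate-then-clear steps can be exercised in isolation.
final class StateMigrationSketch {

    // Converts every legacy entry into a temporary list first. Only if all
    // conversions succeed and there is something to move does it replace the
    // target contents and clear the legacy list. Returns true if it migrated.
    static <O, N> boolean migrate(List<O> legacy, List<N> target, Function<O, N> convert) {
        List<N> converted = new ArrayList<>();
        for (O entry : legacy) {
            converted.add(convert.apply(entry)); // a throwing conversion leaves both lists untouched
        }
        if (converted.isEmpty()) {
            return false;
        }
        target.clear();           // like ListState.update(...), which replaces the contents
        target.addAll(converted);
        legacy.clear();           // like legacyState.clear()
        return true;
    }
}
```

Converting into a temporary list before writing means a failed conversion leaves both states as they were. Because the legacy list is cleared after the copy, a second call is a no-op.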
However, if I restore the job from a previous savepoint using the new jar, Flink throws a `StateMigrationException` in a different operator:
```
2020-11-08T12:57:59.369Z INFO org.apache.flink.runtime.executiongraph.ExecutionGraph:1511 [flink-akka.actor.default-dispatcher-17] window-operator (1/8) (uid) switched from RUNNING to FAILED. org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
```
That operator's state only holds integers:
```java
public class SlidingWindowProcessFunction extends ProcessWindowFunction<MyPojo, MyOutput, String, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
            "window-state", Integer.class);
    ...

    @Override
    public void process(String key, Context context, Iterable<MyPojo> iterable, Collector<MyOutput> collector) {
        ...
        for (Integer hash : context.windowState().getListState(LSD).get()) {
            alreadyProcessedHashes.add(hash);
        }
        ...
    }
}
```
Is `MyPojo`'s serializer relevant for this other operator's state even though the POJO class is not used directly in its managed state?
Class <your class> cannot be used as a POJO type...
– kkrugler