I have a Flink streaming job that is running in a production environment and I need to make a change to the main transformation code.
The code currently in production looks like this:
stream
    .filter(inboundData -> inboundData.hasToBeFiltered())
    .uid("filtered-data")
    .keyBy(data -> data.getMyStringKey())
    .process(doSomething())
    .uid("processed-inbound-data-id");
I need to change how the data is partitioned by the keyBy() operator, keying on a different property of the inboundData POJO. The property currently used is a String, while the new property is a Long.
The new code would therefore look like this:
stream
    .filter(inboundData -> inboundData.hasToBeFiltered())
    .uid("filtered-data")
    .keyBy(data -> data.getMyLongKey())
    .process(doSomething())
    .uid("processed-inbound-data-id");
I made the above change and submitted the updated version of the job to my Flink cluster, restoring the operator state from the savepoint taken before canceling the old job. The job fails with the following error:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedProcessOperator_632e4c67d1f4899514828b9c5059a9bb_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 5 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:324)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 7 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:268)
... 11 more
From the stack trace I can infer that the error is due to the fact that I'm changing the type of the key used in the keyBy() operator.
I tried fiddling around a bit with the code, googling for questions on this topic, but I couldn't find anything meaningful that would give me a hint on how to perform the change that I need.
So my questions are:
- Is the change I'm trying to perform achievable without losing the saved states?
- If so, can anyone give me a clue on how to perform such a change?
Many thanks.
KeyedState is strictly associated with key, so I don't think there is a way to restore the state when the key has changed. – Dominik Wosiński
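To illustrate why keyed state is tied to the key, here is a minimal plain-Java sketch. It is not Flink code: the class KeyGroupSketch and its helpers keyGroupFor and serialize are hypothetical names, java.io.DataOutputStream stands in for Flink's key serializers, and the key-group formula is simplified (Flink also applies a murmur hash to key.hashCode() before taking the modulo). The value 128 is only an illustrative max parallelism.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KeyGroupSketch {
    // Illustrative max parallelism (assumption, not read from any job config).
    static final int MAX_PARALLELISM = 128;

    // Simplified stand-in for key-group assignment: hash of the key modulo
    // max parallelism. Flink additionally murmur-hashes key.hashCode().
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Stand-in for a key serializer: Long keys are written as 8 bytes,
    // anything else as a length-prefixed string.
    static byte[] serialize(Object key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (key instanceof Long) {
                out.writeLong((Long) key);
            } else {
                out.writeUTF(key.toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        String oldKey = "42"; // what getMyStringKey() might return
        Long newKey = 42L;    // what getMyLongKey() might return

        // The same logical record lands in a different key group...
        System.out.println("String key group: " + keyGroupFor(oldKey, MAX_PARALLELISM));
        System.out.println("Long key group:   " + keyGroupFor(newKey, MAX_PARALLELISM));

        // ...and the stored key bytes have a different layout.
        System.out.println("String key bytes: " + serialize(oldKey).length);
        System.out.println("Long key bytes:   " + serialize(newKey).length);
    }
}
```

Under these assumptions, every state entry in the savepoint is stored under the serialized old key and grouped by the old key's hash, so a job keyed by a Long has no way to look those entries up. This is consistent with the restore failing in the key serializer compatibility check in the stack trace.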