
I have a Flink streaming job that is running in a production environment and I need to make a change to the main transformation code.

The code currently running in production looks like this:

stream
   .filter(inboundData -> inboundData.hasToBeFiltered())
   .uid("filtered-data")
   .keyBy(data -> data.getMyStringKey())
   .process(doSomething())
   .uid("processed-inbound-data-id");

I need to change how the keyBy() operator partitions the data, so that it uses a different property of the inboundData POJO. The property currently used is a String, while the new one is a Long.

The new code would therefore look like this:

stream
   .filter(inboundData -> inboundData.hasToBeFiltered())
   .uid("filtered-data")
   .keyBy(data -> data.getMyLongKey())
   .process(doSomething())
   .uid("processed-inbound-data-id");

I made the above change and submitted the updated version of the job to my Flink cluster, restoring the operators' state from the savepoint taken before canceling the old job, but I got the following error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedProcessOperator_632e4c67d1f4899514828b9c5059a9bb_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    ... 5 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:324)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 7 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:268)
    ... 11 more

From the stack trace, I infer that the error is caused by changing the type of the key used in the keyBy() operator: the restore fails because the new key serializer is not compatible with the one recorded in the savepoint.

I experimented with the code and searched for related questions, but I couldn't find anything that hinted at how to make the change I need.

So my questions are:

  • Is the change I'm trying to make achievable without losing the saved state?
  • If so, how can I perform such a change?

Many thanks.

To be honest, I don't think this is possible. Keyed state is strictly associated with its key, so I don't think there is a way to restore the state once the key has changed. – Dominik Wosiński
@DominikWosiński What if the value of the new key is equal to the old key's value? Then it doesn't matter, right? – Qoobee
@Qoobee I'm not sure what you mean by the old key being equal to the new one. How did it change, then? – Dominik Wosiński
@DominikWosiński For example, just renaming the key field, or adding a new field to the POJO and using it as the new key, where the new field's value is the same as the old key field's. Thanks! – Qoobee
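On the case raised in the comments: when the key type changes, as in the question, a new Long key with the same digits as the old String key is still a different key. Flink uses a different serializer per key type (hence "The new key serializer must be compatible"), and it assigns each key to a key group based on the key's hashCode(). A quick plain-Java check, using hypothetical key values, shows that a String and a Long with the same digits are neither equal nor hash alike:

```java
public class KeyEquivalenceCheck {

    public static void main(String[] args) {
        // Hypothetical values: the old String key and a new Long key with the same digits.
        String oldKey = "42";
        Long newKey = 42L;

        // Different hash codes, so the two keys may land in different key groups.
        System.out.println(oldKey.hashCode()); // 1662
        System.out.println(newKey.hashCode()); // 42

        // Not equal either: to keyed state they are two unrelated keys.
        System.out.println(oldKey.equals(newKey)); // false
    }
}
```

Renaming the field while keeping the String type would not hit this problem, since both the serializer and the hash of each key stay the same.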

1 Answer


I think you should be able to use the State Processor API (about to have its first release as part of Flink 1.9) to write a DataSet program that reads a savepoint taken with the old version of the job and writes a new savepoint that is compatible with the new version.
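The heart of such a migration is re-keying: every state entry stored under an old String key has to be written back under its new Long key. In a State Processor API job, this step would sit between reading the old operator's keyed state (for example with a KeyedStateReaderFunction) and bootstrapping the new state (for example with a KeyedStateBootstrapFunction keyed by the Long key). The sketch below leaves Flink out entirely and models keyed state as a plain Map from key to value, which is an assumption for illustration only. The rekey method and the newKeyOf mapping are hypothetical: in a real job, the Long key would have to be derived from the state read under the old key, or from an external lookup, unless, as in the comments, the new key has the same value as the old one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of re-keying keyed state (not Flink's actual storage). */
public class RekeyModel {

    /**
     * Moves every entry from its old String key to a new Long key.
     * newKeyOf is a hypothetical mapping supplied by the caller.
     * Fails if two old keys collapse onto the same new key, because one
     * state value would otherwise silently overwrite the other.
     */
    static <V> Map<Long, V> rekey(Map<String, V> oldState, Function<String, Long> newKeyOf) {
        Map<Long, V> newState = new HashMap<>();
        for (Map.Entry<String, V> entry : oldState.entrySet()) {
            Long newKey = newKeyOf.apply(entry.getKey());
            if (newState.containsKey(newKey)) {
                throw new IllegalStateException("Old keys collide on new key " + newKey);
            }
            newState.put(newKey, entry.getValue());
        }
        return newState;
    }

    public static void main(String[] args) {
        // Hypothetical old state: String key -> some per-key value.
        Map<String, Integer> oldState = new HashMap<>();
        oldState.put("42", 3);
        oldState.put("7", 5);

        // Case from the comments: the Long key holds the same value as the old String key.
        Map<Long, Integer> newState = rekey(oldState, Long::parseLong);
        System.out.println(newState.get(42L)); // 3
        System.out.println(newState.get(7L));  // 5
    }
}
```

The collision check is the part worth keeping in a real migration: if two old String keys map to the same Long key, keyed state can only hold one value for that key, so failing loudly is safer than silently dropping one of them.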