
I am trying to submit a job to my Flink cluster, but I keep running into the error below:

2021-05-03 17:14:32
java.lang.NoSuchMethodError: org/apache/flink/api/common/state/OperatorStateStore.getSerializableListState(Ljava/lang/String;)Lorg/apache/flink/api/common/state/ListState; (loaded from file:/opt/flink/lib/flink-dist_2.11-1.11.3.jar by jdk.internal.loader.ClassLoaders$AppClassLoader@c7a20636) called from class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase (loaded from file:/tmp/blobStore-d962f26c-fc16-4ff4-89da-4d86ed60c35e/job_fdc8c054e20b751b6a6f549af602c3d2/blob_p-5fcb7b854786da736df1bbd47aa02017c714f655-34f669043602e72ef3faf32247ab2b17 by org.apache.flink.util.ChildFirstClassLoader@3952d030).
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:858)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$274/0x0000000014046b10.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:836)

I believe the issue has something to do with a version mismatch (I assume with the Kafka connector I am using), but I haven't had any luck resolving it. I've tried setting my Flink version to match my cluster's (1.11.3), but the error persists. I'm also not sure how to use the maven-shade-plugin to resolve this. Any help would be appreciated!

What versions of Flink and the Kafka connector were used to create the savepoint from which you are attempting to restore the state? – David Anderson
The Kafka connector version is 1.9.0, and the Elastic connector version is 1.10.0. The Flink version was initially 1.9.1, but when I changed all these version numbers to match the cluster (1.11.3), I ran into a bunch of other errors like "No ExecutorFactory found to execute the application." Here is the repo, in case that helps! github.com/alif-munim/learning-flink – Alif Munim

1 Answer


The release notes for Flink 1.11 state that

Removal of deprecated state access methods (FLINK-17376)

We removed deprecated state access methods RuntimeContext#getFoldingState(), OperatorStateStore#getSerializableListState() and OperatorStateStore#getOperatorState(). This means that some code that was compiled against Flink 1.10 will not work with a Flink 1.11 cluster. An example of this is our Kafka connector which internally used OperatorStateStore.getSerializableListState.

You should always expect to recompile your user jars when upgrading to new versions of Flink. Binary compatibility across minor version updates (e.g., from 1.10.x to 1.11.y) is not guaranteed. (It's unusual for breaking changes to be introduced in patch releases, but it has happened once or twice.)
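
Concretely, that means lining up every Flink dependency in the job's pom.xml with the cluster version. Here is a sketch of what that could look like, assuming a Maven build and the Scala 2.11 artifacts (which is what the flink-dist_2.11 jar in the stack trace suggests); adjust the artifact names to whatever your project actually uses:

    <properties>
        <flink.version>1.11.3</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Provided by flink-dist on the cluster, so it should not be bundled into the user jar -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- As of 1.11, flink-clients is no longer pulled in transitively by flink-streaming-java;
             adding it explicitly is the usual fix for the "No ExecutorFactory found to execute
             the application" error mentioned in the comments -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors are not part of flink-dist and must ship inside the user jar,
             at the same version as the cluster -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Assuming Elasticsearch 7 here; pick the flink-connector-elasticsearch
             artifact that matches your Elasticsearch version -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>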

Recap: everything involved (your user jar, its dependencies, and the cluster) should all be using the same version of Flink.
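
And since the maven-shade-plugin came up in the question: a minimal configuration along the lines below (the plugin version and main class are placeholders) is usually all that is needed to package the job and its connectors into a single fat jar:

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Sets the entry point of the fat jar; replace with your job's main class -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Running mvn clean package should then produce a jar under target/ that bundles the connectors at the same version as the cluster, which you can submit with flink run.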