I'm running Flink on YARN with two taskmanagers. I wrote a simple job that consumes messages from Kafka. The job runs on taskmanager 1. When I kill taskmanager 1 (via kill PID), the job gets restarted on taskmanager 2. So far so good. But right after starting the consumer the execution fails:

java.lang.RuntimeException: Could not deserialize NFA.
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:83)
    ... 8 more

I build the jar file with:

mvn clean package -Pbuild-jar

I also tried this, but it makes no difference:

mvn clean package

It's strange that my job runs fine on the first attempt but throws ClassNotFoundExceptions when it is restarted. What am I doing wrong? (I'm using Flink 1.2-SNAPSHOT because I need the BucketSink.) I compared the classpaths of both taskmanagers and they are identical.

Have you added the Kafka dependencies? - dumb_terminal
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>0.10.2-hadoop1</version> </dependency> - dumb_terminal
You might need to change the version, though. - dumb_terminal
I would recommend posting questions about SNAPSHOT versions to Flink's dev mailing list. These are unreleased development versions that change constantly and can contain temporary regressions or bugs, so questions about them are of very limited use to the Stack Overflow community. Problems with SNAPSHOT versions are, however, very interesting to the developer community. - Fabian Hueske
OK, I will post it on the mailing list. - static-max

1 Answer


For completeness: This was a temporary issue and has already been fixed.
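
For anyone reading a similar trace: the ClassNotFoundException in the question is raised while ObjectInputStream resolves the class through sun.misc.Launcher$AppClassLoader, the JVM's application class loader. That loader cannot see classes that are only reachable through a separate user-code class loader (whether that is what happened here is an assumption, since the question does not confirm it). The Java sketch below shows the general technique of resolving classes through an explicitly chosen class loader during Java deserialization. It is an illustration only, not the actual Flink fix, and the names UserCodeObjectInputStream and DeserializeDemo are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;

/** Hypothetical helper: an ObjectInputStream that resolves classes via a given class loader. */
class UserCodeObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    UserCodeObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Look the class up in the supplied loader instead of the default one.
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default lookup, which also handles primitive types.
            return super.resolveClass(desc);
        }
    }
}

/** Hypothetical demo: round-trips a value through Java serialization. */
final class DeserializeDemo {
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    static Object deserialize(byte[] bytes, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new UserCodeObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> state = new ArrayList<>();
        state.add("partition-0");
        byte[] bytes = serialize(state);
        // In a real job the loader would be the one that loaded the user jar.
        Object restored = deserialize(bytes, DeserializeDemo.class.getClassLoader());
        System.out.println(restored);
    }
}
```

With a plain ObjectInputStream, the lookup uses whichever class loader the calling code happens to run under, which matches the AppClassLoader frames in the trace above.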