I have a Flink application that consumes data from a Kafka cluster and runs SQL data transformations. I am running this application on EMR, and when I launch it with java -jar, it runs as expected.

However, when I submit it to YARN with

    flink run -m yarn-cluster -yn 2 -yjm 1g -ytm 2g JarFileName arg1 arg2

the application fails with the stack trace below.

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 36 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: unexpected block data
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)

Update:

This issue is resolved. The root cause was Flink's class loading: we were initially building the jar with the Spring Boot Maven plugin, and switching to the Maven Shade plugin fixed the issue.

Reference: Integration - Apache Flink + Spring Boot
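
For anyone hitting the same problem, here is a minimal sketch of the packaging change, assuming a standard Maven build; the main class name below is a placeholder for your own entry point. The Spring Boot plugin nests dependencies under BOOT-INF and relies on its own launcher class loader, whereas the Shade plugin produces a flat fat jar that Flink's user-code class loader can read:

    <!-- pom.xml (sketch): replace spring-boot-maven-plugin with maven-shade-plugin -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <!-- merge META-INF/services entries so connector factories (e.g. Kafka) still load -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <!-- set the Main-Class manifest header; com.example.MyFlinkJob is a placeholder -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.example.MyFlinkJob</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>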

Comments:
Does your code work well in a local environment or an IDE? – MIkCode
Yes, in a local environment everything works as expected; it only fails in the cluster environment. – Ramana
Looks like your CLI command (flink run -m yarn-cluster...) got munged; please edit with escaping so all of the parameters are visible, thanks. – kkrugler

1 Answer


Which Flink version are you using? Are you sure that the cluster is running the same version?

Such (de)serialization issues are usually caused by different versions being used for serialization and deserialization.
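
One way to keep the versions aligned is to compile against the same Flink version the cluster provides and mark the Flink dependencies as provided, so your job jar does not bundle its own copy. A minimal sketch, assuming a Maven build; the artifact name, Scala suffix, and version are placeholders that must match your cluster:

    <!-- pom.xml (sketch): Flink core classes come from the cluster, not the fat jar -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <!-- placeholder: match the Flink version installed on the EMR cluster -->
      <version>1.8.0</version>
      <scope>provided</scope>
    </dependency>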