I'm trying to setup a stream processing pipeline with Beam and Flink runner. Flink is a local session deployment with following docker-compose file:
version: "3"
services:
jobmanager:
image: flink:1.12.0-scala_2.12-java11
container_name: flink-jobmanager
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
ports:
- 8081:8081
command: jobmanager
networks:
- flink
taskmanager:
image: flink:1.12.0-scala_2.12-java11
container_name: flink-taskmanager
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
command: taskmanager
networks:
- flink
networks:
flink:
Below is my pom.xml of Beam application:
<properties>
<beam.version>2.27.0</beam.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.12</artifactId>
<version>${beam.version}</version>
</dependency>
</dependencies>
My Beam application run perfectly fine with Beam Direct runner, but when I try to run with Flink runner, the I got the following exceptions:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:470)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1122)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:277)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:69)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1373)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:700)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Unknown Source)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.util.ArrayList.readObject(Unknown Source)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:467)
... 14 more
JDK version: Oracle Java 11
Execution command:
mvn exec:java -Dexec.mainClass=vn.duclm.beam.BeamApplication \
-Dexec.args="--runner=FlinkRunner \
--flinkMaster=localhost:8081 \
--filesToStage=target/beam-hello.jar"
I've tried to synchronize version of Flink and Beam but no luck. I've no idea now what could be done to resolve the problem. Thanks in advance for any help.