I am trying out Flink and running into an exception when deploying a job to a Flink cluster running on Kubernetes.
Setup
Flink - 1.4.2
Scala - 2.11.12
Java - 8 (JDK)
Flink Docker image on cluster - flink:1.4.2-scala_2.11-alpine
SBT file
ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

name := "streamingTest1"
version := "0.1-SNAPSHOT"
organization := "com.example"

ThisBuild / scalaVersion := "2.11.12"

val flinkVersion = "1.4.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion % "provided"
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

libraryDependencies += "com.microsoft.azure" % "azure-eventhubs" % "1.0.0"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"

excludeDependencies ++= Seq(
  ExclusionRule("org.ow2.asm", "*")
)

assembly / mainClass := Some("com.example.Job")

// make the run command include the "provided" dependencies
Compile / run := Defaults.runTask(
  Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated

// exclude the Scala library from the assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
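To check whether the connector classes actually make it into the fat jar, the assembly can be inspected directly. A minimal sketch (the jar path is an assumption derived from the name and version in the build above; adjust it to the actual sbt-assembly output):

import java.util.jar.JarFile
import scala.collection.JavaConverters._

object InspectAssembly extends App {
  // Assumed path, based on the name/version settings in the build above.
  val jar = new JarFile("target/scala-2.11/streamingTest1-assembly-0.1-SNAPSHOT.jar")
  // Print every entry mentioning the missing class; no output would mean the
  // Kafka connector classes are not inside the fat jar at all.
  jar.entries().asScala
    .map(_.getName)
    .filter(_.contains("KeyedDeserializationSchema"))
    .foreach(println)
  jar.close()
}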
I can run the job locally on my machine, either from IDEA or via the sbt CLI, and I see the expected output (or can send the output to my sink).
When I run the job on the Flink server deployed on a Kubernetes cluster, I see the following exception.
Error
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema
at com.example.Job$.main(Job.scala:126)
at com.example.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 20 more
More Background
My project has a Scala class for the Flink job, plus Java code in a separate directory for a connector which interfaces with EventHub. My main Scala code has no dependency on KeyedDeserializationSchema, but the connector does. KeyedDeserializationSchema most likely comes from the Kafka connector dependency included in my sbt file. Is there any reason the Kafka connector would not be available when running this job on the cluster?
How can I debug which version of the Kafka connector is being loaded on the server? Are there other ways of packaging the job that could force Flink to load the Kafka connector?
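For the first question, one thing I could do is probe the class reflectively at the start of the job's main method and log which jar it resolves from. A rough sketch, not yet tried on the cluster (the object name is just for illustration):

object ClassLoadCheck extends App {
  // Probe the class reflectively and log which jar it resolves from,
  // or confirm that it is missing from the classpath entirely.
  try {
    val cls = Class.forName(
      "org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema")
    // getCodeSource may be null for classes loaded by the bootstrap loader.
    val source = Option(cls.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("unknown source")
    println("KeyedDeserializationSchema loaded from: " + source)
  } catch {
    case e: ClassNotFoundException =>
      println("KeyedDeserializationSchema not on classpath: " + e.getMessage)
  }
}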