
I am trying out Flink and running into an exception when deploying my job on a Flink cluster (on Kubernetes).

Setup
Flink - 1.4.2
Scala - 2.11.12
Java 8 SDK
Flink docker image on cluster - flink:1.4.2-scala_2.11-alpine

SBT file

ThisBuild / resolvers ++= Seq(
    "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
    Resolver.mavenLocal
)

name := "streamingTest1"
version := "0.1-SNAPSHOT"
organization := "com.example"
ThisBuild / scalaVersion := "2.11.12"
val flinkVersion = "1.4.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion % "provided"
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

libraryDependencies += "com.microsoft.azure" % "azure-eventhubs" % "1.0.0"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"

excludeDependencies ++= Seq(
  ExclusionRule("org.ow2.asm", "*")
)

assembly / mainClass := Some("com.example.Job")

// make run command include the provided dependencies
Compile / run := Defaults.runTask(
  Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated

// exclude Scala library from assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)

I can run the job locally on my machine, either in IDEA or via the sbt CLI, and I see the expected output (or can send it to my sink).

When running on the Flink server deployed on a Kubernetes cluster, I see the following exception.

Error

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
    ... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
    ... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema
    at com.example.Job$.main(Job.scala:126)
    at com.example.Job.main(Job.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    ... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 20 more

More Background

My project has a Scala class for the Flink job, plus Java code in a separate directory for a connector that interfaces with EventHub. My main Scala code does not depend on KeyedDeserializationSchema directly, but the connector does. KeyedDeserializationSchema most likely comes from the Kafka connector dependency included in my sbt file. Is there any reason the Kafka connector would not be available when running this job on the cluster?
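For reference, the kind of schema implementation that pulls in this interface looks roughly like the sketch below (a simplified, hypothetical example, not the actual EventHub connector code); any class like this compiles against flink-connector-kafka-0.10, so that artifact has to be on the classpath wherever the job runs.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

// Hypothetical sketch: a custom schema like this is what makes connector code
// reference org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema,
// so the Kafka connector artifact must be available at runtime.
class RawStringSchema extends KeyedDeserializationSchema[String] {
  override def deserialize(messageKey: Array[Byte],
                           message: Array[Byte],
                           topic: String,
                           partition: Int,
                           offset: Long): String =
    new String(message, "UTF-8")

  override def isEndOfStream(nextElement: String): Boolean = false

  override def getProducedType: TypeInformation[String] =
    TypeInformation.of(classOf[String])
}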

How can I debug which version of the Kafka connector is being loaded on the server? Are there other ways of packaging that could force Flink to load the Kafka connector?
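One way to check whether the class made it into the fat jar at all is to list the assembled jar's entries; a minimal sketch (the jar path is passed as an argument, nothing here comes from the original build):

import java.util.jar.JarFile
import scala.collection.JavaConverters._

// Hypothetical helper: print every entry of the assembled jar that mentions the
// class the cluster fails to load, to confirm whether the Kafka connector classes
// were actually packaged.
object JarCheck {
  def main(args: Array[String]): Unit = {
    val jar = new JarFile(args(0)) // path to the jar produced by sbt assembly
    jar.entries().asScala
      .map(_.getName)
      .filter(_.contains("KeyedDeserializationSchema"))
      .foreach(println)
    jar.close()
  }
}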

Comments

I think I should build a fat jar using 'sbt clean assembly' and probably remove the 'provided' flag for the kafka connector. This could be a user error. I'll update if it works. - sanyams
I think you have a dependency issue. - kinkajou
You actually don't want the Kafka connector to be "provided", as it should be part of your fat jar. However, the core Flink runtime should be considered "provided". - Joshua DeWald

1 Answer


As I quickly realized after posting this question, the fix was to remove "provided" from the Kafka connector dependency:

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion
)
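
With the connector bundled in the fat jar (and the core Flink dependencies still "provided" by the cluster image), job code along the lines of the sketch below, where the topic name and Kafka properties are placeholders rather than values from the original job, resolves FlinkKafkaConsumer010 and the KeyedDeserializationSchema interface it relies on at runtime:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

// Hypothetical sketch of the job wiring: once flink-connector-kafka-0.10 is in the
// fat jar, the connector classes used here load fine on the cluster.
object Job {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092") // placeholder broker address
    props.setProperty("group.id", "streaming-test")      // placeholder consumer group

    val source = new FlinkKafkaConsumer010[String]("input-topic", new SimpleStringSchema(), props)

    env.addSource(source).print()
    env.execute("streamingTest1")
  }
}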