0
votes

I am working on real time streaming data analysis using Apache Flink-1.1.3. My system consist of Kafka cluster for message queue, Flink cluster which read the messages from kafka partitions and make some analysis on it and finally I want to dump the generated data into Ignite Cache. For the system I am using IgniteSink class to sink the data into ignite cache. The versions are as follows:

Flink 1.1.3,
Kafka 2.10, Ignite 2.0.0

When I tried to run the job on flink cluster it gives me the following error,

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:374)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1429)
at flink_ignite_sink_remote.main(flink_ignite_sink_remote.java:77)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder
at org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
at org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at flink_ignite_sink_remote$Splitter.flatMap(flink_ignite_sink_remote.java:177)
at flink_ignite_sink_remote$Splitter.flatMap(flink_ignite_sink_remote.java:1)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
at java.lang.Thread.run(Thread.java:745)

I have included all the libraries of Ignite-Flink into my project, still I got java.lang.NoClassDefFoundError error.

3
Can you run the same job in local flink setup and confirm if you are observing the same error? Are all the Ignite sink libs are present in the classpath.samaitra

3 Answers

0
votes

I suspect you are using a simple jar instead of uber/fat jar.

If you are using maven try shade-plugin or for sbt sbt-assembly. You can also create your project as described in the quickstart-guide

0
votes

When you work with flink jobs, that you deploy over a flink cluster, you have 2 options:

  1. Generate a jar file with all your dependencies bundled inside and deploy that jar
  2. Add all your dependencies to the classpath of your flink server, so it can find those dependencies out of the jar file.

You problem looks like you are not generating a jar file with all the dependencies inside of it and those dependencies aren't located inside the classpath of the fink server.

Try running the following mvn command to generate your jars:

mvn clean package -Pbuild-jar

It could generate multiple jar, pick the fat jar (the bigger)