
I downloaded Apache Flink (flink-1.11.1-bin-scala_2.11.tgz), installed its dependencies, and started a single-instance cluster (one machine only) with the command:

> "/home/ubuntu/flink-1.11.1/bin/start-cluster.sh" enter image description here


import org.apache.flink.api.scala._

object Test1 {
  def main(args: Array[String]): Unit = {
    //val env = ExecutionEnvironment.getExecutionEnvironment
    // Connect to the standalone cluster remotely and ship the job jar with the submission
    val env = ExecutionEnvironment.createRemoteEnvironment("192.168.0.110", 6123, jarFiles = "target\\scala-2.11\\untitled2_2.11-0.1.jar")

    val text = env.fromElements("To be, or not to be, that is the question")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()

    env.execute("Test1 WordCount")
  }
}

My Scala code runs locally, and it also runs on the Flink server when I use "ExecutionEnvironment.getExecutionEnvironment", scp the jar over, and submit it manually with "./bin/flink run /path/untitled2_2.11-0.1.jar". It always fails, for some reason, when I try to connect remotely from the IntelliJ IDE to the Flink server using "createRemoteEnvironment". The error I keep getting is the following:

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:277)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:981)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:889)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:873)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
    at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
    at Test1$.main(Test1.scala:16)
    at Test1.main(Test1.scala)

UPDATE #1: Following http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-TooLongFrameException-in-cluster-mode-td20883.html, I changed the port number from 6123 to 8081, and the job now registers in the web GUI, which it didn't before. Newer versions of Flink apparently submit jobs over the user-specified REST port rather than the RPC port.
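For reference, the only change to the job code was the port passed to createRemoteEnvironment; a minimal sketch of that line (same host and jar path as above):

// Submit through the REST port (8081 by default) instead of the JobManager RPC port 6123
val env = ExecutionEnvironment.createRemoteEnvironment("192.168.0.110", 8081, jarFiles = "target\\scala-2.11\\untitled2_2.11-0.1.jar")

With the port changed, the job now fails with a different error: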

2020-08-07 18:24:22
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
    at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1449)
    at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.setup(SynchronousChainedCombineDriver.java:90)
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1316)
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:316)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
    ... 3 more
Caused by: java.io.InvalidClassException: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    ... 9 more

UPDATE #3: This is caused by a version mismatch between the Scala installed on the Apache Flink server and the Scala version my IDE is using.

On the Apache Flink server:

# scala -version

Downgrade the Scala version in your IDE to match, keeping the code the same.
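If the project is built with sbt, this is essentially a one-line change in build.sbt; a minimal sketch, assuming the server reports Scala 2.11.12 (substitute whatever "scala -version" actually prints):

// build.sbt -- pin the project's Scala version to the one installed on the Flink server
// (2.11.12 is an assumed example; replace it with the version reported by scala -version)
scalaVersion := "2.11.12"

// The Flink dependencies then resolve against the matching _2.11 artifacts
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"   % "1.11.1",
  "org.apache.flink" %% "flink-clients" % "1.11.1"
)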

Comment: Can you check the logs in the Web UI for the job manager and edit your question with more details? - Arvid Heise

1 Answer


To summarize the series of updates I did to fix my problem:

  1. Change the connection port passed to createRemoteEnvironment() from 6123 to 8081 (the REST port).
  2. Check the Scala version on the Apache Flink server, then downgrade the Scala version in the IDE to match (see the sketch below).
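Putting the two changes together, a minimal sketch of the relevant lines (the host, jar path, and Scala version 2.11.12 below are placeholders from my setup; use your own values and whatever version "scala -version" reports on the server):

// build.sbt: match the Scala version installed on the Flink server (assumed 2.11.12 here)
scalaVersion := "2.11.12"

// Test1.scala: submit through the REST port (8081 by default) rather than the RPC port 6123
val env = ExecutionEnvironment.createRemoteEnvironment("192.168.0.110", 8081, jarFiles = "target\\scala-2.11\\untitled2_2.11-0.1.jar")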