I downloaded and installed Apache Flink 1.11.1 (flink-1.11.1-bin-scala_2.11.tgz) and started a single instance (one machine only) with the command:
> "/home/ubuntu/flink-1.11.1/bin/start-cluster.sh"
import org.apache.flink.api.scala._
/**
 * Word-count example that submits a Flink DataSet job to a remote standalone cluster.
 *
 * Notes for running from the IDE:
 *  - Since Flink 1.5 the client submits jobs through the JobManager's REST endpoint
 *    (`rest.port`, 8081 by default), not through the RPC port 6123.
 *  - The jar passed to `createRemoteEnvironment` must contain this class and must be
 *    compiled against the same Scala binary version as the cluster (the
 *    flink-1.11.1-bin-scala_2.11 distribution uses Scala 2.11). Otherwise the task
 *    managers fail to deserialize the user code with an InvalidClassException on
 *    scala.collection.immutable.List$SerializationProxy.
 */
object Test1 {

  /** Host of the Flink JobManager (presumably the machine running start-cluster.sh — adjust as needed). */
  private val JobManagerHost = "192.168.0.110"

  /** JobManager REST port; 8081 is Flink's default `rest.port`. */
  private val JobManagerRestPort = 8081

  /** Jar with this job's classes, shipped to the cluster; rebuild it before each remote run. */
  private val JobJar = "target\\scala-2.11\\untitled2_2.11-0.1.jar"

  def main(args: Array[String]): Unit = {
    // For local runs or `./bin/flink run`, use ExecutionEnvironment.getExecutionEnvironment instead.
    val env = ExecutionEnvironment.createRemoteEnvironment(JobManagerHost, JobManagerRestPort, JobJar)

    val text = env.fromElements("To be, or not to be, that is the question")

    val counts = text
      .flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // In the DataSet API, print() is an eager sink: it triggers job execution itself and
    // collects the result back to the client. A subsequent env.execute() would fail
    // because no new data sinks have been defined since that execution, so none is called.
    counts.print()
  }
}
My Scala code runs locally. It also runs on the Flink server when I scp a copy of the jar over and use `ExecutionEnvironment.getExecutionEnvironment`; I run it on the server manually with `./bin/flink run /path/untitled2_2.11-0.1.jar`. However, it ALWAYS fails when I try to submit it remotely from the IntelliJ IDE to the Flink server using `createRemoteEnvironment`. The error I keep getting is the following:
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:277)
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:981)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:889)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:873)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
at Test1$.main(Test1.scala:16)
at Test1.main(Test1.scala)
UPDATE #1: Link: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-TooLongFrameException-in-cluster-mode-td20883.html Summary: I changed the port number from 6123 to 8081, and the job now registers in the web GUI, which it did not before. Apparently, newer Flink versions (1.5+) accept job submissions on the REST port (`rest.port`, 8081 by default) rather than on the RPC port 6123. It now fails with a different error:
2020-08-07 18:24:22
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1449)
at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.setup(SynchronousChainedCombineDriver.java:90)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1316)
at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:316)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.io.InvalidClassException: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more
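UPDATE #3: This is caused by a Scala version mismatch between the Flink server (the flink-1.11.1-bin-scala_2.11 distribution, which is built against Scala 2.11) and the Scala version my IDE project compiles with. The mismatched `serialVersionUID` of `scala.collection.immutable.List$SerializationProxy` in the stack trace above is the symptom.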
Apache Flink:
# scala -version
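Fix: downgrade the Scala version used by the IDE project so that it matches the Scala version of the Flink distribution (2.11 for the scala_2.11 build). The code itself stays the same.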