0
votes

I am running a flink emr job and getting the following results:

    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.myorg.quickstart.StreamingJob.main(StreamingJob.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:309)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:76)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:351)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.lang.IllegalArgumentException: The scheme (hdfs://, file://, etc) is null. Please specify the file system scheme explicitly in the URI.
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:187)
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:109)
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:131)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.<init>(MemoryStateBackend.java:238)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.configure(MemoryStateBackend.java:286)
    at org.apache.flink.runtime.state.memory.MemoryStateBackendFactory.createFromConfig(MemoryStateBackendFactory.java:33)
    at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:225)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
    at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1198)
    at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1178)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:287)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:83)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:37)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
    ... 10 more

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more

The long running flink yarn application is up and running, and I've configured the emr cluster with high availability according to https://docs.aws.amazon.com/emr/latest/ReleaseGuide/flink-configure.html.

I've also tried to run the example batch program via flink run /usr/lib/flink/examples/batch/WordCount.jar, and I can see that successfully complete in the web UI.

Is there some other configuration that I may be missing based on this error?

1

1 Answers

1
votes

According to the stack trace, it looks as if you have configured the job to use the MemoryStateBackend. When configuring this state backend one can also specify the checkpoint and savepoint path. When configuruing these paths it is important to specify the schema hdfs:// (or any other distributed file system which is supported and you want to use) because the system needs to know which Filesystem to use. Instead of specifying the path as my/checkpoint/path, it should be hdfs://my/checkpoint/path.