
I am using Apache Flink (v1.10.0) to stream-process my RabbitMQ data, but when I configure checkpoints to be stored on an NFS file system like this:

    env.setStateBackend(new FsStateBackend("nfs://28736435.boston.nas.amazonaws.com:/data/k8s/data/flink/checkpoint"));

I get this error:

Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:282)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
    ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'nfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:64)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:490)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
    ... 23 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
    ... 28 more

The documentation says NFS is supported for storing checkpoints. What am I doing wrong in my NFS file system configuration?


1 Answer


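You'll need to mount the NFS volume on every instance (the JobManager and every TaskManager) and use the 'file' scheme with the local mount path, not 'nfs'. Flink has no built-in 'nfs' file system, and because Hadoop is not on your classpath there is no fallback implementation either, which is exactly what the UnsupportedFileSystemSchemeException entries in your trace say.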
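A minimal sketch of what "use the 'file' scheme" means in practice, assuming the share is mounted at the hypothetical path /mnt/flink-nfs on every Flink host (Unix-style paths assumed). From Flink's point of view the checkpoint location is then just a local directory, so the URI is file:///mnt/flink-nfs/checkpoint. NfsCheckpointPath and its methods are illustrative names, not Flink API; the only Flink call is shown as a comment and uses the same FsStateBackend constructor as the question.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink.
public class NfsCheckpointPath {

    // Builds a file:// URI for a checkpoint directory on a locally mounted
    // NFS share. Assumes a Unix-style absolute mount point.
    public static String checkpointUri(String mountPoint, String subDir) {
        Path dir = Paths.get(mountPoint, subDir).normalize();
        if (!dir.isAbsolute()) {
            throw new IllegalArgumentException("mount point must be absolute: " + mountPoint);
        }
        // "/mnt/flink-nfs/checkpoint" becomes "file:///mnt/flink-nfs/checkpoint"
        return "file://" + dir;
    }

    // Simple pre-flight check to run on each JobManager/TaskManager host:
    // the directory must exist (i.e. the share is mounted) and be writable.
    public static boolean isUsableCheckpointDir(String dir) {
        Path p = Paths.get(dir);
        return Files.isDirectory(p) && Files.isWritable(p);
    }

    public static void main(String[] args) {
        String mountPoint = "/mnt/flink-nfs"; // hypothetical mount point
        if (!isUsableCheckpointDir(mountPoint)) {
            System.err.println("NFS share is not mounted or not writable at " + mountPoint);
        }
        String uri = checkpointUri(mountPoint, "checkpoint");
        System.out.println(uri);
        // In the Flink job, this URI would replace the nfs:// one:
        // env.setStateBackend(new FsStateBackend(uri));
    }
}
```

Because the JobManager and all TaskManagers read and write the same checkpoint URI, the mount path has to be identical on every host. On Kubernetes that typically means mounting the same NFS volume at the same mountPath in every Flink pod.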

Since you are running on AWS, S3 is a natural choice for checkpoint storage. If you go that way, use flink-s3-fs-presto (s3p://) rather than flink-s3-fs-hadoop (s3a://), because the Hadoop implementation performs poorly for Flink checkpointing.
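To catch the original mistake before the job is submitted, the advice above can be encoded as a small check on the checkpoint URI's scheme. This is a hypothetical helper (CheckpointUriCheck is not part of Flink), and its scheme list covers only the cases discussed in this answer: 'nfs' is rejected, 'file' and 's3p' are accepted, 's3a' gets a warning, and anything else is flagged for a manual check. The bucket name and paths are placeholders.

```java
import java.net.URI;

// Hypothetical pre-submit check, not part of Flink. The scheme list reflects
// only the advice in this answer; it is not a complete list of what Flink supports.
public class CheckpointUriCheck {

    public static String check(String checkpointUri) {
        String scheme = URI.create(checkpointUri).getScheme();
        if (scheme == null) {
            // Scheme-less paths are resolved against Flink's default file system.
            return "WARN: no scheme; prefer an explicit file:// or s3p:// URI";
        }
        switch (scheme) {
            case "file":
                return "OK: local path; must be the same shared mount on every instance";
            case "s3p":
                // In Flink 1.10 the flink-s3-fs-presto jar has to be copied from
                // opt/ into its own subdirectory of plugins/ before starting Flink.
                return "OK: requires the flink-s3-fs-presto plugin";
            case "s3a":
                return "WARN: Hadoop S3 file system; prefer s3p:// for checkpoints";
            case "nfs":
                return "ERROR: Flink has no nfs:// file system; mount the share and use file://";
            default:
                return "CHECK: make sure a file system for '" + scheme + "' is installed";
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            "nfs://28736435.boston.nas.amazonaws.com:/data/k8s/data/flink/checkpoint",
            "file:///mnt/flink-nfs/checkpoint", // hypothetical mount point
            "s3p://my-bucket/flink/checkpoints", // hypothetical bucket
            "s3a://my-bucket/flink/checkpoints",
        };
        for (String uri : candidates) {
            System.out.println(uri + " -> " + check(uri));
        }
    }
}
```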