I am trying to set up an Apache Flink standalone cluster consisting of two master nodes and one worker node, using Flink 1.6 and ZooKeeper. To start and stop the cluster I followed the procedure described in the Flink 1.6 documentation: to start it I ran start-zookeeper-quorum.sh and then start-cluster.sh, and to stop it I ran stop-cluster.sh.
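
For context, here is a minimal sketch of the HA section of flink-conf.yaml that would be consistent with this setup. It is not the actual configuration file. The storage directory and cluster ID are inferred from the /hastorage/default/blob paths in the stack trace below, and the ZooKeeper hostnames are placeholders.

```yaml
# Sketch of the HA section of flink-conf.yaml (Flink 1.6 key names).
# Values are assumptions, not copied from the real cluster.
high-availability: zookeeper

# Placeholder hosts; replace with the actual ZooKeeper quorum.
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181

# Inferred from the /hastorage/... paths in the stack trace.
# The trace goes through LocalFileSystem, so this looks like a local path;
# the HA storage directory has to be reachable from both master nodes.
high-availability.storageDir: file:///hastorage

# Inferred from the "default" path segment in the stack trace.
high-availability.cluster-id: /default
```

With values like these, job blobs would be stored under /hastorage/default/blob, which is where the JobManagers look for the missing job_e44fdee88a931200953fed45883ee3f1 directory.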

After running one job (which failed) and then stopping and restarting the cluster, I noticed that neither of the two JobManagers could start, because they are looking for the directory job_e44fdee88a931200953fed45883ee3f1, which does not exist. (I assume this is the directory for my failed job, but I am not sure.)

How do I recover the cluster from this error?

2018-09-06 14:58:04,065 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Fatal error occurred in the cluster entrypoint.
java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
        at org.apache.flink.util.function.ConsumerWithException.accept(ConsumerWithException.java:40)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJobManager$29(Dispatcher.java:820)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
        at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
        at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:936)
        at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:291)
        at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:281)
        at org.apache.flink.util.function.ConsumerWithException.accept(ConsumerWithException.java:38)
:
       ... 21 more
Caused by: java.lang.Exception: Cannot set up the user code libraries: /hastorage/default/blob/job_e44fdee88a931200953fed45883ee3f1/blob_p-f655414c973995e93709acbd22c1c162c9c43a98-75bd4e71882f988a6c337222efadba7b (No such file or directory)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:134)
        ... 25 more
Caused by: java.io.FileNotFoundException: /hastorage/default/blob/job_e44fdee88a931200953fed45883ee3f1/blob_p-f655414c973995e93709acbd22c1c162c9c43a98-75bd4e71882f988a6c337222efadba7b (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:102)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:84)
        at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:493)
        at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
        at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:131)
        ... 25 more
2018-09-06 14:58:04,069 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache

Thank you for the reply, Till. I have a couple of follow-up questions. Does this bug affect only the standalone HA setup, or is it also present in YARN and Kubernetes HA setups? What is the projected release date for the fix, if you have one? And what can I do in the meantime to clear the current Flink state so that it stops looking for the non-existent blob directory? Vera

1 Answer

The problem you are observing is caused by a bug in Flink. You can find more details about the problem here. The problem will be fixed with the next bug fix release.