I want to stop/cancel a Flink job from code. This is in an integration test where I submit a task to my Flink job and check the result. Because the job runs asynchronously, it doesn't stop when the test passes or fails. I want the job to stop after the test is over.

I tried a few things which I am listing below :

  1. Get the jobmanager actor
  2. Get running jobs
  3. For each running job, send a cancel request to the jobmanager

This, of course, is not working, but I am not sure whether the jobmanager ActorRef is wrong or something else is missing.

The error I get is : [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

which means either the job manager actor ref is wrong or the message sent to it is incorrect.

The code looks like the following:

    val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) // I debugged to get this path
    val jobManager = system.actorSelection("/user/jobmanager_1") // also got this Akka path by debugging and getting the jobmanager Akka URL
    val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
      val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
      if (result.isInstanceOf[RunningJobsStatus]) {
        val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
        val itr = runningJobs.iterator()
        while (itr.hasNext) {
          val jobId = itr.next().getJobId
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)))
          try {
            Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
          }
          catch {
            // print the failure instead of silently discarding the message string
            case e: Exception => println("Canceling the job with ID " + jobId + " failed." + e)
          }
        }
      }
    }
    catch {
      // print the failure instead of silently discarding the message string
      case e: Exception => println("Could not retrieve running jobs from the JobManager." + e)
    }

  }

Can someone check whether this is the correct approach?

EDIT: To completely stop the job, it is necessary to stop both the TaskManager and the JobManager, in that order: TaskManager first, then JobManager.

1 Answer

You're creating a new ActorSystem and then trying to find an actor named /user/jobmanager_1 in that same actor system. This won't work, because the actual job manager runs in a different ActorSystem. No actor exists at that path in the system you created, which is why the message ends up in dead letters.

If you want to obtain an ActorRef to the real job manager, you either have to use the same ActorSystem for the selection (then you can use a local address) or you have to find out the remote address of the job manager actor. The remote address has the format akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]. If you have access to the FlinkMiniCluster, you can use the leaderGateway promise to obtain the current leader's ActorGateway.
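
As a minimal sketch of the remote-address format above: the helper below only assembles the path string. The object name JobManagerAddress, the method remotePath, and the values localhost, 6123, and 1 are assumptions for illustration; take the real host, port, and instance number from your own setup.

```scala
// Hypothetical helper (not part of Flink): builds the remote Akka path
// akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]
object JobManagerAddress {
  def remotePath(host: String, port: Int, instanceNumber: Int): String =
    s"akka.tcp://flink@$host:$port/user/jobmanager_$instanceNumber"

  def main(args: Array[String]): Unit = {
    // Assumed values; replace them with the address of the running job manager.
    val path = remotePath("localhost", 6123, 1)
    println(path) // prints akka.tcp://flink@localhost:6123/user/jobmanager_1
  }
}
```

The resulting string could then be passed to system.actorSelection(...) in place of the local /user/jobmanager_1 path, provided the ActorSystem you create is configured for Akka remoting.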