I have an Akka application with a router group made up of actors that execute jobs. When I detect that the application is shutting down, I want my actors to complete their work before the application exits. The use case is redeployment: I don't want to allow it while jobs are still running.

To detect the shutdown of my application, I'm using the following code:

scala.sys.addShutdownHook { 
// let actors finish their work
}

To test this, I added an infinite loop inside the shutdown hook to see whether it blocks the shutdown, but the application still exits, which is not the behavior I expected.

To let my actors finish their jobs, I will implement the idea from the following article: http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2

So now I'm looking for a way to ignore the shutdown hook and to close all resources and the application only once my workers have executed all their jobs.

Update after @Edmondo1984's comment

My main app:

val workers = this.createWorkerActors()
val masterOfWorkers = system.actorOf(Master.props(workers), name = "master")

this.monitorActors(supervisor, workers, masterOfWorkers)
this.addShutDownHook(system, masterOfWorkers, supervisor)

def monitorActors(supervisor : ActorRef, workers : List[ActorRef], master : ActorRef) : Unit = {
  val actorsToMonitor = master +: workers
  supervisor ! MonitorActors(actorsToMonitor)
}

  def addShutDownHook
  (
    system : ActorSystem,
    masterOfWorkers : ActorRef, // actor wrapping a ClusterGroup router, broadcasting a PoisonPill to each worker
    supervisor : ActorRef
  ) : Unit = {
    scala.sys.addShutdownHook {
      implicit val timeout = Timeout(10.hours) // How to block here until actors are terminated ?
      system.log.info("Send a Init Shutdown to {}", masterOfWorkers.path.toStringWithoutAddress)
      masterOfWorkers ! InitShutDown
      system.log.info("Gracefully shutdown all actors of ActorSystem {}", system.name)
      Await.result((supervisor ? InitShutDown), Duration.Inf)
      system.log.info("Gracefully shutdown actor system")
      Await.result(system.terminate(), 1.minutes)
      system.log.info("Gracefully shutdown Akka management ...")
      Await.result(AkkaManagement(system).stop(), 1.minutes)
      System.exit(0)
    }
  }

Supervisor actor

case class Supervisor()  extends Actor with ActorLogging {

  var numberOfActorsToWatch = 0L

  override def receive: Receive = {
    case MonitorActors(actorsToMonitor) =>
      log.info("Monitor {} actors, received by {}", actorsToMonitor.length, sender().path)
      this.numberOfActorsToWatch = actorsToMonitor.length
      actorsToMonitor foreach(context.watch(_))

    case Terminated(terminatedActor) if this.numberOfActorsToWatch > 0 =>
      log.info("Following actor {} is terminated. Remaining alives actors is {}", terminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
      this.numberOfActorsToWatch -= 1

    case Terminated(lastTerminatedActor) if this.numberOfActorsToWatch == 0 =>
      log.info("Following actor {} is terminated. All actors has been terminated",lastTerminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
      // what I can do here ?
      //context.stop(self)

  }
}

application.conf

akka {
  actor {
    coordinated-shutdown {
      default-phase-timeout = 20 s
      terminate-actor-system = off
      exit-jvm = off
      run-by-jvm-shutdown-hook = off
    }
  }
}

I don't know how to block the main thread, the one that finally kills the app.

1 Answer

This is easily achieved by placing a supervisor actor in front of your hierarchy:

  • When you need to shut down, send a message to the supervisor; the supervisor caches the sender, A.
  • The supervisor subscribes to the death of its children through DeathWatch (see https://doc.akka.io/docs/akka/2.5/actors.html).
  • The supervisor sets a counter to the number of children, then sends each child a message telling it to shut down as soon as possible. When a child is done, it terminates itself; the supervisor receives a notification and decrements the counter.
  • When the counter reaches 0, the supervisor sends A a message such as ShutdownTerminated and terminates itself.

Your code will then look like this:

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, Terminated}

class Supervisor extends Actor with ActorLogging {

  // Whoever sent InitShutdown; it gets the reply once every child has stopped
  var shutdownInitiator: ActorRef = _
  var numberOfActorsToWatch = 0

  override def receive: Receive = {
    case InitShutdown =>
      shutdownInitiator = sender()
      numberOfActorsToWatch = context.children.size
      context.children.foreach { child =>
        context.watch(child)
        child ! TerminateSomehow
      }
      if (numberOfActorsToWatch == 0) finish() // no children, nothing to wait for

    case Terminated(terminatedActor) =>
      // Decrement first, then test: the last Terminated is the one that brings the counter to 0
      numberOfActorsToWatch -= 1
      log.info("Actor {} is terminated. Remaining alive actors: {}", terminatedActor.path.toStringWithoutAddress, numberOfActorsToWatch)
      if (numberOfActorsToWatch == 0) finish()
  }

  // Reply to the initiator, then stop the supervisor itself
  private def finish(): Unit = {
    log.info("All actors have been terminated")
    shutdownInitiator ! Done
    context.stop(self)
  }
}
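
The supervisor above only works if each child actually stops after receiving `TerminateSomehow`. A minimal sketch of the worker side, assuming jobs are processed synchronously inside `receive`; `Worker` and `Job` are hypothetical names, and `TerminateSomehow` is redeclared only so that the block compiles on its own:

```scala
import akka.actor.{Actor, ActorLogging}

// Hypothetical messages, declared here only to keep the sketch self-contained
case object TerminateSomehow
final case class Job(id: Long)

class Worker extends Actor with ActorLogging {
  override def receive: Receive = {
    case Job(id) =>
      // An actor handles one message at a time, so this job runs to completion
      // before the next message (including TerminateSomehow) is looked at.
      log.info("Processing job {}", id)

    case TerminateSomehow =>
      // Jobs enqueued before this message have already been processed;
      // anything still in the mailbox after it is not processed.
      context.stop(self) // watchers (the supervisor) then receive Terminated
  }
}
```

If a job hands work off to a Future or another thread, the actor can stop before that work finishes, so this sketch does not cover asynchronous jobs.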

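In your shutdown hook, you need a reference to the supervisor and then use the ask pattern:

implicit val timeout: Timeout = Timeout(10.hours) // the ask itself needs a timeout, longer than your longest job
Await.result(supervisor ? InitShutdown, Duration.Inf)
actorSystem.terminate()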
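
Waiting with `Duration.Inf` ties the JVM's exit to the workers: if one of them never stops, the hook never returns. One option is a bounded wait that reports whether the drain succeeded. A minimal sketch using only the Scala standard library; `BoundedWait` and `drained` are hypothetical names, not part of Akka:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object BoundedWait {
  // Blocks the calling thread for at most `maxWait`.
  // Returns true only if `done` completed successfully in time;
  // a timeout or a failed future (e.g. an expired ask) yields false.
  def drained(done: Future[Any], maxWait: FiniteDuration): Boolean =
    Try(Await.result(done, maxWait)).isSuccess
}
```

In the hook this could be used as `if (!BoundedWait.drained(supervisor ? InitShutdown, 10.minutes)) ...` to log that some jobs were abandoned before calling `actorSystem.terminate()`; the 10 minutes is an arbitrary value for illustration.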