I have an Akka application with a Router Group composed by actors executing some jobs. When I detected a shutdown of my application, I want that my actors complete their work before shutting down the application completely. The use case of my question is in case of redeployment : I don't want to authorize it if current jobs are not executed.
To detect shutdown of my application, I'm using following code :
scala.sys.addShutdownHook {
// let actors finished their work
}
To make some tests, I add an infinite loop to see if the shutdown hook is blocked but application ends so this is not the expected behavior for me.
In order to let my actors finished their job, I will implement the idea in following article : http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2
So now I'm searching a way to ignore the shutdown hook and closing all resources and application when all jobs have been executed by my workers.
Update after @Edmondo1984 comment
My main app :
val workers = this.createWorkerActors()
val masterOfWorkers = system.actorOf(Master.props(workers), name = "master")
this.monitorActors(supervisor,workers,masterOfWorkers)
this.addShutDownHook(system,masterOfWorkers,supervisor)
def monitorActors(supervisor : ActorRef,workers : List[ActorRef], master : ActorRef) : Unit = {
val actorsToMonitor = master +: workers
supervisor ! MonitorActors(actorsToMonitor)
}
def addShutDownHook
(
system : ActorSystem,
masterOfWorkers : ActorRef, // actor wrapping a ClusterGroup router, brodcasting a PoisonPill to each worker
supervisor : ActorRef
) : Unit = {
scala.sys.addShutdownHook {
implicit val timeout = Timeout(10.hours) // How to block here until actors are terminated ?
system.log.info("Send a Init Shutdown to {}", masterOfWorkers.path.toStringWithoutAddress)
masterOfWorkers ! InitShutDown
system.log.info("Gracefully shutdown all actors of ActorSystem {}", system.name)
Await.result((supervisor ? InitShutDown), Duration.Inf)
system.log.info("Gracefully shutdown actor system")
Await.result(system.terminate(), 1.minutes)
system.log.info("Gracefully shutdown Akka management ...")
Await.result(AkkaManagement(system).stop(), 1.minutes)
System.exit(0)
}
}
Supervisor actor
case class Supervisor() extends Actor with ActorLogging {
var numberOfActorsToWatch = 0L
override def receive: Receive = {
case MonitorActors(actorsToMonitor) =>
log.info("Monitor {} actors, received by {}", actorsToMonitor.length, sender().path)
this.numberOfActorsToWatch = actorsToMonitor.length
actorsToMonitor foreach(context.watch(_))
case Terminated(terminatedActor) if this.numberOfActorsToWatch > 0 =>
log.info("Following actor {} is terminated. Remaining alives actors is {}", terminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
this.numberOfActorsToWatch -= 1
case Terminated(lastTerminatedActor) if this.numberOfActorsToWatch == 0 =>
log.info("Following actor {} is terminated. All actors has been terminated",lastTerminatedActor.path.toStringWithoutAddress, this.numberOfActorsToWatch)
// what I can do here ?
//context.stop(self)
}
}
application.conf
akka {
actor {
coordinated-shutdown {
default-phase-timeout = 20 s
terminate-actor-system = off
exit-jvm = off
run-by-jvm-shutdown-hook = off
}
}
}
I don't know how to block the main thread, the one killing finally the app.