After reading this excellent blog post by the Akka team, Shutdown Patterns in Akka 2, I ran the code and it indeed works.
But when I tried another experiment with a slight change, throwing an exception in the workers, the pattern no longer works. It's reasonable to expect that workers may throw all kinds of exceptions while working, right?
Here is my code, in two files:
Reaper.scala, copied from the article mentioned above:
```scala
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method. It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}
```
TestWorker.scala:
```scala
import akka.actor.{SupervisorStrategy, Props, ActorSystem, Actor}
import Reaper._

class TestReaper extends Reaper {
  // Akka 2.3-era API; Akka 2.4+ uses context.system.terminate() instead
  def allSoulsReaped(): Unit = context.system.shutdown()
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
}

// main() sends this message to the workers to tell them to start work
case object StartWork

class TestWorker extends Actor {
  def receive = {
    case StartWork =>
      // do real work ...
      throw new IllegalStateException("Simulate uncaught exceptions during work")
  }
}

object TestWorker {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("system")
    val reaper = system.actorOf(Props[TestReaper])
    val worker1 = system.actorOf(Props[TestWorker])
    val worker2 = system.actorOf(Props[TestWorker])
    reaper ! WatchMe(worker1)
    reaper ! WatchMe(worker2)
    Thread.sleep(3000) // make sure WatchMe will be delivered before StartWork
    worker1 ! StartWork
    worker2 ! StartWork
    system.awaitTermination()
  }
}
```
This program hangs forever. It seems the reaper never receives Terminated messages if the workers throw uncaught exceptions. Can somebody tell me why? Many thanks in advance!
Correct answer by @mattinbits:
The reason this program hangs forever is that, in my code, the TestWorker actors are not children of TestReaper, even though TestReaper calls context.watch(ref). context.watch() doesn't make an actor a child; context.watch(ref) only means that TestReaper will be notified when a TestWorker actor dies.
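One way to check the parent/child relationship is to look at actor paths, since an actor's path is nested under its parent's path. The following is a minimal sketch, assuming Akka 2.4 or later (for terminate() and whenTerminated); Idle and WhoIsMyParent are hypothetical names used only for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._

// A do-nothing actor; it only exists so we can see where it sits in the hierarchy.
class Idle extends Actor {
  def receive: Receive = Actor.emptyBehavior
}

object WhoIsMyParent {
  def workerParentName(): String = {
    val system = ActorSystem("system")
    val reaper = system.actorOf(Props(classOf[Idle]), "reaper")
    val worker = system.actorOf(Props(classOf[Idle]), "worker1")
    // Both actors come from system.actorOf, so they are siblings under the /user guardian;
    // neither one is the other's parent.
    println(reaper.path) // akka://system/user/reaper
    println(worker.path) // akka://system/user/worker1
    val parent = worker.path.parent.name
    system.terminate()
    Await.ready(system.whenTerminated, 10.seconds)
    parent
  }

  def main(args: Array[String]): Unit = println(workerParentName())
}
```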
SupervisorStrategy and context.watch() are two different things. A SupervisorStrategy only affects the actor's own children. Putting override val supervisorStrategy = SupervisorStrategy.stoppingStrategy in TestReaper won't stop a TestWorker when an exception happens inside it. Instead we need to change the SupervisorStrategy of TestWorker's parent. Since all actors in the code above are created by system.actorOf(), they are children of the guardian actor /user, whose default strategy restarts a child that throws an exception. A restarted worker is not terminated, so the reaper never receives Terminated. To stop the workers instead, we need to change the supervision strategy of the /user actor by adding akka { actor { guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy" } } in application.conf.
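The configuration fix can be sketched in code as follows, assuming Akka 2.4 or later and using ConfigFactory.parseString in place of editing application.conf. FailingWorker, SimpleReaper and GuardianStopping are hypothetical names; SimpleReaper is a cut-down stand-in for the article's Reaper that watches the actors passed to its constructor.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._

case object Go

// Same failure as in the question: the worker throws while handling its message.
class FailingWorker extends Actor {
  def receive: Receive = {
    case Go => throw new IllegalStateException("Simulate uncaught exceptions during work")
  }
}

// Minimal reaper: watches the given actors and terminates the system once all are gone.
// Terminated is delivered even if a target had already died before the watch was registered.
class SimpleReaper(targets: Seq[ActorRef]) extends Actor {
  private var remaining: Set[ActorRef] = targets.toSet
  targets.foreach(t => context.watch(t))

  def receive: Receive = {
    case Terminated(ref) =>
      remaining -= ref
      if (remaining.isEmpty) context.system.terminate()
  }
}

object GuardianStopping {
  // Runs the question's scenario with the /user guardian switched to a stopping strategy.
  // Returns true if the system shut down within the timeout.
  def run(): Boolean = {
    val config = ConfigFactory
      .parseString("akka.actor.guardian-supervisor-strategy = \"akka.actor.StoppingSupervisorStrategy\"")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("system", config)
    val workers = Seq(system.actorOf(Props(new FailingWorker)), system.actorOf(Props(new FailingWorker)))
    system.actorOf(Props(new SimpleReaper(workers)))
    workers.foreach(_ ! Go)
    try {
      Await.ready(system.whenTerminated, 10.seconds)
      true
    } catch {
      case _: java.util.concurrent.TimeoutException =>
        system.terminate() // clean up so the JVM can exit
        false
    }
  }

  def main(args: Array[String]): Unit =
    println(if (run()) "system terminated" else "still hanging")
}
```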
However, it's better to use a dedicated actor as the supervisor (the parent of the workers), just like @mattinbits does in his code.
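@mattinbits's code is not reproduced here; the following is a hypothetical sketch of the same idea, assuming Akka 2.4 or later. WorkSupervisor and SupervisedWorkers are illustrative names. The supervisor creates the workers with context.actorOf, so they are its children and its stoppingStrategy applies to them, and it also watches them, so it receives Terminated when they are stopped.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, SupervisorStrategy, Terminated}
import scala.concurrent.Await
import scala.concurrent.duration._

case object StartWork

class TestWorker extends Actor {
  def receive: Receive = {
    case StartWork =>
      throw new IllegalStateException("Simulate uncaught exceptions during work")
  }
}

// Hypothetical supervisor: it is both the parent and the watcher of its workers.
class WorkSupervisor(workerCount: Int) extends Actor {
  // Applies to this actor's children only: a failing worker is stopped
  // instead of restarted, so a Terminated message reaches every watcher.
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

  // Create and watch the workers before any message is processed.
  private var workers: Set[ActorRef] =
    (1 to workerCount).map { i =>
      val w = context.actorOf(Props(classOf[TestWorker]), s"worker$i")
      context.watch(w)
      w
    }.toSet

  def receive: Receive = {
    case StartWork =>
      workers.foreach(_ ! StartWork)
    case Terminated(ref) =>
      workers -= ref
      if (workers.isEmpty) context.system.terminate()
  }
}

object SupervisedWorkers {
  // Returns true if the system shut down within the timeout.
  def run(): Boolean = {
    val system = ActorSystem("system")
    val supervisor = system.actorOf(Props(new WorkSupervisor(2)), "supervisor")
    supervisor ! StartWork
    try {
      Await.ready(system.whenTerminated, 10.seconds)
      true
    } catch {
      case _: java.util.concurrent.TimeoutException =>
        system.terminate() // clean up so the JVM can exit
        false
    }
  }

  def main(args: Array[String]): Unit =
    println(if (run()) "system terminated" else "still hanging")
}
```

Because the workers are created and watched in the supervisor's constructor, no StartWork can be handled before the watches are registered, so this variant needs no Thread.sleep.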
… Reaper received the WatchMe message is prior to the time that the workers received the StartWork message? Since ! is asynchronous, there is a possibility that StartWork arrives before WatchMe. – ymonad
Added Thread.sleep(3000) between reaper ! WatchMe(worker2) and worker1 ! StartWork; it still hangs. – soulmachine