After reading the Akka team's excellent blog post, Shutdown Patterns in Akka 2, I ran the code and it does work.

But when I make a slight change and throw an exception in the workers, the pattern no longer works. It seems reasonable to expect that workers may throw all kinds of exceptions while working, right?

Here is my code, in two files:

Reaper.scala, copied from the article mentioned above:

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

TestWorker.scala

import akka.actor.{SupervisorStrategy, Props, ActorSystem, Actor}
import Reaper._


class TestReaper extends Reaper {
  def allSoulsReaped(): Unit = context.system.shutdown()
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
}

// The reaper sends this message to all workers to notify them to start work
case object StartWork

class TestWorker extends Actor {
  def receive = {
    case StartWork =>
      // do real work ...
      throw new IllegalStateException("Simulate uncaught exceptions during work")
  }
}

object TestWorker {
  def main(args: Array[String]) : Unit = {
    val system = ActorSystem("system")
    val reaper = system.actorOf(Props[TestReaper])
    val worker1 = system.actorOf(Props[TestWorker])
    val worker2 = system.actorOf(Props[TestWorker])

    reaper ! WatchMe(worker1)
    reaper ! WatchMe(worker2)
    Thread.sleep(3000) // make sure WatchMe will be delivered before StartWork
    worker1 ! StartWork
    worker2 ! StartWork
    system.awaitTermination()
  }
}

This program will hang forever.

It seems the reaper does not receive Terminated messages if the workers throw uncaught exceptions.

Can somebody tell me why? Many thanks in advance!

Correct answer by @mattinbits:

The program hangs forever because, in my code, the TestWorker actors are not children of TestReaper, even though TestReaper calls context.watch(ref).

Calling context.watch() does not make the watched actor a child. context.watch(ref) only means that TestReaper is notified when that TestWorker actor terminates.

SupervisorStrategy and context.watch() are two different things. A SupervisorStrategy only affects the children of the actor that declares it.
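
A quick way to check who supervises whom is to look at actor paths. The sketch below assumes the same Akka 2.3-era API used in this post (system.shutdown()); the Noop actor, the parentName helper, and the actor names are made up for illustration:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical do-nothing actor, used only to obtain ActorRefs
class Noop extends Actor {
  def receive: Receive = Actor.emptyBehavior
}

object ParentCheck {
  // Hypothetical helper: name of the actor that supervises `ref`
  def parentName(ref: ActorRef): String = ref.path.parent.name

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ParentCheck")
    val reaper = system.actorOf(Props[Noop], "reaper")
    val worker = system.actorOf(Props[Noop], "worker")

    // Both actors were created with system.actorOf, so both sit under /user
    println(parentName(worker))                // prints "user"
    // The reaper is a sibling of the worker, not its parent
    println(worker.path.parent == reaper.path) // prints "false"

    system.shutdown()
  }
}
```

Watching the worker from the reaper would not change either result, since context.watch() never alters an actor's position in the hierarchy.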

Putting override val supervisorStrategy = SupervisorStrategy.stoppingStrategy in TestReaper does not stop a TestWorker when it throws an exception. Instead, the supervisor strategy of the TestWorkers' parent has to change. Since all actors in the code above are created with system.actorOf(), they are children of the guardian actor /user, so what actually needs to change is the supervisor strategy of /user, by adding akka { actor { guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy" } } to application.conf.

However, it is better to introduce a separate parent actor as the supervisor, as @mattinbits does in his code.

Did you confirm that the Reaper received the WatchMe messages before the workers received the StartWork messages? Since ! is asynchronous, it is possible that StartWork arrives before WatchMe. - ymonad
I added Thread.sleep(3000) between reaper ! WatchMe(worker2) and worker1 ! StartWork, and it still hangs. - soulmachine

1 Answer

It is not enough to watch the actors; you also have to make sure the actors actually stop, because Terminated is only sent when an actor stops.

By default, when an actor throws an exception, the strategy is to restart it. You need to give the actors a supervisor which will apply the Stop directive.
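
The difference between the two strategies can be checked without starting any actors by asking each strategy's decider what it would do with the failure. This is only a sketch: directiveFor is a hypothetical helper, while defaultStrategy, stoppingStrategy and decider are taken from akka.actor.SupervisorStrategy:

```scala
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy.Directive

object DeciderCheck {
  // Hypothetical helper: the directive a strategy chooses for a given failure
  def directiveFor(strategy: SupervisorStrategy, cause: Throwable): Directive =
    strategy.decider(cause)

  def main(args: Array[String]): Unit = {
    val failure = new IllegalStateException("Simulate uncaught exceptions during work")

    // Default strategy: the failing actor is restarted, so it never terminates
    println(directiveFor(SupervisorStrategy.defaultStrategy, failure))  // prints "Restart"

    // Stopping strategy: the failing actor is stopped, which triggers Terminated
    println(directiveFor(SupervisorStrategy.stoppingStrategy, failure)) // prints "Stop"
  }
}
```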

Have a look at the following, both tests pass (with Reaper unchanged from your version above):

import java.util.concurrent.TimeoutException
import Reaper.WatchMe
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.testkit.{TestProbe, TestKit}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike, WordSpec}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

case object StartWork

class TestReaper extends Reaper {
  def allSoulsReaped(): Unit = context.system.shutdown()
}

class TestWorker extends Actor {
  def receive = {
    case StartWork =>
      // do real work ...
      throw new IllegalStateException("Simulate uncaught exceptions during work")
  }
}

class TestParent(reaper: ActorRef, probe: ActorRef) extends Actor {

  def receive = {
    case "Start" =>
      val worker1 = context.actorOf(Props[TestWorker])
      val worker2 = context.actorOf(Props[TestWorker])
      reaper ! WatchMe(worker1)
      reaper ! WatchMe(worker2)
      worker1 ! StartWork
      worker2 ! StartWork
  }

  override def supervisorStrategy = OneForOneStrategy() {
    case ex: IllegalStateException =>
      probe ! "Stopped a worker"
      Stop
  }
}


class TestSupervision extends TestKit(ActorSystem("Test"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll{

  "Supervision" should {

    "Stop the actor system when the parent stops the workers" in {
      val reaper = system.actorOf(Props[TestReaper])
      val probe = TestProbe()
      val parent = system.actorOf(Props(new TestParent(reaper, probe.ref)))
      parent ! "Start"
      probe.expectMsg("Stopped a worker")
      probe.expectMsg("Stopped a worker")
      import system.dispatcher
      val terminatedF = Future {
        system.awaitTermination()
      }
      Await.ready(terminatedF, 2 seconds)
    }
  }

  override def afterAll(){
    system.shutdown()
  }
}

class TestLackSupervision extends TestKit(ActorSystem("Test2"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll{

  "Lack of Supervision" should {
    "Not stop the actor system when the workers don't have an appropriate parent" in {
      val reaper = system.actorOf(Props[TestReaper])
      val worker1 = system.actorOf(Props[TestWorker])
      val worker2 = system.actorOf(Props[TestWorker])
      reaper ! WatchMe(worker1)
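      reaper ! WatchMe(worker2)
      // Trigger the failure; the default strategy restarts the workers, so they never terminate
      worker1 ! StartWork
      worker2 ! StartWork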
      import system.dispatcher
      val terminatedF = Future { system.awaitTermination()}
      a [TimeoutException] should be thrownBy Await.ready(terminatedF, 2 seconds)
    }
  }

  override def afterAll(){
    system.shutdown()
  }
}

By default when actors throw an exception they are restarted. Since supervision strategy is applied from parent to child, the TestParent exists to enforce the Stop directive on the children. Your original code would not work for this reason.

If you want top-level actors (those launched with system.actorOf) to stop on an exception, you could set the configuration property akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy". In my example I prefer to use a parent actor, since actor hierarchies are the normal way to organise supervision in Akka.
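
If the configuration route is preferred but editing application.conf is inconvenient, the same property can be supplied in code. A minimal sketch, assuming Typesafe Config's ConfigFactory (which Akka already depends on) and the Akka 2.3-era system.shutdown(); the object name and the system name are arbitrary:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object StoppingGuardian {
  // Override only the guardian strategy; every other setting falls back to
  // application.conf / reference.conf
  val config: Config = ConfigFactory.parseString(
    """
      |akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"
      |""".stripMargin
  ).withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Example", config)
    // ... create the reaper and the top-level workers with system.actorOf here ...
    system.shutdown()
  }
}
```

Note that this changes the behaviour of every top-level actor in the system, which is one more reason to prefer a dedicated parent actor.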

To run the TestParent example as an app rather than as a test, do something similar to the following:

object Main extends App {
  val system = ActorSystem("Example")
  val reaper = system.actorOf(Props[TestReaper])
  val dummyProbe = system.actorOf(Props(new Actor{

    def receive = {
      case "Stopped a worker" => println("Stopped a worker")
    }

  }))
  val parent = system.actorOf(Props(new TestParent(reaper, dummyProbe)))
  parent ! "Start"
  system.awaitTermination()
}

To stop the exceptions from being printed on the command line and muddying the output, change the supervision strategy as follows:

override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {...}