
While learning Akka's supervision strategies I came up with the following example:

I'd like the parent actor (which has a custom supervision strategy) to ask its child actors about some state and return the result to the sender. The call to the parent should also be an ask, not a tell (just to get some practice with Futures). The supervision strategy is tested by storing state in the child actors and querying the children after killing one of them.

I came up with the tests and implementations below. I'd like to use the pipeTo pattern to pack the children's futures into a single future, which is then returned to the parent's sender. However, this approach doesn't work as expected: I've established that the ask performed by the parent on the children doesn't return the expected state.

I've also tried:

  • using only a single dispatcher by setting .withDispatcher(CallingThreadDispatcher.Id) on the child actors
  • retrieving the children's states synchronously using Await.result(future, timeout)

but neither approach helped. How can I make my code work as expected? Are there any other areas that could be improved (e.g. setting artificial state in child actors just to know that they've been restarted)?

SupervisorStrategiesTest:

package org.skramer.learn.supervisorStrategies

import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, AllForOneStrategy, DeadLetter, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.ask
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe}
import akka.util.Timeout
import org.scalatest.{Matchers, WordSpecLike}
import org.skramer.learn.AkkaSystemClosing
import org.skramer.learn.supervisorStrategies.StateHoldingActor.{ActorThrowCommand, AddStateCommand, GetStateCommand}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

class SupervisorStrategiesTest extends TestKit(ActorSystem("testSystem")) with WordSpecLike with Matchers with ImplicitSender with AkkaSystemClosing {

  import StateHoldingActor._

  "actor with custom supervision strategy" should {
    "apply the strategy to a single child" in {
      implicit val timeout: Timeout = 3 seconds

      val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))

      val initialStateFuture = parentActor ? "state"
      val initialState = Await.result(initialStateFuture, timeout.duration)
      initialState shouldBe List(Vector(), Vector())

      parentActor ! ("first", AddStateCommand(1))
      parentActor ! ("second", AddStateCommand(2))

      val currentStateFuture = parentActor ? "state"
      val currentState = Await.result(currentStateFuture, timeout.duration)
      currentState shouldBe List(Vector(1), Vector(2))

      parentActor ! "throwFirst"

      val stateAfterRestartFuture = parentActor ? "state"
      val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
      stateAfterRestart shouldBe List(Vector(), Vector(2))
    }

    "apply the strategy to all children" in {
      implicit val timeout: Timeout = 3 seconds

      val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))

      val initialStateFuture = parentActor ? "state"
      val initialState = Await.result(initialStateFuture, timeout.duration)
      initialState shouldBe List(Vector(), Vector())

      parentActor ! ("first", AddStateCommand(1))
      parentActor ! ("second", AddStateCommand(2))

      val currentStateFuture = parentActor ? "state"
      val currentState = Await.result(currentStateFuture, timeout.duration)
      currentState shouldBe List(Vector(1), Vector(2))

      parentActor ! "throwFirst"

      val stateAfterRestartFuture = parentActor ? "state"
      val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
      stateAfterRestart shouldBe List(Vector(), Vector())
    }
  }


}

StateHoldingActor:

object StateHoldingActor {

  case class ActorThrowCommand()

  case class AddStateCommand(stateElement: Int)

  case class GetStateCommand()

  case class GetStateCommandWithResponse()

  def props(receiver: ActorRef): Props = Props(new StateHoldingActor())
}

class StateHoldingActor() extends Actor with ActorLogging {
  log.info("about to create state")
  private var state = Vector[Int]()
  log.info(s"state created: $state")

  import StateHoldingActor._

  override def receive: Receive = {
    case AddStateCommand(i) =>
      log.info(s"extending state: $state")
      state = i +: state
      log.info(s"extended state: $state")
    case GetStateCommand() =>
      log.info(s"returning state: $state")
      sender ! state
    case GetStateCommandWithResponse() =>
      log.info(s"returning state in response: $state")
      sender ! state
    case _: ActorThrowCommand =>
      log.info(s"throwing exception with state: $state")
      throw new IllegalStateException("Should crash actor instance and restart state")

  }

}

ParentActor:

abstract class ParentActor(recipient: ActorRef) extends Actor with ActorLogging {
  log.info("creating children")
  private val stateHoldingActor1 = context
                                   .actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
  private val stateHoldingActor2 = context
                                   .actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
  log.info("children created")

  implicit val timeout: Timeout = 3 seconds

  import scala.concurrent.ExecutionContext.Implicits.global

  override def receive: Receive = {
    case "throwFirst" =>
      log.info("stateHoldingActor1 ! ActorThrowCommand")
      stateHoldingActor1 ! ActorThrowCommand
    case "throwSecond" =>
      log.info("stateHoldingActor2 ! ActorThrowCommand")
      stateHoldingActor2 ! ActorThrowCommand
    case "state" =>
      log.info("gathering states")
      val futureResults: Future[List[Any]] = Future
                                             .sequence(List(stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand))
      import akka.pattern.pipe
      futureResults pipeTo sender()

    case ("first", msg@AddStateCommand(_)) => stateHoldingActor1 forward msg
    case ("second", msg@AddStateCommand(_)) => stateHoldingActor2 forward msg
  }
}

OneForOneParentActor:

class OneForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ => Restart
  }
}

AllForOneParentActor:

class AllForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
  override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy() {
    case _ => Restart
  }
}

Answer:


You are declaring your parameterless messages as case classes (with parentheses), but your ParentActor implementation sends them without parentheses. It is therefore sending the case class's companion object, not an actual instance. The receive method of StateHoldingActor (which matches on an instance) doesn't match that message, so the ask never gets a reply and eventually times out.

That is, use stateHoldingActor1 ? GetStateCommand() and stateHoldingActor2 ? GetStateCommand() instead of stateHoldingActor1 ? GetStateCommand and stateHoldingActor2 ? GetStateCommand. The same applies to ActorThrowCommand in the "throwFirst" and "throwSecond" cases.

After fixing this, your first test should pass. It might be a good idea to use case objects for messages that don't carry any parameters. Then this can't happen again.
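The mismatch can be reproduced without Akka, because a receive block is just a pattern match over Any. This is a minimal sketch under that assumption. MessageMatching and the case object GetStateObject are hypothetical names introduced for illustration, and handle stands in for an actor's receive.

```scala
object MessageMatching {
  // Parameterless case class, declared as in the question.
  case class GetStateCommand()

  // Hypothetical case-object alternative to the case class above.
  case object GetStateObject

  // Stand-in for an actor's receive: a pattern match over Any.
  def handle(msg: Any): String = msg match {
    case GetStateCommand() => "matched case class instance"
    case GetStateObject    => "matched case object"
    case _                 => "unhandled"
  }

  def main(args: Array[String]): Unit = {
    println(handle(GetStateCommand()))  // an instance: "matched case class instance"
    println(handle(GetStateCommand))    // the companion object: "unhandled"
    println(handle(GetStateObject))     // the case object: "matched case object"
  }
}
```

With a case object there is only one way to write the message. GetStateObject() does not compile, because a case object has no apply method, so the value sent is always the value the receive block matches.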

The second test still fails, though. One reason is probably that you still use the OneForOneParentActor in the second test, where you presumably want to test the AllForOneParentActor. I am still working on the other reason ;) I'm posting this answer now so that you can look into the other issues in the meantime.

EDIT

The second test fails simply because of a race condition. When the state is requested the last time (stateAfterRestartFuture), the first actor has already failed because of the exception, but the second actor has not been restarted yet (add a Thread.sleep after "throwFirst" to verify this).
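A Thread.sleep confirms the race but makes the test depend on timing. A more tolerant option is to poll the state until it settles. Akka's TestKit offers awaitAssert for this purpose. What follows is a stdlib-only sketch of the same idea. pollUntil is a hypothetical helper, not an Akka API, and queryState is a simulated stand-in for asking the parent for "state".

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object Polling {
  // Hypothetical helper (not an Akka API): re-evaluates `check` every `interval`
  // until it returns true or `max` has elapsed; returns whether it succeeded.
  def pollUntil(max: FiniteDuration, interval: FiniteDuration)(check: => Boolean): Boolean = {
    val deadline = max.fromNow
    @tailrec
    def loop(): Boolean =
      if (check) true
      else if (deadline.isOverdue()) false
      else {
        Thread.sleep(interval.toMillis)
        loop()
      }
    loop()
  }

  def main(args: Array[String]): Unit = {
    // Simulated state query: the first two answers still show the second
    // child's old state, mimicking the race between crash propagation and query.
    var queries = 0
    def queryState(): List[Vector[Int]] = {
      queries += 1
      if (queries < 3) List(Vector(), Vector(2)) else List(Vector(), Vector())
    }
    val settled = pollUntil(1.second, 10.millis)(queryState() == List(Vector(), Vector()))
    println(s"settled=$settled after $queries queries") // settled=true after 3 queries
  }
}
```

Polling only makes the test tolerant of the restart delay. It does not make the parent's combined reply consistent at any single point in time.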

EDIT2

I created a github repository with the code I used to test/fix: https://github.com/thwiegan/so_ActorSupervisionTest

EDIT3

In response to your comments, here is what happens when I run the second test from my GitHub code:

[INFO] [06/19/2017 10:32:07.734] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] creating children
[INFO] [06/19/2017 10:32:07.735] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] children created
[INFO] [06/19/2017 10:32:07.735] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] gathering states
[INFO] [06/19/2017 10:32:07.736] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] returning state: Vector()
[INFO] [06/19/2017 10:32:07.736] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$b] returning state: Vector()
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] gathering states
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] extended state: Vector(3)
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$b] extended state: Vector(4)
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] returning state: Vector(3)
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$b] returning state: Vector(4)
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] stateHoldingActor1 ! ActorThrowCommand
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] gathering states
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/$b/$a] throwing exception with state: Vector(3)
[INFO] [06/19/2017 10:32:07.738] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$b] returning state: Vector(4)
[INFO] [06/19/2017 10:32:07.741] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] Children crashed
[ERROR] [06/19/2017 10:32:07.741] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$a] Should crash actor instance and restart state
java.lang.IllegalStateException: Should crash actor instance and restart state
[INFO] [06/19/2017 10:32:07.752] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] About to restart actor with state: Vector(3)
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$b] About to restart actor with state: Vector(4)
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] returning state: Vector()
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b] gathering states
[INFO] [06/19/2017 10:32:07.754] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] returning state: Vector()
[INFO] [06/19/2017 10:32:07.754] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$b] returning state: Vector()

As you can see, the ParentActor gathers the states immediately after the throwFirst command. As a result, the second stateful actor (Vector(4)) returns its state before the first stateful actor (Vector(3)) has propagated its crash to the ParentActor (this just takes time). So there is a race condition between the gather-state command and the propagation of the crash to the ParentActor, and from there to all stateful actors.

Since my test does not pass on your machine, I assume that some parameters (machine timing or other delays) are different.

EDIT

In response to your comment: by the time the restart happens, the ParentActor has already finished handling the state query. You only ask the two StatefulActors and then pass the futures to the pipeTo pattern. After that, the ParentActor doesn't need to touch the future anymore, so it can continue processing whatever comes in. In this case, that is the crash report from one of its children. The first StatefulActor crashed, and the state query was queued in its mailbox to be processed after the restart. The second StatefulActor, meanwhile, received the state query and served it before receiving the restart command. So the two things happen concurrently: the ParentActor processes the crash while the pipeTo pattern, which runs in a separate future, continues with the state query.

One way to mitigate this would be to stop the child actors instead of restarting them. The pipeTo future would then time out, since the first actor never responds, and so no (possibly inconsistent) state would be leaked.
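Stopping avoids leaking partial state because of how Future.sequence behaves. The combined future succeeds only if every element succeeds, and otherwise it fails. This is a stdlib-only sketch with no actors. The replies are hypothetical pre-completed futures, and a failed TimeoutException future stands in for the ask to a stopped child, which Akka would fail with an AskTimeoutException once the timeout expires.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SequenceFailure {
  // Hypothetical stand-ins for the two children's replies (not real actors).
  val firstReply: Future[Vector[Int]]  = Future.successful(Vector.empty[Int])
  val secondReply: Future[Vector[Int]] = Future.successful(Vector(4))

  // Simulates asking a stopped child: no reply arrives and the ask times out.
  val stoppedReply: Future[Vector[Int]] =
    Future.failed(new TimeoutException("simulated ask timeout"))

  // Combines the replies the same way the ParentActor does before pipeTo.
  def gather(replies: List[Future[Vector[Int]]]): Try[List[Vector[Int]]] =
    Try(Await.result(Future.sequence(replies), 1.second))

  def main(args: Array[String]): Unit = {
    println(gather(List(firstReply, secondReply)))   // Success(List(Vector(), Vector(4)))
    println(gather(List(stoppedReply, secondReply))) // a Failure wrapping the TimeoutException
  }
}
```

In the second case, the sender of "state" would receive a failure instead of a list in which one child's state is already reset and the other's is not.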