While learning akka's supervision strategies I came up with the following example:
I'd like the parent actor (which has a custom supervision strategy) to ask
its children actors about some state and return the result to the sender
. The call to the actor also should be an ask
, not a tell
(just to work with Future
s a little). The supervision strategy is tested by storing state in child actors and querying the children after killing one of them.
I came up with the tests and implementations below. I'd like to use the pipeTo
pattern to pack children's future
s into a single future
which will be returned to the parent's sender
.
This approach however doesn't work as expected. I've established that the ask
performed by the parent on the children doesn't return the expected state.
I've also tried:
- to use only a single dispatcher by setting
.withDispatcher(CallerThreadDispatcher.Id)
on the children actors - retrieve the children's states synchronously using
Await.result(future, timeout)
but none of the approaches helped. How can I make my code work as expected? Are there any other areas that could be improved (like e.g. setting artificial state in child actors just to know that they've been restarted?)
SupervisorStrategiesTest:
package org.skramer.learn.supervisorStrategies
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, AllForOneStrategy, DeadLetter, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.ask
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe}
import akka.util.Timeout
import org.scalatest.{Matchers, WordSpecLike}
import org.skramer.learn.AkkaSystemClosing
import org.skramer.learn.supervisorStrategies.StateHoldingActor.{ActorThrowCommand, AddStateCommand, GetStateCommand}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
class SupervisorStrategiesTest extends TestKit(ActorSystem("testSystem")) with WordSpecLike with Matchers with ImplicitSender with AkkaSystemClosing {
import StateHoldingActor._
"actor with custom supervision strategy" should {
"apply the strategy to a single child" in {
implicit val timeout: Timeout = 3 seconds
val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))
val initialStateFuture = parentActor ? "state"
val initialState = Await.result(initialStateFuture, timeout.duration)
initialState shouldBe List(Vector(), Vector())
parentActor ! ("first", AddStateCommand(1))
parentActor ! ("second", AddStateCommand(2))
val currentStateFuture = parentActor ? "state"
val currentState = Await.result(currentStateFuture, timeout.duration)
currentState shouldBe List(Vector(1), Vector(2))
parentActor ! "throwFirst"
val stateAfterRestartFuture = parentActor ? "state"
val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
stateAfterRestart shouldBe List(Vector(), Vector(2))
}
"apply the strategy to all children" in {
implicit val timeout: Timeout = 3 seconds
val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))
val initialStateFuture = parentActor ? "state"
val initialState = Await.result(initialStateFuture, timeout.duration)
initialState shouldBe List(Vector(), Vector())
parentActor ! ("first", AddStateCommand(1))
parentActor ! ("second", AddStateCommand(2))
val currentStateFuture = parentActor ? "state"
val currentState = Await.result(currentStateFuture, timeout.duration)
currentState shouldBe List(Vector(1), Vector(2))
parentActor ! "throwFirst"
val stateAfterRestartFuture = parentActor ? "state"
val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
stateAfterRestart shouldBe List(Vector(), Vector())
}
}
}
StateHoldingActor:
object StateHoldingActor {
case class ActorThrowCommand()
case class AddStateCommand(stateElement: Int)
case class GetStateCommand()
case class GetStateCommandWithResponse()
def props(receiver: ActorRef): Props = Props(new StateHoldingActor())
}
class StateHoldingActor() extends Actor with ActorLogging {
log.info("about to create state")
private var state = Vector[Int]()
log.info(s"state created: $state")
import StateHoldingActor._
override def receive: Receive = {
case AddStateCommand(i) =>
log.info(s"extending state: $state")
state = i +: state
log.info(s"extended state: $state")
case GetStateCommand() =>
log.info(s"returning state: $state")
sender ! state
case GetStateCommandWithResponse() =>
log.info(s"returning state in response: $state")
sender ! state
case _: ActorThrowCommand =>
log.info(s"throwing exception with state: $state")
throw new IllegalStateException("Should crash actor instance and restart state")
}
}
ParentActor:
abstract class ParentActor(recipient: ActorRef) extends Actor with ActorLogging {
log.info("creating children")
private val stateHoldingActor1 = context
.actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
private val stateHoldingActor2 = context
.actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
log.info("children created")
implicit val timeout: Timeout = 3 seconds
import scala.concurrent.ExecutionContext.Implicits.global
override def receive: Receive = {
case "throwFirst" =>
log.info("stateHoldingActor1 ! ActorThrowCommand")
stateHoldingActor1 ! ActorThrowCommand
case "throwSecond" =>
log.info("stateHoldingActor1 ! ActorThrowCommand")
stateHoldingActor2 ! ActorThrowCommand
case "state" =>
log.info("gathering states")
val futureResults: Future[List[Any]] = Future
.sequence(List(stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand))
import akka.pattern.pipe
futureResults pipeTo sender()
case ("first", msg@AddStateCommand(_)) => stateHoldingActor1 forward msg
case ("second", msg@AddStateCommand(_)) => stateHoldingActor2 forward msg
}
}
OneForOneParentActor:
class OneForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
}
allForOneParentActor:
class AllForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy() {
case _ => Restart
}
}