
I have an Akka-based application in which multiple actor systems are joined in a cluster. One actor system is the master and the others are slaves. Sometimes the master actor system crashes, and when that happens the slaves form their own cluster. In my use case I want to avoid this and kill all the slaves as soon as the connection to the master is lost. To implement this, I added a watch on the master. Here is the sample code:

class SlaveActor(input1: String, input2: String) extends Actor {
    .....
    .....
    context.actorSelection(MASTER_ACTOR_ADDRESS) ! Identify(1)
    ....

    def receive = {
        case ActorIdentity(arg1, actorRef)=>
            actorRef.foreach(context.watch(_))
        case Terminated(actorRef) =>
            self ! PoisonPill
        .......
        .......
    }

    ......
}

This all works as expected, but now I want to test this behavior using the Akka test framework (TestKit). I have tried different things, but none of them work.

Note: The slave actor gets the address of the master as an input parameter.

describe("master slave tests") {
it("slave should kill itself as soon as master is down") {
    val dummyActor = system.actorOf(Props.empty)
    master = system.actorOf(Props(classOf[MasterActor], TestProbe().ref, dummyActor), "context-supervisor")

    slave = system.actorOf(
        Props(classOf[SlaveActor], dummyActor, s"${master.path.address.toString}${master.path.toStringWithoutAddress}"))

    val masterProbe = TestProbe()
    masterProbe.watch(master)

    val slaveProbe = TestProbe()
    slaveProbe.watch(slave)

    // NOTE: Simulating master DOWN
    master ! akka.actor.PoisonPill
    masterProbe.expectTerminated(master)

    slaveProbe.expectTerminated(slave)
}
}

The master kills itself successfully, but the slave's Terminated event is never fired. Any help?


1 Answer


I found the problem. It was a timing issue: my master was getting killed before my slave was able to add a watch on it. I added a Thread.sleep() so the test waits for initialization to finish. After that I receive all the messages, including the termination of the slave.

describe("master slave tests") {
   it("slave should kill itself as soon as master is down") {
   val dummyActor = system.actorOf(Props.empty)
   master = system.actorOf(Props(classOf[MasterActor], 
                     TestProbe().ref, dummyActor), "context-supervisor")

   slave = system.actorOf(
       Props(classOf[SlaveActor], dummyActor, s"${master.path.address.toString}${master.path.toStringWithoutAddress}"))

   val masterProbe = TestProbe()
   masterProbe.watch(master)

   val slaveProbe = TestProbe()
   slaveProbe.watch(slave)

   // Give the slave time to resolve the master via Identify and register its watch
   Thread.sleep(2000)
   // NOTE: Simulating master DOWN
   master ! akka.actor.PoisonPill
   masterProbe.expectTerminated(master)

   slaveProbe.expectTerminated(slave)
 }
}
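
The fixed sleep works, but it slows the test down and can still be flaky on a loaded machine. One possible alternative, sketched below under some assumptions, is to have the slave announce that its watch is in place and let the test wait for that message instead of sleeping. The names `WatchRegistered`, `WatchingSlave`, `listener` and `WatchSketch` are hypothetical and not part of the original code. `WatchingSlave` is a stripped-down stand-in for `SlaveActor`, and `Props.empty` stands in for the real master. The sketch also makes the failure mode explicit: if the master is already gone when `Identify` is resolved, the reply carries `None`, so no watch is registered and no `Terminated` ever arrives.

```scala
import akka.actor.{Actor, ActorIdentity, ActorRef, ActorSystem, Identify, PoisonPill, Props, Terminated}
import akka.testkit.TestProbe

// Hypothetical message: sent to `listener` once the watch on the master is registered.
case object WatchRegistered

// Simplified stand-in for SlaveActor. The `listener` parameter is an assumption
// added purely so that a test can observe when the watch is in place.
class WatchingSlave(masterPath: String, listener: ActorRef) extends Actor {
  // Ask the master to identify itself; the reply arrives as ActorIdentity.
  context.actorSelection(masterPath) ! Identify(1)

  def receive: Receive = {
    case ActorIdentity(_, Some(masterRef)) =>
      context.watch(masterRef)
      listener ! WatchRegistered
    case ActorIdentity(_, None) =>
      // The master could not be resolved (for example, it was already stopped),
      // so there is nothing to watch and Terminated will never be delivered.
    case Terminated(_) =>
      context.stop(self)
  }
}

object WatchSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  // Props.empty is used here as a placeholder for the real master actor.
  val master = system.actorOf(Props.empty, "master")

  val ready = TestProbe()
  val slave = system.actorOf(Props(classOf[WatchingSlave], master.path.toString, ready.ref))

  val slaveProbe = TestProbe()
  slaveProbe.watch(slave)

  // Wait until the slave reports that it is watching the master (replaces Thread.sleep).
  ready.expectMsg(WatchRegistered)

  // Simulate the master going down, then check that the slave stops itself.
  master ! PoisonPill
  slaveProbe.expectTerminated(slave)

  system.terminate()
}
```

In the real test the same idea would mean calling `expectMsg` on the readiness probe right where `Thread.sleep(2000)` is now. Whether adding a listener parameter to the production actor is acceptable is a design decision; the sleep keeps the actor untouched at the cost of a slower and less deterministic test.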