1
votes

I'm trying to catch 'Terminate Signal' from child to parent actor, however among pool of deadletter messages, the signal fails to arrive on parent actor's queue. What's the best way to resolve this?

Here's the snippet code I'm working on:

class MinerActor extends Actor {
   var count:Int = 0
   def receive = {
       case Mine =>
            //some task here     
            //if success
               count = count + 1
            //
            if (count >= 100) 
            {
                context.stop(self) 
            }
}

class MasterActor extends Actor {
    val miner = context.actorOf(Props(new MinerActor,name = "miner")
    context.watch(miner)

    def receive = {
      case Foo => 
            while (true) {
              miner ! Mine
            }

      case Terminated(miner) =>
            println("Miner Terminated!!")
            context.stop(self)
            context.system.shutdown
    }
}

Here the 'Terminated(miner)' case never gets called. Instead on stdout I see lots of dead-letter messages sent from Master to Miner (which is kind of expected as miner actor gets stopped). However how to prioritize the Terminate signal on the Event bus so as to reach Master Actor?

If I limit while loop to some 200 instead of infinity, after 100 deadletter messages, I receive Terminate Signal which prints "Miner Terminated!!". But how to achieve this when while loop being in infinity?

I'm new to Scala/Akka programming, my main aim here is to run '//some task' for 100 successful times and then exit the whole program. Is this a good way to achieve that task?

2

2 Answers

4
votes

Replace:

case Foo => 
  while (true) {
    miner ! Mine
  }

with

case Foo =>
  miner ! Mine
  self forward Foo
2
votes

The problem is that you're infinite while loop is blocking the actor thread. As a consequence, your master actor is always stuck in processing the first arrived Foo message and will never process any other messages in the mailbox. The reason for this is that there is only a single thread which is responsible for receiving the messages. This has some really nice implications because you basically don't have to worry about concurrency within a single actor if your state changes only happen within this thread.

There are multiple ways to solve this problem. I'd recommend using the scheduler to schedule a repeated task.

class MasterActor extends Actor {
  var minerOption: Option[ActorRef] = None
  var mineMessageOption: Option[Cancellable] = None

  override def preStart: Unit = {
    minerOption = Some(context.actorOf(Props(new MinerActor,name = "miner")))

    minerOption.foreach(context.watch(_))

    import context.dispatcher

    mineMessageOption = Some(context.system.scheduler.schedule(0 seconds, 1 seconds, self, Foo))
  }

  def receive = {
    case Foo =>
      minerOption.foreach {
        _ ! Mine
      }

    case Terminated(miner) =>
      println("Miner Terminated!!")

      mineMessageOption.foreach(_.cancel())

      context.stop(self)
      context.system.shutdown
  }
}

In the schedule call you can define the interval of the message Foo and, thus, how many messages will be sent to the miner.