0
votes

I have an Akka system that consists of two producer actors sending messages to one consumer actor. In simplified form, it looks like this:

class ProducerA extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageA()
    }

    ... more code ...
}

class ProducerB extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageB()
    }

    ... more code ...
}

class Consumer extends Actor {
    def receive = {
        case A => handleMessageA(A)
        case B => handleMessageB(B)
    }

    ... more code ...
}

All three actors are siblings within the same Akka actor system.

I am trying to figure out how to terminate this system gracefully. On shutdown, I want ProducerA and ProducerB to stop immediately, and then I want Consumer to finish processing whatever messages are left in its message queue before shutting down.

It seems like what I want is for the Consumer actor to be able to watch for the termination of both ProducerA and ProducerB. Or, more generally, it seems I want to be able to send a PoisonPill message to the Consumer after both producers have stopped.

https://alvinalexander.com/scala/how-to-monitor-akka-actor-death-with-watch-method

The above tutorial has a pretty good explanation of how one actor can watch for the termination of one other actor, but I am not sure how an actor can watch for the termination of multiple actors.

3 Answers

1
votes
import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration.DurationInt

class Producer extends Actor {
  def receive = {
    case _ => println("Producer received a message")
  }
}

case object KillConsumer

class Consumer extends Actor {

  def receive = {
    case KillConsumer =>
      println("Stopping Consumer After All Producers")
      context.stop(self)
    case _ => println("Consumer received a message")
  }

  override def postStop(): Unit = {
    println("Post Stop Consumer")
    super.postStop()
  }
}

class ProducerWatchers(producerListRef: List[ActorRef], consumerRef: ActorRef) extends Actor {
  producerListRef.foreach(x => context.watch(x))
  context.watch(consumerRef)
  var producerActorCount = producerListRef.length
  implicit val timeout: Timeout = Timeout(5.seconds)
  override def receive: Receive = {
    case Terminated(x) if producerActorCount == 1 && producerListRef.contains(x) =>
      consumerRef ! KillConsumer

    case Terminated(x) if producerListRef.contains(x) =>
      producerActorCount -= 1

    case Terminated(`consumerRef`) =>
      println("Killing ProducerWatchers On Consumer End")
      context.stop(self)

    case _ => println("Dropping Message")
  }

  override def postStop(): Unit = {
    println("Post Stop ProducerWatchers")
    super.postStop()
  }
}

object ProducerWatchers {
  def apply(producerListRef: List[ActorRef], consumerRef: ActorRef) : Props = Props(new ProducerWatchers(producerListRef, consumerRef))
}

object AkkaActorKill {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("AkkaActorKill")
    implicit val timeout: Timeout = Timeout(10.seconds)

    val consumerRef = actorSystem.actorOf(Props[Consumer], "Consumer")
    val producer1 = actorSystem.actorOf(Props[Producer], name = "Producer1")
    val producer2 = actorSystem.actorOf(Props[Producer], name = "Producer2")
    val producer3 = actorSystem.actorOf(Props[Producer], name = "Producer3")

    val producerWatchers = actorSystem.actorOf(ProducerWatchers(List[ActorRef](producer1, producer2, producer3), consumerRef),"ProducerWatchers")

    producer1 ! PoisonPill
    producer2 ! PoisonPill
    producer3 ! PoisonPill

    Thread.sleep(5000)
    actorSystem.terminate()
  }
}

This can be implemented with a dedicated ProducerWatchers actor that watches every producer and counts their terminations. Once all of the producers have terminated, it tells the Consumer to stop, and when the Consumer itself terminates, the ProducerWatchers actor stops as well.

1
votes

An actor can watch multiple actors simply via multiple invocations of context.watch, passing in a different ActorRef with each call. For example, your Consumer actor could watch the termination of the Producer actors in the following way:

import akka.actor.{Actor, ActorRef, PoisonPill, Terminated}

case class WatchMe(ref: ActorRef)

class Consumer extends Actor {
  var watched = Set[ActorRef]()

  def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched = watched + ref
    case Terminated(ref) =>
      watched = watched - ref
      if (watched.isEmpty) self ! PoisonPill
    // case ...
  }
}

Both Producer actors would send their respective references to Consumer, which would then monitor the Producer actors for termination. When the Producer actors are both terminated, the Consumer sends a PoisonPill to itself. Because a PoisonPill is treated like a normal message in an actor's mailbox, the Consumer will process any messages that are already enqueued before handling the PoisonPill and shutting itself down.
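A minimal end-to-end sketch of this pattern follows. It makes a few assumptions that are not in the question: the `Produce` message, the string payloads, the `postStop` hook, and the `WatchDemo` object are illustrative names, and the producers are registered by the code that creates them rather than by the producers themselves. Registering everything up front is meant to avoid a possible race in which one producer terminates before the other has registered, leaving `watched` momentarily empty and stopping the Consumer too early.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}

case class WatchMe(ref: ActorRef)
case object Produce // hypothetical trigger message

class Consumer extends Actor {
  var watched = Set.empty[ActorRef]

  def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      // The PoisonPill is queued behind any messages already in the mailbox.
      if (watched.isEmpty) self ! PoisonPill
    case msg: String =>
      println(s"Consumer handled: $msg")
  }

  // Demo only: terminate the whole system once the consumer has stopped.
  override def postStop(): Unit = {
    context.system.terminate()
    ()
  }
}

// Hypothetical producer that receives the consumer's ActorRef at construction.
class Producer(consumer: ActorRef) extends Actor {
  def receive = {
    case Produce => consumer ! s"message from ${self.path.name}"
  }
}

object WatchDemo {
  def run(): ActorSystem = {
    val system = ActorSystem("WatchDemo")
    val consumer = system.actorOf(Props(new Consumer), "consumer")
    val producers = Seq("producerA", "producerB").map { name =>
      system.actorOf(Props(new Producer(consumer)), name)
    }
    // Register every producer before any of them can be stopped.
    producers.foreach(p => consumer ! WatchMe(p))
    producers.foreach(_ ! Produce)
    producers.foreach(_ ! PoisonPill)
    system
  }

  def main(args: Array[String]): Unit = run()
}
```

With local actors, the Consumer should handle both producers' messages before it processes its own PoisonPill, although the order of the two messages relative to each other is not fixed.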

A similar pattern is described in Derek Wyatt's "Shutdown Patterns in Akka 2" blog post, which is mentioned in the Akka documentation.

0
votes

The solution I ended up going with was inspired by Derek Wyatt's terminator pattern:

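import akka.pattern.gracefulStop
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global // assumed; use your own ExecutionContext if you have one
import scala.concurrent.duration._

// Stop both producers in parallel; the consumer is stopped only after both have terminated.
// gracefulStop sends a PoisonPill by default, so the consumer drains its mailbox first.
val shutdownFut = Future.sequence(
  Seq(
    gracefulStop(producerA, ProducerShutdownWaitSeconds.seconds),
    gracefulStop(producerB, ProducerShutdownWaitSeconds.seconds)
  )
).flatMap(_ => gracefulStop(consumer, ConsumerShutdownWaitSeconds.seconds))

// Block the calling thread until the whole shutdown sequence has completed.
Await.result(shutdownFut, ProducerShutdownWaitSeconds.seconds + ConsumerShutdownWaitSeconds.seconds)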

This was more or less exactly what I wanted. The consumer's shutdown waits for the producers to shut down, based on the completion of their futures. Furthermore, the whole shutdown sequence is itself a future that you can wait on, which keeps the thread alive long enough for everything to clean up properly.
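One thing to keep in mind is that the future returned by gracefulStop fails with an AskTimeoutException if the actor has not stopped within the given timeout. As a rough sketch of how the same sequencing could be wrapped into a reusable helper that also terminates the actor system afterwards, consider the following. The `GracefulShutdown` object, the `shutdown` method, and its parameter names are hypothetical and not part of the code above. The execution context is passed in explicitly on the assumption that it should outlive the actor system (for example `ExecutionContext.global`), since the system's own dispatcher is shut down during termination.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.{AskTimeoutException, gracefulStop}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object GracefulShutdown {
  // Hypothetical helper: stop the producers, then the consumer, then the system.
  def shutdown(
      system: ActorSystem,
      producers: Seq[ActorRef],
      consumer: ActorRef,
      producerWait: FiniteDuration,
      consumerWait: FiniteDuration
  )(implicit ec: ExecutionContext): Future[Unit] =
    Future
      // Stop all producers in parallel and wait for every one of them.
      .sequence(producers.map(p => gracefulStop(p, producerWait)))
      // Only then stop the consumer (PoisonPill by default, so its mailbox drains first).
      .flatMap(_ => gracefulStop(consumer, consumerWait))
      // If any stop timed out, give up waiting; note that a producer timeout
      // skips the consumer's graceful stop and goes straight to termination.
      .recover { case _: AskTimeoutException => false }
      .flatMap(_ => system.terminate())
      .map(_ => ())
}
```

A caller could then write something like `Await.result(GracefulShutdown.shutdown(system, Seq(producerA, producerB), consumer, 5.seconds, 10.seconds)(ExecutionContext.global), 20.seconds)`, where the durations are placeholder values.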