1
votes

New to both Scala + Akka here. I have a small network of actors that all send different messages to each other. Many of them need access to my DeadLetterChannel actor in case they receive a message that they are not sure how to handle:

class DeadLetterChannel(val queue : mutable.Queue[Any]) extends Actor with Logging {
  override def receive: Receive = {
    case any : Any =>
      log.info(s"${this.getClass} just received a message of type ${} from ${sender().getClass}.")
      queue.enqueue(any)
    case _ =>
      log.warn(s"${this.getClass} just received a non-Any message.")
  }
}

Then from inside many other actors:

class DeviceManager(val dlc : DeadLetterChannel) extends ActorRef {
  override def receive: Receive = {
    case Heartbeat =>
      // Handle Heartbeat here...
    case Connect:
      // Handle Connect here...
    case Disconnect:
      // Handle Disconnect here...
    case _ =>
      dlc ! ???
  }
}

I have two problems here:

  • I'm getting a compiler error on the sending of the message (specifically the ! overload: "Cannot resolve symbol !"); and
  • I have no idea what to send to dlc in the _ case. Any ideas? I'm just trying to pass along the message that DeviceManager received that is neither a Heartbeat, a Connect, nor a Disconnect.

2 Answers

1
votes

You may have over-simplified the code for your example, but you can't send a message to an Actor class instance directly. You need to create the actor with actorOf on an ActorSystem (or on another actor's context), which returns an ActorRef, and send messages to that reference. If you give the actor a name, you can also look it up later by path with actorSelection.

For the second part of your question, bind the incoming message to a variable in the case pattern and send that variable on:

case msg =>
  dlc ! msg

Also, you might want to use a different name for your class, because a dead letter is a message that can't be delivered, not a message that can't be handled by the recipient.
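The wiring described above can be sketched roughly as follows. This is only an illustration under assumptions: it uses classic Akka actors, and the Echo actor, the system name "demo" and the actor name "echo" are hypothetical and do not come from the question.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical stand-in for any actor; it just prints what it receives.
class Echo extends Actor {
  def receive: Receive = {
    case msg => println(s"Echo received: $msg")
  }
}

object ActorRefDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")

    // actorOf returns an ActorRef; messages go to the ref, never to the class instance.
    val echo: ActorRef = system.actorOf(Props(classOf[Echo]), "echo")
    echo ! "sent via the ActorRef"

    // A named top-level actor can also be reached by its path.
    system.actorSelection("/user/echo") ! "sent via actorSelection"

    Thread.sleep(500) // crude wait so the demo can print before shutdown
    system.terminate()
  }
}
```

In most code it is simpler to keep and pass around the ActorRef returned by actorOf; actorSelection is mainly useful when only the path is known.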

1
votes

Actor classes should extend Actor instead of ActorRef. Also, use ActorRef to reference an actor. DeviceManager, therefore, should be defined as the following:

class DeviceManager(dlc: ActorRef) extends Actor {
  // ...
}

The idea here is that you would use the ActorRef of the DeadLetterChannel actor when creating the device actors. For example:

val dlc: ActorRef = context.actorOf(/* .. */)
val device1: ActorRef = context.actorOf(Props(classOf[DeviceManager], dlc))
val device2: ActorRef = context.actorOf(Props(classOf[DeviceManager], dlc))

Then you can do as Tim suggested in his answer and capture the default case in the receive block and send this message to dlc:

class DeviceManager(dlc: ActorRef) extends Actor {
  def receive = {
    /* other case clauses */
    case msg =>
      dlc ! msg
  }
}
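Putting these pieces together, a runnable sketch might look like the one below. It makes several assumptions: Heartbeat, Connect and Disconnect are modelled as case objects because the question does not show their definitions, PrintingChannel is a hypothetical stand-in for the real catch-all actor, and the actors are created from an ActorSystem rather than from a parent's context.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Assumed message definitions; the question does not show them.
case object Heartbeat
case object Connect
case object Disconnect

// Hypothetical catch-all actor used only for this demo.
class PrintingChannel extends Actor {
  def receive: Receive = {
    case msg => println(s"Passed on by ${sender().path.name}: $msg")
  }
}

class DeviceManager(dlc: ActorRef) extends Actor {
  def receive: Receive = {
    case Heartbeat  => () // handle Heartbeat here
    case Connect    => () // handle Connect here
    case Disconnect => () // handle Disconnect here
    // Anything else goes to the catch-all actor.
    // `dlc forward msg` would keep the original sender instead of this actor.
    case msg        => dlc ! msg
  }
}

object DeviceDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("devices")
    val dlc = system.actorOf(Props(classOf[PrintingChannel]), "dlc")
    val device = system.actorOf(Props(classOf[DeviceManager], dlc), "device1")

    device ! Heartbeat    // handled by DeviceManager
    device ! "unexpected" // falls through to the default case

    Thread.sleep(500) // crude wait so the demo can print before shutdown
    system.terminate()
  }
}
```

Whether to use ! or forward depends on whether the catch-all actor should see the device or the original sender as the source of the message.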

Also, instead of passing a mutable queue to DeadLetterChannel as a constructor argument, encapsulate this state inside the actor as an immutable queue and expose it only via message passing:

class DeadLetterChannel extends Actor with akka.actor.ActorLogging {
  var queue = collection.immutable.Queue.empty[Any] // start empty; Queue[Any](0) would start with the element 0

  def receive = {
    case GetMessages => // GetMessages is a custom case object
      sender() ! queue
    case msg =>
      val s = sender()
      log.info(s"DLC just received the following message from [$s]: $msg")
      queue = queue.enqueue(msg)
  }
}
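To read the stored messages back, another part of the program can query the actor. The sketch below uses Akka's ask pattern for that; it assumes GetMessages is defined as a plain case object, and the system name, the timeout value and the sample messages are made up for the demo.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.collection.immutable.Queue
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumed definition of the custom query message.
case object GetMessages

class DeadLetterChannel extends Actor with ActorLogging {
  private var queue = Queue.empty[Any]

  def receive: Receive = {
    case GetMessages =>
      sender() ! queue // the immutable queue is safe to share
    case msg =>
      log.info(s"DLC received [$msg] from [${sender()}]")
      queue = queue.enqueue(msg)
  }
}

object QueryDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("query-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    val dlc = system.actorOf(Props(classOf[DeadLetterChannel]), "dlc")

    dlc ! "first"
    dlc ! 42

    // ask (?) returns a Future; blocking with Await is acceptable only in a demo.
    val stored = Await.result((dlc ? GetMessages).mapTo[Queue[Any]], timeout.duration)
    println(s"Stored messages: $stored")

    system.terminate()
  }
}
```

Inside another actor you would normally avoid Await and instead send GetMessages with ! and handle the reply as a regular message.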

Another approach is to not define a default case (i.e., leave out the case msg => clause) and use Akka's mechanisms for dealing with unhandled messages. Below are a couple of options:

  1. Enable the built-in logging for unhandled messages, which logs these messages at the DEBUG level:

    akka {
      actor {
        debug {
          # enable DEBUG logging of unhandled messages
          unhandled = on
        }
      }
    }
    

    This approach is perhaps the simplest if you just want to log unhandled messages.

  2. As the documentation states:

    If the current actor behavior does not match a received message...by default publishes an akka.actor.UnhandledMessage(message, sender, recipient) on the actor system’s event stream.

    In light of this, you could create an actor that subscribes to the event stream and handles akka.actor.UnhandledMessage messages. Such an actor could look like the following:

    import akka.actor.{ Actor, UnhandledMessage, Props }
    
    class UnhandledMessageListener extends Actor {
      def receive = {
        case UnhandledMessage(msg, sender, recipient) =>
          // do something with the message, such as log it and/or something else
      }
    }
    
    val listener = system.actorOf(Props[UnhandledMessageListener])
    system.eventStream.subscribe(listener, classOf[UnhandledMessage])
    

    More information about the event stream and creating a subscriber to that stream can be found in the Akka documentation on the event stream.
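Both options can be tried out in one small program, sketched below. The PingOnly actor and all names are hypothetical. The configuration is set in code only to keep the demo self-contained; it would normally live in application.conf. Note that the DEBUG output from option 1 only appears if akka.loglevel is also set to DEBUG.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props, UnhandledMessage}
import com.typesafe.config.ConfigFactory

// Option 2: receives every UnhandledMessage published on the event stream.
class UnhandledMessageListener extends Actor with ActorLogging {
  def receive: Receive = {
    case UnhandledMessage(msg, sender, recipient) =>
      log.warning(s"[$recipient] did not handle [$msg] from [$sender]")
  }
}

// Hypothetical actor that handles only "ping" and has no default case.
class PingOnly extends Actor {
  def receive: Receive = {
    case "ping" => ()
  }
}

object UnhandledDemo {
  def main(args: Array[String]): Unit = {
    // Option 1, set programmatically for the demo.
    val config = ConfigFactory.parseString(
      """
        |akka.loglevel = "DEBUG"
        |akka.actor.debug.unhandled = on
        |""".stripMargin).withFallback(ConfigFactory.load())
    val system = ActorSystem("unhandled-demo", config)

    val listener = system.actorOf(Props(classOf[UnhandledMessageListener]), "listener")
    system.eventStream.subscribe(listener, classOf[UnhandledMessage])

    val pinger = system.actorOf(Props(classOf[PingOnly]), "pinger")
    pinger ! "pong" // not matched, so Akka publishes an UnhandledMessage

    Thread.sleep(500) // crude wait so the log output appears before shutdown
    system.terminate()
  }
}
```

With this setup no actor needs its own default case; every unmatched message in the system reaches the single listener.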