5
votes

I need to publish messages of different types to event stream, and those messages should have different priorities for example, if 10 messages of type A have been posted, and one message of type B is posted after all, and priority of B is higher than the priority of A - message B should be picked up by next actor even if there are 10 messages of type A in queue.

I have read about prioritized messages here and created my simple implementation of that mailbox:

  class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

    PriorityGenerator {
      case ServerPermanentlyDead => println("Priority:0"); 0
      case ServerDead => println("Priority:1"); 1
      case _ => println("Default priority"); 10
    }

  )

then I configured it in application.conf

akka {

    actor {

        prio-dispatcher {
            type = "Dispatcher"
            mailbox-type = "mailbox.PrioritizedMailbox"
        }

    }

}

and wired into my actor:

private val myActor = actors.actorOf(
  Props[MyEventHandler[T]].
    withRouter(RoundRobinRouter(HIVE)).
    withDispatcher("akka.actor.prio-dispatcher").
    withCreator(
    new Creator[Actor] {
      def create() = new MyEventHandler(storage)
    }), name = "eventHandler")

I'm using ActorSystem.eventStream.publish in order to send messages, and my actor is subscribed to it (I can see in logs that messages are processed, but in FIFO order).

However looks like it is not enough, because in logs/console I've never seen the messages like "Default priority". Am I missing something here? Does the described approach work with event streams or just with direct invocations of sending a message on actor? And how do I get prioritized messages with eventStream?

1

1 Answers

10
votes

Your problem is that your actors are insanely fast so messages get processed before they have time to queue up, so there cannot be any priorization done by the mailbox. The example below proves the point:

  trait Foo 
  case object X extends Foo 
  case object Y extends Foo 
  case object Z extends Foo 

  class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config) 
extends UnboundedPriorityMailbox( 
    PriorityGenerator { 
      case X ⇒ 0 
      case Y ⇒ 1 
      case Z ⇒ 2 
      case _ ⇒ 10 
    }) 

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString( 
        """ prio-dispatcher { 
        type = "Dispatcher" 
          mailbox-type = "%s" 
        }""".format(classOf[PrioritizedMailbox].getName))) 
      val latch = new java.util.concurrent.CountDownLatch(1) 
      val a = s.actorOf(Props(new akka.actor.Actor { 
        latch.await // Just wait here so that the messages are queued up 
inside the mailbox 
        def receive = { 
          case any ⇒ /*println("Processing: " + any);*/ sender ! any 
        } 
      }).withDispatcher("prio-dispatcher")) 
      implicit val sender = testActor 
      a ! "pig" 
      a ! Y 
      a ! Z 
      a ! Y 
      a ! X 
      a ! Z 
      a ! X 
      a ! "dog" 

      latch.countDown() 

      Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) } 
      s.shutdown() 

This test passes with flying colors