I need to publish messages of different types to the event stream, and those messages should have different priorities. For example, if 10 messages of type A have been posted and one message of type B is posted afterwards, and B has a higher priority than A, then B should be picked up by the next available actor even though 10 messages of type A are already in the queue.
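To make the expected ordering concrete, here is a minimal sketch that does not use Akka at all, only a plain scala.collection.mutable.PriorityQueue. The names PriorityDemo, A, B, priority and drainOrder are illustrative stand-ins, not my real classes, and the priority numbers are assumptions chosen for the example:

```scala
import scala.collection.mutable

object PriorityDemo {
  sealed trait Msg
  case object A extends Msg // hypothetical low-priority message type
  case object B extends Msg // hypothetical high-priority message type

  // Assumed convention: a lower number means a higher priority.
  def priority(m: Msg): Int = m match {
    case B => 0
    case A => 10
  }

  // Enqueue ten A messages, then one B, and return them in dequeue order.
  def drainOrder(): List[Msg] = {
    // PriorityQueue dequeues the largest element first, so the ordering is reversed.
    val queue = mutable.PriorityQueue.empty[Msg](Ordering.by[Msg, Int](priority).reverse)
    (1 to 10).foreach(_ => queue.enqueue(A))
    queue.enqueue(B)
    List.fill(queue.size)(queue.dequeue())
  }

  def main(args: Array[String]): Unit =
    println(drainOrder().mkString(", "))
}
```

In this sketch B comes out first even though it was enqueued last, which is the behaviour I would like to get from the actor's mailbox.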
I have read about prioritized messages here and created a simple implementation of such a mailbox:
    class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
      // Lower value means higher priority; the println calls are only for debugging
      PriorityGenerator {
        case ServerPermanentlyDead => println("Priority:0"); 0
        case ServerDead => println("Priority:1"); 1
        case _ => println("Default priority"); 10
      }
    )
Then I configured it in application.conf:
    akka {
      actor {
        prio-dispatcher {
          type = "Dispatcher"
          mailbox-type = "mailbox.PrioritizedMailbox"
        }
      }
    }
and wired it into my actor:
    private val myActor = actors.actorOf(
      Props[MyEventHandler[T]].
        withRouter(RoundRobinRouter(HIVE)).
        withDispatcher("akka.actor.prio-dispatcher").
        withCreator(
          new Creator[Actor] {
            def create() = new MyEventHandler(storage)
          }), name = "eventHandler")
I'm using ActorSystem.eventStream.publish to send messages, and my actor is subscribed to the event stream (I can see in the logs that messages are processed, but in FIFO order).
However, it looks like this is not enough: I have never seen any of the mailbox's output, such as "Default priority", in the logs or on the console. Am I missing something here? Does the described approach work with the event stream, or only when a message is sent directly to an actor? And how do I get prioritized messages with eventStream?