
I am trying to create a custom priority mailbox with Akka Typed that prioritizes messages based on their type. Below is the code that creates the actor and sends messages to it. However, the messages are processed in the order they are received, not in the order defined by the custom mailbox.

package com.akka.prac

import akka.actor.typed.{ActorSystem, Behavior, DispatcherSelector, Props, Settings}
import akka.actor.typed.scaladsl.Behaviors
import akka.dispatch.{PriorityGenerator, UnboundedPriorityMailbox}
import com.typesafe.config.{Config, ConfigFactory}

object PriorityMailBoxTyped extends App {

  val config = ConfigFactory.load("applicationtyped.conf")
  val actorSystem = ActorSystem(
    MyPriorityActorTyped.receive,
    "PriorityMailBox",
    config,
    DispatcherSelector.fromConfig("prio-dispatcher")
  )

  actorSystem ! DoubleMessage(6.0)
  actorSystem ! IntMessage(1)
  actorSystem ! DoubleMessage(5.0)
  actorSystem ! IntMessage(3)
  actorSystem ! StringMessage("Hello")
  actorSystem ! IntMessage(5)
  actorSystem ! StringMessage("I am priority actor")
  actorSystem ! StringMessage(
    "I process string messages first,then integer, long and others"
  )

}

class MyPriorityActorMailBoxTyped(settings: Settings, config: Config)
    extends UnboundedPriorityMailbox(
      // Create a new PriorityGenerator,lower prio means more important
      PriorityGenerator {
        // Int Messages
        case x: IntMessage => 1
        // String Messages
        case x: StringMessage => 0
        // Long messages
        case x: LongMessage => 2
        // other messages
        case x: DoubleMessage => 3
        case _                => 4
      }
    )

trait MyMessage
case class IntMessage(x: Int) extends MyMessage
case class LongMessage(x: Long) extends MyMessage
case class StringMessage(x: String) extends MyMessage
case class DoubleMessage(x: Double) extends MyMessage

object MyPriorityActorTyped {

  def receive: Behavior[MyMessage] = Behaviors.receive { (context, message) =>
    message match {
      case IntMessage(x)    => println(x)
      case StringMessage(x) => println(x)
      case LongMessage(x)   => println(x)
      case DoubleMessage(x) => println(x)
      case _                => println()
    }
    Behaviors.same
  }
}

Configuration File

prio-dispatcher {
    mailbox-type = "com.akka.prac.MyPriorityActorMailBoxTyped"
}

The same example works with Akka Classic. What am I missing here?

Thanks.

I suppose this happens because the messages are handled too fast, so the ActorSystem has no chance to prioritize them. Try adding a sleep to receive. - Ivan Kurchenko
Thanks @IvanKurchenko. I tried adding a 1-second sleep to receive but got the same result.
Updated actor:
    object MyPriorityActorTyped {
      def receive: Behavior[MyMessage] = Behaviors.receive { (context, message) =>
        Thread.sleep(1000)
        message match {
          case IntMessage(x)    => println(x)
          case StringMessage(x) => println(x)
          case LongMessage(x)   => println(x)
          case DoubleMessage(x) => println(x)
          case _                => println()
        }
        Behaviors.same
      }
    }
Output:
    6.0
    1
    5.0
    3
    Hello
    5
    I am priority actor
    I process string messages...
- mudassir
Did you use the MailboxSelector when creating the actor? That seems to be the new Typed way: doc.akka.io/docs/akka/current/typed/mailboxes.html - Florian Schaetz
Thanks @FlorianSchaetz. I changed the DispatcherSelector in my code to MailboxSelector. Same result, unfortunately! - mudassir

1 Answer


This seems to have been a bug in Akka, which has since been fixed in version 2.6.14.

I just tested it, and it works with the config from the documentation:

ActorSystem.create(
    Behaviors.setup(ctx -> someStaticBehavior(ctx)),
    "system",
    ConfigFactory.load("application.conf"),
    MailboxSelector.fromConfig("my-app.my-special-mailbox"));
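
Since the question is in Scala, here is a rough Scala equivalent of the call above. It is only a sketch and assumes Akka 2.6.14 or later. The names `Priorities`, `PrioMailbox`, `PrioMailboxSketch` and the config path `prio-mailbox` are made up for illustration, and the message types are a trimmed copy of the ones in the question. Note that the mailbox constructor takes the classic `akka.actor.ActorSystem.Settings`, which is what the Akka mailbox documentation asks for, rather than `akka.actor.typed.Settings`.

```scala
import akka.actor.typed.{ActorSystem, Behavior, MailboxSelector}
import akka.actor.typed.scaladsl.Behaviors
import akka.dispatch.{PriorityGenerator, UnboundedPriorityMailbox}
import com.typesafe.config.{Config, ConfigFactory}

sealed trait MyMessage
final case class IntMessage(x: Int) extends MyMessage
final case class StringMessage(x: String) extends MyMessage
final case class DoubleMessage(x: Double) extends MyMessage

object Priorities {
  // Lower number means handled earlier, mirroring the ordering in the question.
  val priorityOf: Any => Int = {
    case _: StringMessage => 0
    case _: IntMessage    => 1
    case _: DoubleMessage => 3
    case _                => 4
  }
}

// Hypothetical mailbox class. Akka creates it reflectively from the config,
// using a constructor of (classic ActorSystem.Settings, Config).
class PrioMailbox(settings: akka.actor.ActorSystem.Settings, config: Config)
    extends UnboundedPriorityMailbox(PriorityGenerator(Priorities.priorityOf))

object PrioMailboxSketch {
  val printer: Behavior[MyMessage] = Behaviors.receiveMessage { msg =>
    Thread.sleep(100) // demo only: gives later messages time to queue up
    println(msg)
    Behaviors.same
  }

  def main(args: Array[String]): Unit = {
    // Inline config so the sketch does not depend on an external file.
    val config = ConfigFactory
      .parseString(
        s"""prio-mailbox {
           |  mailbox-type = "${classOf[PrioMailbox].getName}"
           |}""".stripMargin)
      .withFallback(ConfigFactory.load())

    // MailboxSelector (not DispatcherSelector) is passed as the guardian's Props.
    val system = ActorSystem(
      printer, "PrioSketch", config, MailboxSelector.fromConfig("prio-mailbox"))

    system ! DoubleMessage(6.0)
    system ! IntMessage(1)
    system ! StringMessage("Hello")

    Thread.sleep(1000) // crude wait so the messages are handled before shutdown
    system.terminate()
  }
}
```

Only messages that are waiting in the mailbox at the same time get reordered, so the first message sent may still be handled before the rest arrive.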