0
votes

The following code example (which you can copy and run) shows a MyParentActor that creates a MyChildActor.

The MyChildActor throws an exception for its first message which causes it to be restarted.

However, what I want to achieve is for "Message 1" to still be processed before "Message 2" on restart of the MyChildActor.

Instead, what is happening is that Message 1 is added to the tail of the mailbox queue, and so Message 2 is processed first.

How do I achieve ordering of the original messages on restart of an actor, without having to create my own mailbox etc?

object TestApp extends App {
  var count = 0
  val actorSystem = ActorSystem()


  val parentActor =  actorSystem.actorOf(Props(classOf[MyParentActor]))
  parentActor ! "Message 1"
  parentActor ! "Message 2"

  class MyParentActor extends Actor with ActorLogging{
    var childActor: ActorRef = null

    @throws[Exception](classOf[Exception])
    override def preStart(): Unit = {
      childActor = context.actorOf(Props(classOf[MyChildActor]))
    }

    override def receive = {
      case message: Any  => {
        childActor ! message
      }
    }

    override def supervisorStrategy: SupervisorStrategy = {
      OneForOneStrategy() {
          case _: CustomException  => Restart
          case _: Exception         => Restart
        }
    }
  }

  class MyChildActor extends Actor with ActorLogging{


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
      message match {
        case Some(e) => self ! e
      }
    }

    override def receive = {
      case message: String  => {
        if (count == 0) {
          count += 1
          throw new CustomException("Exception occurred")
        }
        log.info("Received message {}", message)
      }
    }
  }

  class CustomException(message: String) extends RuntimeException(message)
}
1

1 Answers

1
votes

You could mark the failing message with a special envelope and stash everything up to the receiving of that message (see child actor implementation). Just define a behaviour where the actor stashes every message except for the specific envelope, processes it's payload and then unstashes all other messages and returns to it's normal behaviour.

This gives me:

INFO TestApp$MyChildActor - Received message Message 1
INFO TestApp$MyChildActor - Received message Message 2

object TestApp extends App { 
  var count = 0
  val actorSystem = ActorSystem()


  val parentActor =    actorSystem.actorOf(Props(classOf[MyParentActor]))
  parentActor ! "Message 1"
  parentActor ! "Message 2"

  class MyParentActor extends Actor with ActorLogging{
    var childActor: ActorRef = null

    @throws[Exception](classOf[Exception])
    override def preStart(): Unit = {
        childActor = context.actorOf(Props(classOf[MyChildActor]))
    }

    override def receive = {
        case message: Any => {
            childActor ! message
        }
    }

    override def supervisorStrategy: SupervisorStrategy = {
        OneForOneStrategy() {
            case e: CustomException => Restart
            case _: Exception => Restart
        }
    }
  }

  class MyChildActor extends Actor with Stash with ActorLogging{


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        message match {
            case Some(e) =>
                self ! Unstash(e)
        }
    }

    override def postRestart(reason: Throwable): Unit = {
        context.become(stashing)
        preStart()
    }

    override def receive = {
        case message: String => {
            if (count == 0) {
                count += 1
                throw new CustomException("Exception occurred")
            }
            log.info("Received message {}", message)
        }
    }

    private def stashing: Receive = {
        case Unstash( payload ) =>
            receive(payload)
            unstashAll()
            context.unbecome()
        case m =>
            stash()
    }
  }

  case class Unstash( payload: Any )
  class CustomException(message: String) extends RuntimeException(message)
}