4 votes

I am using the BackoffSupervisor strategy to create a child actor that has to process some message. I want to implement a very simple restart strategy in which, in case of an exception:

  1. The child propagates the failing message to the supervisor.

  2. The supervisor restarts the child and sends the failing message again.

  3. The supervisor gives up after 3 retries.

  4. Akka Persistence is not an option.

So far what I have is this:

Supervisor definition:

import akka.actor.{OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._

val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = cmd.hashCode.toString,
    minBackoff = 1.second,
    maxBackoff = 2.seconds,
    randomFactor = 0.2
  ).withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
      case msg: MessageException =>
        println("caught specific message!")
        SupervisorStrategy.Restart
      case _: Exception => SupervisorStrategy.Restart
      case _            => SupervisorStrategy.Escalate
    })
)

val sup = context.actorOf(supervisor)

sup ! cmd

The child actor that is supposed to send the e-mail but fails (throws an exception) and propagates it, wrapped in a custom MessageException, back to the supervisor:

import akka.actor.Actor
import scala.util.control.NonFatal

class SenderActor() extends Actor {

  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    throw new Exception("surprising exception")
  }

  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        // wrap the failure with the offending command so the supervisor
        // can see which message failed
        case NonFatal(t) => throw MessageException(cmd, t)
      }
  }
}
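
For reference, MessageException and NewMail are never shown in the question; a minimal definition consistent with the usage above (the field names are assumptions) would be:

// Assumed definitions: MessageException must extend Throwable so the
// supervisor strategy's Decider can match on it, and it carries the
// failing command along with the original cause.
case class NewMail(id: Long, body: String)
case class MessageException(cmd: NewMail, cause: Throwable)
  extends Exception(s"Could not process message $cmd", cause)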

In the above code I wrap any exception into the custom class MessageException, which gets propagated to the SupervisorStrategy, but how do I propagate it further to the new child to force reprocessing? Is this the right approach?

Edit: I attempted to resend the message to the actor in the preRestart hook, but somehow the hook is not being triggered:

import akka.actor.Actor
import scala.util.control.NonFatal

class SenderActor() extends Actor {

  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    //    println("mail sent!")
    throw new Exception("surprising exception")
  }

  override def preStart(): Unit = {
    println("child starting")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    reason match {
      case m: MessageException =>
        println("aaaaa")
        message.foreach(self ! _)
      case _ => println("bbbb")
    }
  }

  override def postStop(): Unit = {
    println("child stopping")
  }

  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case NonFatal(t) => throw MessageException(cmd, t)
      }
  }
}

This gives me something similar to the following output:

new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting

But there are no logs from the preRestart hook.

With Backoff.onFailure, when the child of the BackoffSupervisor restarts, the child's preRestart method is not called, because the underlying supervisor actually stops the child, then starts it again later. – Jeffrey Chung

Exactly the issue. Is there any way around this and still be able to reuse the backoff supervisor? – TheMP

4 Answers

6 votes

The reason that the child's preRestart hook is not invoked is because Backoff.onFailure uses BackoffOnRestartSupervisor underneath the covers, which replaces the default restart behavior with a stop-and-delayed-start behavior that is consistent with the backoff policy. In other words, when using Backoff.onFailure, when a child is restarted, the child's preRestart method is not called because the underlying supervisor actually stops the child, then starts it again later. (Using Backoff.onStop can trigger the child's preRestart hook, but that's tangential to the present discussion.)
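
For comparison, a minimal sketch of an onStop-based supervisor, reusing the question's parameters (the child name is an assumption); here a plain failure goes through the normal restart path, so preRestart would run:

// Sketch: with Backoff.onStop the backoff applies when the child *stops*;
// an ordinary failure is handled by the (default) supervisor strategy as an
// in-place restart, so preRestart/postRestart are invoked.
val onStopSupervisor = BackoffSupervisor.props(
  Backoff.onStop(
    childProps,
    childName = "senderActor",
    minBackoff = 1.second,
    maxBackoff = 2.seconds,
    randomFactor = 0.2
  )
)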

The BackoffSupervisor API doesn't support the automatic resending of a message when the supervisor's child restarts: you have to implement this behavior yourself. An idea for retrying messages is to let the BackoffSupervisor's supervisor handle it. For example:

val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    ...
  ).withReplyWhileStopped(ChildIsStopped)
   .withSupervisorStrategy(
     OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
       case msg: MessageException =>
         println("caught specific message!")
         self ! Error(msg.cmd) // replace cmd with whatever the property name is
         SupervisorStrategy.Restart
       case ...
     })
)

val sup = context.actorOf(supervisor)

// note: the enclosing actor must mix in the Timers trait for startSingleTimer
def receive = {
  case cmd: NewMail =>
    sup ! cmd
  case Error(cmd) =>
    // We assume that NewMail has an id field. Also, adjust the time as needed.
    timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
  case Replay(cmd) =>
    sup ! cmd
  case ChildIsStopped =>
    println("child is stopped")
}

In the above code, the NewMail message embedded in the MessageException is wrapped in a custom case class (in order to easily distinguish it from a "normal"/new NewMail message) and sent to self. In this context, self is the actor that created the BackoffSupervisor. This enclosing actor then uses a single timer to replay the original message at some point. This point in time should be far enough in the future such that the BackoffSupervisor can potentially exhaust SenderActor's restart attempts, so that the child can have ample opportunity to get in a "good" state before it receives the resent message. Obviously this example involves only one message resend regardless of the number of child restarts.
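
The wrapper messages used above aren't defined in the answer; minimal assumed definitions might look like this:

// Assumed message definitions for the sketch above
case object ChildIsStopped         // reply sent while the child is stopped
case class Error(cmd: NewMail)     // failed command, reported by the strategy
case class Replay(cmd: NewMail)    // timer-driven resend of the command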


Another idea is to create a BackoffSupervisor-SenderActor pair for every NewMail message, and have the SenderActor send the NewMail message to itself in the preStart hook. One concern with this approach is the cleaning up of resources; i.e., shutting down the BackoffSupervisors (which will, in turn, shut down their respective SenderActor children) when the processing is successful or when the child restarts are exhausted. A map of NewMail ids to (ActorRef, Int) tuples (in which the ActorRef is a reference to a BackoffSupervisor actor, and the Int is the number of restart attempts) would be helpful in this case:

class Overlord extends Actor {

  var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long

  def receive = {
    case cmd: NewMail =>
      val childProps = Props(new SenderActor(cmd, self))
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          ...
        ).withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
            case msg: MessageException =>
              println("caught specific message!")
              self ! Error(msg.cmd)
              SupervisorStrategy.Restart
            case ...
          })
      )
      val sup = context.actorOf(supervisor)
      state += (cmd.id -> (sup, 0))

    case ProcessingDone(cmdId) =>
      state.get(cmdId) match {
        case Some((backoffSup, _)) =>
          context.stop(backoffSup)
          state -= cmdId
        case None =>
          println(s"${cmdId} not found")
      }

    case Error(cmd) =>
      val cmdId = cmd.id
      state.get(cmdId) match {
        case Some((backoffSup, numRetries)) =>
          if (numRetries == 3) {
            println(s"${cmdId} has already been retried 3 times. Giving up.")
            context.stop(backoffSup)
            state -= cmdId
          } else
            state += (cmdId -> (backoffSup, numRetries + 1))
        case None =>
          println(s"${cmdId} not found")
      }

    case ...
  }
}

Note that SenderActor in the above example takes a NewMail and an ActorRef as constructor arguments. The latter argument allows the SenderActor to send a custom ProcessingDone message to the enclosing actor:

class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
  override def preStart(): Unit = {
    println(s"child starting, sending ${cmd} to self")
    self ! cmd
  }

  def fakeSendMail(): Unit = ...

  def receive = {
    case cmd: NewMail => ...
  }
}

Obviously the SenderActor is set up to fail every time with the current implementation of fakeSendMail. I'll leave to you the additional changes needed in SenderActor to implement the happy path, in which SenderActor sends a ProcessingDone message to target.
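
For illustration, the happy path could look roughly like this (a sketch only; the sendMail implementation and the shape of ProcessingDone are assumptions):

// Sketch of the happy path: report success to the enclosing actor.
case class ProcessingDone(cmdId: Long) // assumed message definition

def receive: Receive = {
  case cmd: NewMail =>
    sendMail(cmd)                   // assumed working implementation; throws on failure
    target ! ProcessingDone(cmd.id) // lets the Overlord stop this pair's supervisor
}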

1 vote

In the good solution that @chunjef provides, he warns about the risk of scheduling a job resend before the backoff supervisor has started the worker:

This enclosing actor then uses a single timer to replay the original message at some point. This point in time should be far enough in the future such that the BackoffSupervisor can potentially exhaust SenderActor's restart attempts, so that the child can have ample opportunity to get in a "good" state before it receives the resent message.

If this happens, jobs will end up in dead letters and no further progress will be made. I've made a simplified fiddle with this scenario.

So the schedule delay should be larger than maxBackoff, and this could impact job completion time. A possible way to avoid this scenario is to have the worker actor send a message to its parent when it is ready to work, like here.
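
A minimal sketch of that handshake (WorkerReady and the pending-job bookkeeping are assumptions, not part of the linked example):

import akka.actor.{Actor, ActorRef}

case object WorkerReady // assumed readiness signal

class Worker(parent: ActorRef) extends Actor {
  // preStart runs after every start, including the stop-and-start cycles
  // performed by Backoff.onFailure, so the parent never resends too early
  override def preStart(): Unit = parent ! WorkerReady

  def receive: Receive = {
    case cmd: NewMail => // process the command
  }
}

class Parent extends Actor {
  var pending: Option[NewMail] = None // job waiting for a ready worker

  def receive: Receive = {
    case cmd: NewMail => pending = Some(cmd) // plus forwarding to the supervisor
    case WorkerReady  => pending.foreach(sender() ! _) // sender() is the worker
  }
}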

0 votes

The failed child actor is available as the sender in your supervisor strategy. Quoting https://doc.akka.io/docs/akka/current/fault-tolerance.html#creating-a-supervisor-strategy:

If the strategy is declared inside the supervising actor (as opposed to within a companion object) its decider has access to all internal state of the actor in a thread-safe fashion, including obtaining a reference to the currently failed child (available as the sender of the failure message).
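
A sketch of what this enables when the strategy is declared inside a plain supervising actor (a caveat: with Backoff.onFailure the child is stopped and later started rather than restarted in place, so its mailbox is lost and the old reference may point to dead letters; this sketch assumes default restart semantics, where the mailbox survives):

import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}

class MailSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3) {
      case MessageException(cmd, _) =>
        // sender() is the currently failed child; with an in-place restart
        // the mailbox survives, so cmd is reprocessed after the restart
        sender() ! cmd
        SupervisorStrategy.Restart
    }

  def receive: Receive = {
    case cmd: NewMail => // create the child and forward the command
  }
}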

-1 votes

Sending emails is a dangerous operation that, in your case, depends on third-party software. Why not apply the Circuit Breaker pattern and skip the sender actor entirely? Also, you can still have an actor (with some Backoff supervisor) and a Circuit Breaker inside it (if that makes sense for you).
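
A minimal sketch with Akka's built-in akka.pattern.CircuitBreaker (the thresholds are assumptions to tune for the mail server at hand):

import akka.actor.Actor
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._

class MailActor extends Actor {
  import context.dispatcher // ExecutionContext for the breaker's callbacks

  val breaker = new CircuitBreaker(
    context.system.scheduler,
    maxFailures = 3,
    callTimeout = 5.seconds,
    resetTimeout = 30.seconds
  ).onOpen(println("circuit breaker opened, mail server seems down"))

  def sendMail(cmd: NewMail): Unit = ??? // the real third-party call

  def receive: Receive = {
    case cmd: NewMail =>
      // fails fast while the breaker is open instead of restarting the actor
      breaker.withSyncCircuitBreaker(sendMail(cmd))
  }
}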