I am using BackoffSupervisor strategy to create a child actor that has to process some message. I want to implement a very simple restart strategy, in which in case of exception:
- Child propagates failing message to supervisor
Supervisor restarts child and sends the failing message again.
Supervisor gives up after 3 retries
- Akka persistence is not an option
So far what I have is this:
Supervisor definition:
val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
childProps,
childName = cmd.hashCode.toString,
minBackoff = 1.seconds,
maxBackoff = 2.seconds,
randomFactor = 0.2
)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException => {
println("caught specific message!")
SupervisorStrategy.Restart
}
case _: Exception => SupervisorStrategy.Restart
case _ ⇒ SupervisorStrategy.Escalate
})
)
val sup = context.actorOf(supervisor)
sup ! cmd
Child actor that is supposed to send the e-mail, but fails (throws some Exception) and propagates Exception back to supervisor:
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
throw new Exception("surprising exception")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
In the above code I wrap any exception into custom class MessageException that gets propagated to SupervisorStrategy, but how to propagate it further to the new child to force reprocessing? Is this the right approach?
Edit. I attempted to resent the message to the Actor on preRestart
hook, but somehow the hook is not being triggered:
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
// println("mail sent!")
throw new Exception("surprising exception")
}
override def preStart(): Unit = {
println("child starting")
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
reason match {
case m: MessageException => {
println("aaaaa")
message.foreach(self ! _)
}
case _ => println("bbbb")
}
}
override def postStop(): Unit = {
println("child stopping")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
This gives me something similar to following output:
new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting
But no logs from preRestart
hook
Backoff.onFailure
, when the child of theBackoffSupervisor
restarts, the child'spreRestart
method is not called, because the underlying supervisor actually stops the child, then starts it again later. – Jeffrey Chung