
I'm attempting to get fault-tolerant behavior from Akka Actors. I'm working on some code that depends on the Actors in the system being available for a long run of processing. I'm finding that my processing stops after a couple of hours (it should take around 10 hours) and not much is happening. I believe that my Actors are not recovering from exceptions.

What do I need to do to get Actors restarted on a one-for-one basis permanently? I expect this can be done based on this documentation: http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

I'm working with Akka 1.1.3 and Scala 2.9.

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._


object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
           throw new Exception("This is a simulated failure")
         println("Actor: " + name + " Received: " + num)
         //Thread.sleep(100)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  //callback method for restart handling 
  override def preRestart(reason: Throwable){
    println("TestActor " + name + " restarting after shutdown because of " + reason)
  }

  //callback method for restart handling 
  override def postRestart(reason: Throwable){
    println("Restarting TestActor " + name + " after shutdown because of " + reason)
  }  
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = { System.out.println("postStop") }
}


  object FaultTest {
    def main(args : Array[String]) : Unit = {
      println("starting FaultTest.main()")
      val numOfActors = 5
      val supervisor = actorOf(
        new TestActorManager with CyclicLoadBalancing {
             val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
        }
      )

      supervisor.start();

      println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

      val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

      (1 until 200 toList) foreach { testActor ! _ }

    }
  }

This code sets up 5 Actors behind a LoadBalancer that just print out the Integers sent to them, except that they throw Exceptions on even numbers to simulate faults. The Integers 1 through 199 are sent to these Actors. I expect the odd numbers to be printed, but everything seems to shut down after a couple of faults on even numbers. Running this code with sbt results in this output:

[info] Running FaultTest 
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

What I think is happening is that 5 actors start, the first 5 even numbers put them out of business, and they are never restarted.

How can this code be changed so that the Actors recover from exceptions?

I expect this to print out all the odd numbers from 1 to 200. I think that each actor would fail on even numbers but be restarted with its mailbox intact. I expect to see the println output from preRestart and postRestart. What needs to be configured in this code sample to make these things happen?

Here are some additional assumptions about akka and Actors that may be leading to my misunderstanding. I'm assuming that an Actor can be configured with a Supervisor or a faultHandler so that it will be restarted and continue to be available when an exception is thrown during receive. I'm assuming that the message that was sent to the actor will be lost if it throws an exception during receive. I'm assuming that the preRestart() and postRestart() on the actor that throws the exception will be called.

The code example represents what I'm trying to do and is based on the question "Why is my Dispatching on Actors scaled down in Akka?"

**Another code sample**

Here is another, simpler code sample. I start one actor that throws exceptions on even numbers; there is no load balancer or anything else in the way. I print out information about the actor, and after the messages have been sent I wait a minute before exiting the program, monitoring what is happening.

I expect this to print out the odd numbers, but it looks like the Actor just sits there with messages in its mailbox.

Do I have the OneForOneStrategy set wrong? Do I need to link the Actor to something? Is this sort of configuration fundamentally misdirected on my part? Does a Dispatcher need to be set up with fault tolerance somehow? Could I be messing up the threads in the Dispatcher?

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
            throw new Exception("This is a simulated failure, where does this get logged?")
         println("Actor: " + name + " Received: " + num)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  override def preRestart(reason: Throwable){
    println("TestActor " + name + " restarting after shutdown because of " + reason)
  }

  override def postRestart(reason: Throwable){
    println("Restarting TestActor " + name + " after shutdown because of " + reason)
  }  
}

object TestSingleActor{

    def main(args : Array[String]) : Unit = {
      println("starting TestSingleActor.main()")

      val testActor = Actor.actorOf( new SingleActor(1) ).start()

      println("number of actors: " + registry.actors.size)
      printAllActorsInfo

      (1 until 20 toList) foreach { testActor ! _ }

      for( i <- 1 until 120 ){
        Thread.sleep(500)
        printAllActorsInfo
      }
    }

  def printAllActorsInfo() ={
    registry.actors.foreach( (a) =>
       println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
               .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
  }
}

I'm getting output like:

[info] Running TestSingleActor 
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeated ...

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM

2 Answers


The problem was with my akka.conf file. I was using the reference 1.1.3 akka.conf file, except for the line that configured the event-handlers.

mine (the broken one):

    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

reference 1.1.3 (the one that works):

    event-handlers = ["akka.event.EventHandler$DefaultListener"]

With my event-handlers config line, Actor restarts do not happen. With the reference 1.1.3 line restarts happen wonderfully.

I made this change based on these instructions http://akka.io/docs/akka/1.1.3/general/slf4j.html

So, by dropping the suggestions on that page and going back to the 1.1.3 reference akka.conf, I was able to get fault-tolerant Actors.
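For the second sample specifically, here is a minimal sketch of one way to get restarts, assuming Akka 1.1.3, Scala 2.9 and the reference `event-handlers` line above. In that sample the `faultHandler` is set on `SingleActor` itself; as I read the 1.1.3 fault-tolerance docs, `faultHandler` is the strategy a supervisor applies to the actors linked to it, so the worker here gets a separate supervisor that calls `self.startLink`, the same call `TestActorManager` uses. The names `Worker`, `WorkerSupervisor`, `RestartDemo` and `SupervisedSingleActor` are hypothetical, and the `CountDownLatch` is only there so `main` does not return before the work is done.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, ActorRef}
import akka.actor.Actor._
import akka.config.Supervision._

// Hypothetical shared state so main can tell when the run is complete.
object RestartDemo {
  val oddsSeen = new CountDownLatch(10) // 1, 3, ..., 19 in (1 until 20)
  val restarts = new AtomicInteger(0)
}

// The worker only does work; it carries no faultHandler of its own.
class Worker(val name: Integer) extends Actor {
  self.lifeCycle = Permanent
  def receive = {
    case num: Integer =>
      if (num % 2 == 0)
        throw new Exception("This is a simulated failure")
      println("Actor: " + name + " Received: " + num)
      RestartDemo.oddsSeen.countDown()
  }
  override def preRestart(reason: Throwable): Unit =
    println("Worker " + name + " restarting because of " + reason)
  override def postRestart(reason: Throwable): Unit = {
    RestartDemo.restarts.incrementAndGet()
    println("Worker " + name + " restarted")
  }
}

// The supervisor owns the strategy that is applied to its linked children.
class WorkerSupervisor(worker: ActorRef) extends Actor {
  self.lifeCycle = Permanent
  // Restart only the failed child, at most 30 times within 1000 ms.
  self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
  override def preStart(): Unit = self.startLink(worker)
  def receive = { case _ => () }
}

object SupervisedSingleActor {
  def main(args: Array[String]): Unit = {
    val worker = actorOf(new Worker(1))
    // Starting the supervisor runs its preStart, which starts and links the worker.
    actorOf(new WorkerSupervisor(worker)).start()
    (1 until 20) foreach { worker ! _ }
    // Keep main alive until every odd number has been printed (or 30 s pass).
    val finished = RestartDemo.oddsSeen.await(30, TimeUnit.SECONDS)
    println("finished: " + finished + ", restarts: " + RestartDemo.restarts.get)
    Actor.registry.shutdownAll()
  }
}
```

Keeping the strategy on a separate supervisor means the worker can be replaced with a fresh instance while its mailbox is kept; the message that caused the failure is not retried.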


I believe your program terminates right after the messages are sent: you aren't doing anything to keep your asynchronous application alive, so the main thread exits and takes everything down with it.
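To illustrate the point outside Akka, here is a plain-JVM sketch (not Akka API; `KeepAlive` and `processAll` are hypothetical names). Work is handed to a pool of daemon threads, which do not keep the JVM running once `main` returns, and `main` blocks on a `java.util.concurrent.CountDownLatch` until every task has counted down. In the actor code the same idea would be an actor counting down a latch that the main thread waits on before shutting the actors down.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object KeepAlive {
  // Daemon threads stand in for a background pool that won't keep the JVM alive.
  private val daemonThreads = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  }

  // Submits `messages` tasks, blocks until they finish, and returns how many ran.
  def processAll(messages: Int): Int = {
    val pool = Executors.newFixedThreadPool(4, daemonThreads)
    val done = new CountDownLatch(messages)
    val processed = new AtomicInteger(0)
    (1 to messages).foreach { _ =>
      pool.execute(new Runnable {
        def run(): Unit = {
          Thread.sleep(10) // stand-in for real work on one message
          processed.incrementAndGet()
          done.countDown()
        }
      })
    }
    // Without this await, main could return (and the daemon pool would die)
    // before the queued work had run.
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    processed.get
  }

  def main(args: Array[String]): Unit =
    println("processed " + processAll(20) + " of 20 messages")
}
```

Bounding the wait with a timeout keeps a stuck worker from hanging the program forever, which is also a convenient way to notice a hang like the one in the second sample.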