8
votes

I have a pool of 100 running Actors that share a work-stealing dispatcher with its CorePoolSize set to 100. When I send 19 messages to one of the Actors, they are not spread across 19 Actors: only 5 messages run in parallel. When those 5 are finished, the next 5 are processed by the same 5 Actors, and so on. Why aren't my 19 messages running in parallel? What am I missing here?

My code looks basically like this:

object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
        case num: Integer =>
            println("Actor: " + name + " Received: " + num)
            Thread.sleep(10000)
    }
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = self.shutdownLinkedActors()
}

val supervisor = actorOf(new TestActorManager with CyclicLoadBalancing {
    // `to` is inclusive, so this creates the 100 actors described above
    val testActors = (1 to 100).toList map (i => actorOf(new TestActor(i)))
}).start

println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

(1 until 20 toList) foreach { testActor ! _ }

The Output:

Actor: 4 Received: 16
Actor: 3 Received: 17
Actor: 1 Received: 19
Actor: 59 Received: 1
Actor: 2 Received: 18

// 10 seconds pass...

Actor: 4 Received: 15
Actor: 3 Received: 14
Actor: 1 Received: 13
Actor: 59 Received: 2
Actor: 2 Received: 12

// 10 seconds pass...

Actor: 4 Received: 11
Actor: 3 Received: 10
Actor: 59 Received: 3
Actor: 2 Received: 8
Actor: 1 Received: 9

// 10 seconds pass...

Actor: 4 Received: 7
Actor: 3 Received: 6
Actor: 59 Received: 4
Actor: 2 Received: 5

Edit: I'm using Akka 1.0.

2
How many threads are you backing it with? – Viktor Klang
Where can I find that? I'm basically using this config file: scalablesolutions.se/akka/docs/akka-0.10/configuration.html. I tried modifying the core-pool-size-factor setting, but that didn't change anything. I think that setting is for the default dispatcher, though. – rocksteady
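
A rough way to check how many threads a pool really runs at once, independent of Akka, is to measure it directly. The sketch below uses only plain java.util.concurrent, not the work-stealing dispatcher from the question; the names PeakParallelism and measure are made up for illustration. Each job blocks on a latch (standing in for the Thread.sleep in the question) until as many jobs as the pool can run at once have started.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PeakParallelism {
  // Runs `tasks` blocking jobs on a fixed pool of `threads` threads and
  // returns the highest number of jobs seen running at the same time.
  def measure(threads: Int, tasks: Int): Int = {
    val pool    = Executors.newFixedThreadPool(threads)
    val running = new AtomicInteger(0)
    val peak    = new AtomicInteger(0)
    // Jobs wait here until min(threads, tasks) of them have started.
    val barrier = new CountDownLatch(math.min(threads, tasks))
    val done    = new CountDownLatch(tasks)

    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        def run(): Unit = {
          val now = running.incrementAndGet()
          // Record a new maximum with a compare-and-set loop.
          var seen = peak.get
          while (now > seen && !peak.compareAndSet(seen, now)) seen = peak.get
          barrier.countDown()
          barrier.await(5, TimeUnit.SECONDS)
          running.decrementAndGet()
          done.countDown()
        }
      })
    }

    done.await(30, TimeUnit.SECONDS)
    pool.shutdown()
    peak.get
  }
}
```

With a plain pool of 100 threads, PeakParallelism.measure(100, 19) reports all 19 jobs running together, which is the behaviour the question expects from the dispatcher. A pool of 5 threads caps the same workload at 5.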

2 Answers

12
votes

Thanks for your inquiry. I located the bottleneck and fixed it in this commit to Akka master:

https://github.com/akka/akka/commit/e4e99ef56399e892206ce4a46b9a9107da6c7770

It will be released in Akka 1.1-RC1.

Cheers, √

2
votes

I think the dispatcher lets you customize its throughput property, which defines how many messages for a specific Actor the dispatcher processes in a single sweep. You could add the following to your akka.conf file:

actor {
  throughput = 20
}

The default is 5.
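
To make the meaning of "messages per sweep" concrete, here is a toy single-threaded model in plain Scala. It is not Akka's dispatcher code and only illustrates the idea under my own assumptions: ThroughputSketch and run are hypothetical names, and the mailboxes are visited in name order, taking at most `throughput` messages from each before moving on.

```scala
import scala.collection.mutable

object ThroughputSketch {
  // Visits the mailboxes in name order, taking at most `throughput`
  // messages from each one per sweep, and returns the processing order.
  def run(mailboxes: Map[String, List[Int]], throughput: Int): List[(String, Int)] = {
    require(throughput > 0, "throughput must be positive")
    val queues = mailboxes.toList.sortBy(_._1).map {
      case (name, msgs) => (name, mutable.Queue(msgs: _*))
    }
    val processed = mutable.ListBuffer.empty[(String, Int)]

    while (queues.exists(_._2.nonEmpty)) {
      for ((name, queue) <- queues) {
        var taken = 0
        while (taken < throughput && queue.nonEmpty) {
          val entry = (name, queue.dequeue())
          processed += entry
          taken += 1
        }
      }
    }
    processed.toList
  }
}
```

For example, ThroughputSketch.run(Map("a" -> List(1, 2, 3), "b" -> List(10, 20, 30)), 2) handles two messages for "a", then two for "b", then the remaining one for each. A larger throughput keeps one actor busy for longer before the next one gets a turn.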