I read in the Akka docs that it's dangerous to close over variables from an enclosing actor.
Warning
In this case you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods on the enclosing actor from within the anonymous Actor class. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the other actor’s code will be scheduled concurrently to the enclosing actor.
Now, I have two actors, one of which requests something from the other and does something with the result. In the example below, the Accumulator actor retrieves numbers from the NumberGenerator actor and adds them up, reporting the sum along the way.
This can be done in at least two different ways, shown below as two alternative receive functions (A and B). The difference is that A does not close over the counter variable: it pipes the result back to itself as an Int message and does the sum inside receive. B attaches an onSuccess callback to the Future, and that callback closes over counter and does the sum. If I understand correctly how this works, the callback runs within an anonymous actor created just to process onSuccess.
import com.esotericsoftware.minlog.Log
import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request

object ActorTest {
  var wake = 0
  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")
  Log.info("ActorTest", "Starting !")
  accRef ! Start
}

class Accumulator extends Actor {
  var counter = 0
  implicit val timeout = Timeout(5 seconds)

  // A: WITHOUT CLOSURE
  // The result is piped back to this actor and summed inside receive.
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  }

  // B: WITH CLOSURE
  // Alternative to A (only one of the two receive definitions can be compiled at a time).
  // The onSuccess callback closes over counter and self.
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    }
  }
}

class NumberGenerator extends Actor {
  val rand = new java.util.Random()
  def receive = {
    // Replies with a random integer between -5 and 5.
    case Request => sender ! rand.nextInt(11) - 5
  }
}
Is it absolutely evil to use closures in this case? Of course, I could use an AtomicInteger instead of an Int or, in some networking scenario using, say, Netty, issue a write operation on a thread-safe channel, but that is not my point here.
At the risk of asking something ridiculous: is there a way for the Future's onSuccess to execute in this actor instead of an anonymous intermediate actor, without defining a case in the receive function?
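For reference, the AtomicInteger workaround mentioned above might look roughly like the sketch below. It is not Akka code: it assumes the standard-library scala.concurrent futures (Scala 2.10 or later) rather than the Akka futures used in my example, and the names AtomicCounterSketch and sumConcurrently are made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper, for illustration only.
object AtomicCounterSketch {
  // Adds every value to a shared counter from Future callbacks.
  // The callbacks may run on any thread of the global pool, so the
  // counter is an AtomicInteger rather than a plain var.
  def sumConcurrently(values: Seq[Int]): Int = {
    val counter = new AtomicInteger(0)
    val updates = values.map(x => Future(x).map(v => counter.addAndGet(v)))
    // Wait until every callback has applied its update.
    Await.result(Future.sequence(updates), 5.seconds)
    counter.get
  }
}
```

This makes the shared counter safe, but the callbacks still run outside the actor, which is exactly what I would like to avoid.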
EDIT
To put it more clearly, my question is: is there a way to force a series of Futures to run in the same thread as a given Actor?
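To illustrate what I mean by "the same thread", here is a minimal sketch outside of Akka. It assumes standard-library scala.concurrent futures, and the names SingleThreadSketch and threadsUsed are hypothetical. Every step of the chain is confined to one dedicated thread by passing a single-threaded ExecutionContext. What I am looking for is the equivalent where that execution context is the actor itself.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, for illustration only.
object SingleThreadSketch {
  // Runs a chain of Future steps on one dedicated thread and returns
  // the set of thread names that the steps actually ran on.
  def threadsUsed(steps: Int): Set[String] = {
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    try {
      val first = Future(List(Thread.currentThread().getName))
      // Each map callback is submitted to the same single-threaded executor.
      val chain = (1 to steps).foldLeft(first) { (acc, _) =>
        acc.map(names => Thread.currentThread().getName :: names)
      }
      Await.result(chain, 5.seconds).toSet
    } finally executor.shutdown()
  }
}
```

Here the returned set should contain a single thread name, because the executor owns exactly one worker thread.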