0
votes

I am learning Akka Actor recently. I read the document of dispatchers in Actor. I am curious about the blocking operation in an actor. The last topic in the document describes how to solve the problem. And I am trying to reproduce the example experiment in the document.

Here is my code:

package dispatcher

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Main extends App{

  var config = ConfigFactory.parseString(
    """
      |my-dispatcher{
      |type = Dispatcher
      |
      |executor = "fork-join-executor"
      |
      |fork-join-executor{
      |fixed-pool-size = 32
      |}
      |throughput = 1
      |}
    """.stripMargin)

//  val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))


  val system = ActorSystem("block")


  val actor1 = system.actorOf(Props(new BlockingFutureActor()))
  val actor2 = system.actorOf(Props(new PrintActor()))

  for(i <- 1 to 1000){
    actor1 ! i
    actor2 ! i
  }

}

package dispatcher

import akka.actor.Actor

import scala.concurrent.{ExecutionContext, Future}

class BlockingFutureActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      Thread.sleep(5000)
      implicit val excutionContext: ExecutionContext = context.dispatcher
      Future {
        Thread.sleep(5000)
        println(s"Blocking future finished ${i}")
      }
  }
}
package dispatcher

import akka.actor.Actor

class PrintActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      println(s"PrintActor: ${i}")
  }
}

I simply create an ActorSystem with the default dispatchers and all actors depend on those. The BlockingFutureActor has a blocking operation that is encapsulated in a Future. The PrintActor is merely printing a number instantly.

In the document's explanation, the default dispatchers will be occupied by Futures in the BlockingFutureActor, which leads to the message blocking of the PrintActor. The application gets stuck somewhere like:

> PrintActor: 44
> PrintActor: 45

Unfortunately, my code is not blocked. All outputs from PrintActor show up smoothly. But outputs from BlockingFutureActor show up like squeezing toothpaste. I try to monitor my thread info by Intellij's Debug, I got: thread monitoring

You may find only two dispatchers are sleeping(BlockingFutureActor makes this happen). Others are waiting, which means they are available for new message delivering.

I have read an answer about blocking operation in Actor(page). It is quoted that "Dispatchers are, effectively, thread-pools. Separating the two guarantees that the slow, blocking operations don't starve the other. This approach, in general, is referred to as bulk-heading, because the idea is that if a part of the app fails, the rest remains responsive."

Do default dispatchers spare some dispatcher for blocking operation? Such that the system can handle messages even if there are so many blocking operations asking for dispatchers.

Can the experiment in the Akka document be reproduced? Is there something wrong with my configuration.

Thanks for your suggestions. Best Wishes.

1
"All outputs from PrintActor show up smoothly." Are you saying that you see all 1000 println statements from PrintActor?Jeffrey Chung
Yes, exactly. 1000 println show up at the moment that the application starts.jiexray

1 Answers

2
votes

The reason you see all 1000 print statements from the PrintActor before any print statements from the BlockingFutureActor is because of the first Thread.sleep call in the BlockingFutureActor's receive block. This Thread.sleep is the key difference between your code and the example in the official documentation:

override def receive: Receive = {
  case i: Int =>
    Thread.sleep(5000) // <----- this call is not in the example in the official docs
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

Remember that actors process one message at a time. The Thread.sleep(5000) basically simulates a message that takes at least five seconds to process. The BlockingFutureActor won't process another message until it's done processing the current message, even if it has hundreds of messages in its mailbox. While the BlockingFutureActor is processing that first Int message of value 1, the PrintActor has already finished processing all 1000 messages that were sent to it. To make this more clear, let's add a println statement:

override def receive: Receive = {
  case i: Int =>
    println(s"Entering BlockingFutureActor's receive: $i") // <-----
    Thread.sleep(5000)
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

A sample output when we run the program:

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...

As you can see, by the time the BlockingFutureActor actually begins to process the message 2, the PrintActor has already churned through all 1000 messages.

If you remove that first Thread.sleep, then you'll see messages dequeued from the BlockingFutureActor's mailbox more quickly, because the work is being "delegated" to a Future. Once the Future is created, the actor grabs the next message from its mailbox without waiting for the Future to complete. Below is a sample output without that first Thread.sleep (it won't be exactly the same every time you run it):

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...