38
votes

I am trying to use Akka HTTP to basic authenticate my request. It so happens that I have an external resource to authenticate through, so I have to make a rest call to this resource.

This takes some time, and while it's processing, it seems the rest of my API is blocked, waiting for this call. I have reproduced this with a very simple example:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

If I post to the log endpoint, my get endpoint is also stuck waiting for the 5 seconds, which the log endpoint dictated.

Is this expected behaviour, and if is, how do I make blocking operations without blocking my entire API?

2

2 Answers

133
votes

What you observe is expected behaviour – yet of course it's very bad. Good that known solutions and best practices exist to guard against it. In this answer I'd like to spend some time to explain the issue short, long, and then in depth – enjoy the read!

Short answer: "don't block the routing infrastructure!", always use a dedicated dispatcher for blocking operations!

Cause of the observed symptom: The problem is that you're using context.dispatcher as the dispatcher the blocking futures execute on. The same dispatcher (which is in simple terms just a "bunch of threads") is used by the routing infrastructure to actually handle the incoming requests – so if you block all available threads, you end up starving the routing infrastructure. (A thing up for debate and benchmarking is if Akka HTTP could protect from this, I'll add that to my research todo-list).

Blocking must be treated with special care to not impact other users of the same dispatcher (which is why we make it so simple to separate execution onto different ones), as explained in the Akka docs section: Blocking needs careful management.

Something else I wanted to bring to attention here is that one should avoid blocking APIs at all if possible - if your long running operation is not really one operation, but a series thereof, you could have separated those onto different actors, or sequenced futures. Anyway, just wanted to point out – if possible, avoid such blocking calls, yet if you have to – then the following explains how to properly deal with those.

In-depth analysis and solutions:

Now that we know what is wrong, conceptually, let's have a look what exactly is broken in the above code, and how the right solution to this problem looks like:

Colour = thread state:

  • turquoise – SLEEPING
  • orange - WAITING
  • green - RUNNABLE

Now let's investigate 3 pieces of code and how the impact the dispatchers, and performance of the app. To force this behaviour the app has been put under the following load:

  • [a] keep requesting GET requests (see above code in initial question for that), it's not blocking there
  • [b] then after a while fire 2000 POST requests, which will cause the 5second blocking before returning the future

1) [bad] Dispatcher behaviour on bad code:

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

So we expose our app to [a] load, and you can see a number of akka.actor.default-dispatcher threads already - they're handling the requests – small green snippet, and orange meaning the others are actually idle there.

blocking is killing the default dispatcher

Then we start the [b] load, which causes blocking of these threads – you can see an early thread "default-dispatcher-2,3,4" going into blocking after being idle before. We also observe that the pool grows – new threads are started "default-dispatcher-18,19,20,21..." however they go into sleeping immediately (!) – we're wasting precious resource here!

The number of the such started threads depends on the default dispatcher configuration, but likely will not exceed 50 or so. Since we just fired 2k blocking ops, we starve the entire threadpool – the blocking operations dominate such that the routing infra has no thread available to handle the other requests – very bad!

Let's do something about it (which is an Akka best practice btw – always isolate blocking behaviour like shown below):

2) [good!] Dispatcher behaviour good structured code/dispatchers:

In your application.conf configure this dispatcher dedicated for blocking behaviour:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

You should read more in the Akka Dispatchers documentation, to understand the various options here. The main point though is that we picked a ThreadPoolExecutor which has a hard limit of threads it keeps available for the blocking ops. The size settings depend on what your app does, and how many cores your server has.

Next we need to use it, instead of the default one:

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

We pressure the app using the same load, first a bit of normal requests and then we add the blocking ones. This is how the ThreadPools will behave in this case:

the blocking pool scales to our needs

So initially the normal requests are easily handled by the default dispatcher, you can see a few green lines there - that's actual execution (I'm not really putting the server under heavy load, so it's mostly idle).

Now when we start issuing the blocking ops, the my-blocking-dispatcher-* kicks in, and starts up to the number of configured threads. It handles all the Sleeping in there. Also, after a certain period of nothing happening on those threads, it shuts them down. If we were to hit the server with another bunch of blocking the pool would start new threads that will take care of the sleep()-ing them, but in the meantime – we're not wasting our precious threads on "just stay there and do nothing".

When using this setup, the throughput of the normal GET requests was not impacted, they were still happily served on the (still pretty free) default dispatcher.

This is the recommended way of dealing with any kind of blocking in reactive applications. It often is referred to as "bulkheading" (or "isolating") the bad behaving parts of an app, in this case the bad behaviour is sleeping/blocking.

3) [workaround-ish] Dispatcher behaviour when blocking applied properly:

In this example we use the scaladoc for scala.concurrent.blocking method which can help when faced with blocking ops. It generally causes more threads to be spun up to survive the blocking operations.

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

The app will behave like this:

blocking causes more threads to be started

You'll notice that A LOT of new threads are created, this is because blocking hints at "oh, this'll be blocking, so we need more threads". This causes the total time we're blocked to be smaller than in the 1) example, however then we have hundreds of threads doing nothing after the blocking ops have finished... Sure, they will eventually be shut down (the FJP does this), but for a while we'll have a large (uncontrolled) amount of threads running, in contrast to the 2) solution, where we know exactly how many threads we're dedicating for the blocking behaviours.

Summing up: Never block the default dispatcher :-)

The best practice is to use the pattern shown in 2), to have a dispatcher for the blocking operations available, and execute them there.

Hope this helps, happy hakking!

Discussed Akka HTTP version: 2.0.1

Profiler used: Many people have asked me in response to this answer privately what profiler I used to visualise the Thread states in the above pics, so adding this info here: I used YourKit which is an awesome commercial profiler (free for OSS), though you can achieve the same results using the free VisualVM from OpenJDK.

3
votes

Strange, but for me everything works fine (no blocking). Here is code:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

import scala.concurrent.Future


object Main {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val routes: Route = (post & entity(as[String])) { e =>
    complete {
      Future {
        Thread.sleep(5000)
        e
      }
    }
  } ~
    (get & path(Segment)) { r =>
      complete {
        "get"
      }
    }

  def main(args: Array[String]) {

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
      case e =>
        system.shutdown()
    }
  }
}

Also you can wrap you async code into onComplete or onSuccess directive:

onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)}