According to this tutorial, https://github.com/slouc/concurrency-in-scala-with-ce#threading, async operations fall into three groups, each of which requires a significantly different thread pool to run on:

Non-blocking asynchronous operations:

Bounded pool with a very low number of threads (maybe even just one), with a very high priority. These threads will basically just sit idle most of the time and keep polling whether there is a new async IO notification. Time that these threads spend processing a request directly maps into application latency, so it's very important that no other work gets done in this pool apart from receiving notifications and forwarding them to the rest of the application.

Blocking asynchronous operations:

Unbounded cached pool. Unbounded because blocking operation can (and will) block a thread for some time, and we want to be able to serve other I/O requests in the meantime. Cached because we could run out of memory by creating too many threads, so it’s important to reuse existing threads.

CPU-heavy operations:

Fixed pool in which number of threads equals the number of CPU cores. This is pretty straightforward. Back in the day the "golden rule" was number of threads = number of CPU cores + 1, but "+1" was coming from the fact that one extra thread was always reserved for I/O (as explained above, we now have separate pools for that).
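To make the three groups concrete, here is a minimal sketch of how such pools could be built with plain java.util.concurrent executors. This is only an illustration of the tutorial's description, not how Cats Effect builds its own pools; the object name ThreadPools, the helper factory and the thread-name prefixes are made up for this example.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

object ThreadPools {
  // Hypothetical helper: names the threads, sets their priority and marks
  // them as daemon threads so they do not keep the JVM alive.
  private def factory(prefix: String, priority: Int): ThreadFactory =
    new ThreadFactory {
      private val counter = new AtomicInteger(0)
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setPriority(priority)
        t.setDaemon(true)
        t
      }
    }

  // Group 1: non-blocking async notifications, one high-priority thread.
  val nonBlockingIo: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(
      Executors.newFixedThreadPool(1, factory("async-io", Thread.MAX_PRIORITY)))

  // Group 2: blocking operations, unbounded pool that reuses idle threads.
  val blockingIo: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(
      Executors.newCachedThreadPool(factory("blocking-io", Thread.NORM_PRIORITY)))

  // Group 3: CPU-heavy work, one thread per available core.
  val cpuBound: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(
      Executors.newFixedThreadPool(
        Runtime.getRuntime.availableProcessors(),
        factory("cpu", Thread.NORM_PRIORITY)))
}
```

A Future can then be pinned to one of these pools by passing it explicitly, for example Future(compute())(ThreadPools.cpuBound), where compute is a placeholder for your own function.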

In my Cats Effect application, I use the Scala Future-based ReactiveMongo library to access MongoDB. It does NOT block threads when talking to MongoDB, i.e. it performs non-blocking IO.

It needs an execution context. Cats Effect provides a default execution context, IOApp.executionContext.

My question is: which execution context should I use for non-blocking IO?

IOApp.executionContext?

But, from the IOApp.executionContext documentation:

Provides a default ExecutionContext for the app.

The default on top of the JVM is lazily constructed as a fixed thread pool based on the number available of available CPUs (see PoolUtils).

It seems that this execution context falls into the third group I listed above, CPU-heavy operations (fixed pool in which the number of threads equals the number of CPU cores), which makes me think that IOApp.executionContext is not a good candidate for non-blocking IO.

Am I right and should I create a separate context with a fixed thread pool (1 or 2 threads) for non-blocking IO (so it will fall into the first group I listed above - Non-blocking asynchronous operations: Bounded pool with a very low number of threads (maybe even just one), with a very high priority.)?

Or is IOApp.executionContext designed for both CPU-bound and Non-Blocking IO operations?

The function I use to convert a Scala Future to F, which accepts an execution context:

import cats.effect.Async
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

def scalaFutureToF[F[_]: Async, A](
    future: => Future[A]
)(implicit ec: ExecutionContext): F[A] =
  Async[F].async { cb =>
    future.onComplete {
      case Success(value)     => cb(Right(value))
      case Failure(exception) => cb(Left(exception))
    }
  }
1 Answer

In Cats Effect 3, each IOApp has a Runtime:

final class IORuntime private[effect] (
  val compute: ExecutionContext,
  private[effect] val blocking: ExecutionContext,
  val scheduler: Scheduler,
  val shutdown: () => Unit,
  val config: IORuntimeConfig,
  private[effect] val fiberErrorCbs: FiberErrorHashtable = new FiberErrorHashtable(16)
)

You will almost always want to keep the default values and not fiddle around declaring your own runtime, except in perhaps tests or educational examples.

Inside your IOApp you can then access the compute pool via:

runtime.compute

If you want to execute a blocking operation, then you can use the blocking construct:

IO.blocking(println("foo!")) >> IO.unit

This way, you're telling the CE3 runtime that this operation could be blocking and hence should be dispatched to a dedicated pool. See here.

What about CE2? Well, it had similar mechanisms but they were very clunky and also contained quite a few surprises. Blocking calls, for example, were scheduled using Blocker which then had to be somehow summoned out of thin air or threaded through the whole app, and thread pool definitions were done using the awkward ContextShift. If you have any choice in the matter, I highly recommend investing some effort into migrating to CE3.

Fine, but what about Reactive Mongo?

ReactiveMongo uses Netty (which is based on the Java NIO API), and Netty has its own thread pool. This changes in Netty 5 (see here), but ReactiveMongo still seems to be on Netty 4 (see here).

However, the ExecutionContext you're asking about is the thread pool that will perform the callback. This can be your compute pool.

Let's see some code. First, your translation method. I just changed async to async_ because I'm using CE3, and I added the thread printline:

def scalaFutureToF[F[_]: Async, A](future: => Future[A])(implicit ec: ExecutionContext): F[A] =
  Async[F].async_ { cb =>
    future.onComplete {
      case Success(value)     => {
        println(s"Inside Callback: [${Thread.currentThread.getName}]")
        cb(Right(value))
      }
      case Failure(exception) => cb(Left(exception))
    }
  }

Now let's pretend we have two execution contexts - one from our IOApp and another one that's going to represent whatever ReactiveMongo uses to run the Future. This is the made-up ReactiveMongo one:

val reactiveMongoContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

and the other one is simply runtime.compute.

Now let's define the Future like this:

def myFuture: Future[Unit] = Future {
  println(s"Inside Future: [${Thread.currentThread.getName}]")
}(reactiveMongoContext)

Note how we are pretending that this Future runs inside ReactiveMongo by passing the reactiveMongoContext to it.

Finally, let's run the app:

override def run: IO[Unit] = {
  val myContext: ExecutionContext = runtime.compute
  scalaFutureToF(myFuture)(implicitly[Async[IO]], myContext)
}

Here's the output:

Inside Future: [pool-1-thread-1]
Inside Callback: [io-compute-6]

The execution context we provided to scalaFutureToF merely ran the callback. The Future itself ran on our separate thread pool, the one that stands in for ReactiveMongo's pool. In reality, you will have no control over that pool, as it comes from within ReactiveMongo.
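The same split can be reproduced with the standard library alone, which may help if you want to check it without Cats Effect. The sketch below assumes a driver that completes a Promise from its own I/O thread, which is roughly how a Netty-based client would hand back a result. The names CallbackThreadDemo, fake-driver-io and app-callback are made up for the illustration.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object CallbackThreadDemo {
  // Single daemon thread with a fixed name, so the output is easy to read.
  private def namedPool(name: String): ExecutionContext =
    ExecutionContext.fromExecutor(
      Executors.newSingleThreadExecutor(new ThreadFactory {
        def newThread(r: Runnable): Thread = {
          val t = new Thread(r, name)
          t.setDaemon(true)
          t
        }
      }))

  // Hypothetical stand-in for the driver's internal I/O thread.
  val driverIo: ExecutionContext = namedPool("fake-driver-io")
  // The context we hand to onComplete / map, e.g. the compute pool.
  val callbackEc: ExecutionContext = namedPool("app-callback")

  /** Returns (thread that completed the Future, thread that ran the callback). */
  def demo(): (String, String) = {
    val driverResult = Promise[String]()

    // The "driver" completes the promise from its own thread.
    driverIo.execute(new Runnable {
      def run(): Unit = {
        driverResult.success(Thread.currentThread.getName)
        ()
      }
    })

    // Our callback is scheduled on callbackEc, regardless of who completed the Future.
    val observed: Future[(String, String)] =
      driverResult.future.map { completedOn =>
        (completedOn, Thread.currentThread.getName)
      }(callbackEc)

    Await.result(observed, 5.seconds)
  }
}
```

Calling CallbackThreadDemo.demo() returns ("fake-driver-io", "app-callback"): the context passed to map only decides where the callback runs, not where the Future's result was produced.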

Extra info

By the way, if you're not working with the type class hierarchy (F), but with IO values directly, then you could use this simplified method:

def scalaFutureToIo[A](future: => Future[A]): IO[A] =
  IO.fromFuture(IO(future))

See how this one doesn't even require you to pass an ExecutionContext - it simply uses your compute pool. Or more specifically, it uses whatever is defined as def executionContext: F[ExecutionContext] for the Async[IO], which turns out to be the compute pool. Let's check:

override def run: IO[Unit] = {
  IO.executionContext.map(ec => println(ec == runtime.compute))
}
// prints true
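If you do want to stay polymorphic in F, CE3's Async type class offers the same conversion through fromFuture, so the hand-written callback and the explicit ExecutionContext parameter can both go. A minimal sketch, assuming Cats Effect 3 is on the classpath; the wrapper object FutureInterop is only there to make the snippet self-contained:

```scala
import cats.effect.Async
import scala.concurrent.Future

object FutureInterop {
  // Suspends the creation of the Future in F, then waits for it asynchronously.
  // The completion callback runs on the ExecutionContext exposed by Async[F].
  def scalaFutureToF[F[_]: Async, A](future: => Future[A]): F[A] =
    Async[F].fromFuture(Async[F].delay(future))
}
```

With F fixed to IO, this should behave like the IO.fromFuture version above.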

Last, but not least:

If we really had a way of specifying which thread pool ReactiveMongo's underlying Netty should be using, then yes, in that case we should definitely use a separate pool. We should never be providing our runtime.compute pool to other runtimes.