0
votes

I have two methods, let's call them load() and init(). Each one starts a computation in its own thread and returns a Future on its own execution context. The two computations are independent.

val loadContext = ExecutionContext.fromExecutor(...)
def load(): Future[Unit] = {
  Future
}

val initContext = ExecutionContext.fromExecutor(...)
def init(): Future[Unit] = {
  Future { ... }(initContext)
}

I want to call both of these from some third thread -- say it's from main() -- and perform some other computation when both are finished.

def onBothComplete(): Unit = ...

Now:

  1. I don't care which completes first
  2. I don't care what thread the other computation is performed on, except:
  3. I don't want to block either thread waiting for the other;
  4. I don't want to block the third (calling) thread; and
  5. I don't want to have to start a fourth thread just to set the flag.

If I use for-comprehensions, I get something like:

val loading = load()
val initialization = initialize()

for {
  loaded <- loading
  initialized <- initialization
} yield { onBothComplete() }

and I get Cannot find an implicit ExecutionContext.

I take this to mean Scala wants a fourth thread to wait for the completion of both futures and set the flag, either an explicit new ExecutionContext or ExecutionContext.Implicits.global. So it would appear that for-comprehensions are out.

I thought I might be able to nest callbacks:

initialization.onComplete {
  case Success(_) =>
    loading.onComplete {
      case Success(_) => onBothComplete()
      case Failure(t) => log.error("Unable to load", t)
    }
  case Failure(t) => log.error("Unable to initialize", t)
}

Unfortunately onComplete also takes an implicit ExecutionContext, and I get the same error. (Also this is ugly, and loses the error message from loading if initialization fails.)

Is there any way to compose Scala Futures without blocking and without introducing another ExecutionContext? If not, I might have to just throw them over for Java 8 CompletableFutures or Javaslang Vavr Futures, both of which have the ability to run callbacks on the thread that did the original work.

Updated to clarify that blocking either thread waiting for the other is also not acceptable.

Updated again to be less specific about the post-completion computation.

2
You should do Future.firstCompletedOf(List(initialization, loading)) if you want to grab the result of the Future that completes first. The for-list-comprehension will wait for both futures to complete before performing the yield.pcting
@pcting I do want to wait for both futures to complete before I perform some other computation. I don't want to block the current thread to do it.David Moles
If this is for the exact purpose of setting an AtomicBoolean to true, then you can instead have a Promise which when is completed indicates completion. promise.completeWith(initialization zip loading)Viktor Klang
@ViktorKlang It's not; that was just an example. I'll edit the question.David Moles

2 Answers

1
votes

Why not just reuse one of your own execution contexts? Not sure what your requirements for those are but if you use a single thread executor you could just reuse that one as the execution context for your comprehension and you won't get any new threads created:

implicit val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)

If you really can't reuse them you may consider this as the implicit execution context:

implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
  (runnable: Runnable) => {
    runnable.run()
  })

Which will run futures on the current thread. However, the Scala docs explicitly recommends against this as it introduces nondeterminism in which thread runs the Future (but as you stated, you don't care which thread it runs on so this may not matter).

See Synchronous Execution Context for why this isn't advisable.

An example with that context:

val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)

def load(): Future[Unit] = {
  Future(println("loading thread " + Thread.currentThread().getName))(loadContext)
}

val initContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)

def init(): Future[Unit] = {
  Future(println("init thread " + Thread.currentThread().getName))(initContext)
}

val doneFlag = new AtomicBoolean(false)

val loading = load()
val initialization = init()

implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
  (runnable: Runnable) => {
    runnable.run()
  })

for {
  loaded <- loading
  initialized <- initialization
} yield {
  println("yield thread " + Thread.currentThread().getName)
  doneFlag.set(true)
}

prints:

loading thread pool-1-thread-1
init thread pool-2-thread-1
yield thread main

Though the yield line may print either pool-1-thread-1 or pool-2-thread-1 depending on the run.

1
votes

In Scala, a Future represents a piece of work to be executed async (i.e. concurrently to other units of work). An ExecutionContext represent a pool of threads for executing Futures. In other words, ExecutionContext is the team of worker who performs the actual work.

For efficiency and scalability, it's better to have big team(s) (e.g. single ExecutionContext with 10 threads to execute 10 Future's) rather than small teams (e.g. 5 ExecutionContext with 2 threads each to execute 10 Future's).

In your case if you want to limit the number of threads to 2, you can:

def load()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = {
  Future { ... } /* will use the teamOfWorkers implicitly */
}

def init()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = {
  Future { ... } /* will use the teamOfWorkers implicitly */
}

implicit val bigTeamOfWorkers = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
/* All async works in the following will use 
   the same bigTeamOfWorkers implicitly and works will be shared by
   the 2 workers (i.e. thread) in the team  */
for {
  loaded <- loading
  initialized <- initialization
} yield doneFlag.set(true)

The Cannot find an implicit ExecutionContext error does not mean that Scala wants additional threads. It only means that Scala wants a ExecutionContext to do the work. And additional ExecutionContext does not necessarily implies additional 'thread', e.g. the following ExecutionContext, instead of creating new threads, will execute works in the current thread:

val currThreadExecutor = ExecutionContext.fromExecutor(new Executor {
  override def execute(command: Runnable): Unit = command.run()
})