3
votes

I wanted to control the number of threads in ExecutionContext. So I created a instance of ThreadPoolExecutor and then created ExecutionContext from it.

And I created some Futures and attached onSuccess callbacks on them. I expected each onSuccess callback was called when each Future work finished. But I found all onSuccess callbacks were executed at the same time.

import java.util.concurrent.{ Executors, ForkJoinPool }

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration.Duration

object Main extends App {
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  // implicit val ec = ExecutionContext.fromExecutorService(new ForkJoinPool(2))

  val start = System.currentTimeMillis()

  val futures = for {
    i <- 1 to 10
  } yield Future[Int] {
    Thread.sleep(i * 1000)
    i
  }

  futures.foreach { f =>
    f.onSuccess { case i =>
      println(s"${i} Success. ${System.currentTimeMillis() - start}ms elapsed.")
    }
  }

  Await.ready(Future.sequence(futures.toList), Duration.Inf)
  ec.shutdown()
}

// ThreadPoolExecutor Result
// 1 Success. 25060ms elapsed.
// 2 Success. 25064ms elapsed.
// 3 Success. 25064ms elapsed.
// 4 Success. 25064ms elapsed.
// 5 Success. 25064ms elapsed.
// 6 Success. 25064ms elapsed.
// 7 Success. 25065ms elapsed.
// 8 Success. 25065ms elapsed.
// 9 Success. 25065ms elapsed.
// 10 Success. 30063ms elapsed.

// ForkJoinPool Result
// 1 Success. 1039ms elapsed.
// 2 Success. 2036ms elapsed.
// 3 Success. 4047ms elapsed.
// 4 Success. 6041ms elapsed.
// 5 Success. 12042ms elapsed.
// 6 Success. 12043ms elapsed.
// 7 Success. 25060ms elapsed.
// 8 Success. 25060ms elapsed.
// 9 Success. 25060ms elapsed.
// 10 Success. 30050ms elapsed.

The result above was printed at the same time not respectively. But when I use ForkJoinPool instead of ThreadPoolExecutor this problem is mitigated. Did I misuse ExecutionContext and Future?

edited: I found the problem happens when the number of threads is less than the number of futures. So I've edited above code to reproduce the problem and print the execution time.

I think future callback should be called on time even if the number of threads is small...

1
You need to post exact code you execute. What you've pasted is incomplete, and doesn't produce the output you describe. In fact, for me everything work as you expect, not as what you see.Haspemulator
I just edited the question. The problem happens when the number of threads is less than the futures...jyshin
Do you want/need to mark each Future as blocking, per this StackOverflow post?Castaglia
No. I want that onSuccess callback executed at the time when future is done.jyshin

1 Answers

0
votes

I eventually knew that Future callbacks(onComplete or onSuccess) are executed on the thread of the provided ExecutionContext. So If there are no idle threads in the pool, callback could not be executed. See scala.concurrent.Future

But still I don't understand behavior of ForkJoinPool. I need to study about that.