
I thought that using futures would easily allow me to fire off one-shot code blocks; however, it seems I can only have 4 futures running at a time.

Where does this restriction come from, or am I abusing Futures by using it like this?

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.Calendar

object Main extends App{

  val rand = scala.util.Random

  for (x <- 1 to 100) {
    val f = Future {
      //val sleepTime =  rand.nextInt(1000)
      val sleepTime =  2000
      Thread.sleep(sleepTime)

      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1;
    }
  }

  Thread.sleep(10000)
}

Output:

Future: 3 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 2 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 4 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 1 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 7 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 5 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 6 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 8 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 9 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 11 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 10 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 12 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 16 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 13 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 15 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 14 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015

I expected them to all show the same time.

To give some context: I thought I could use this construct and extend it with a main loop that, on each iteration, sleeps for a time drawn from an exponential distribution, to emulate users arriving and executing queries. After each sleep I'd like to execute the query by sending it to the program's driver (in this case Spark; the driver allows multiple threads to use it). Is there a more obvious way than using Futures?
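A minimal sketch of the arrival loop described above, assuming Futures on a cached thread pool (Executors.newCachedThreadPool starts a new thread whenever no idle one is available, so a query is never queued behind earlier ones). runQuery is a hypothetical stand-in for the Spark call, and meanMs (the mean inter-arrival time in milliseconds) and the seed are illustrative parameters. Delays use inverse-transform sampling: if U is uniform on [0, 1), then -mean * ln(1 - U) is exponentially distributed with that mean.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Random

object ArrivalSimulation {
  // Exponentially distributed delay (ms) with the given mean, via inverse-transform sampling
  def nextDelayMs(rand: Random, meanMs: Double): Long =
    math.round(-meanMs * math.log(1.0 - rand.nextDouble()))

  // Hypothetical stand-in for sending a query to the Spark driver
  def runQuery(id: Int): Int = { Thread.sleep(2000); id }

  def simulate(users: Int, meanMs: Double, seed: Long): Seq[Int] = {
    val pool = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val rand = new Random(seed)
    try {
      val queries = (1 to users).map { id =>
        Thread.sleep(nextDelayMs(rand, meanMs)) // wait for the next "user" to arrive
        Future(runQuery(id))                    // fire the query without waiting for earlier ones
      }
      Await.result(Future.sequence(queries), 10.minutes)
    } finally pool.shutdown() // cached-pool threads are non-daemon
  }

  def main(args: Array[String]): Unit =
    println(simulate(users = 10, meanMs = 500, seed = 42L))
}
```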


3 Answers

17
votes

When you use import ExecutionContext.Implicits.global, it creates a thread pool whose size equals the number of CPUs (more precisely, the number of available processors the JVM reports).

From the source of ExecutionContext.scala:

The default ExecutionContext implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of available processors (https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors--).
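To see the number the default pool is sized from on a particular machine, you can simply print it. A minimal sketch (the object name is illustrative); on the machine that produced the question's output it presumably reports 4, which would explain the batches of four futures.

```scala
object PoolSize {
  // Logical processors reported by the JVM; the global ExecutionContext
  // uses this as its default target number of worker threads
  def targetThreads: Int = Runtime.getRuntime.availableProcessors

  def main(args: Array[String]): Unit =
    println(s"availableProcessors = $targetThreads")
}
```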

There's also a good Stack Overflow question on this: What is the behavior of scala.concurrent.ExecutionContext.Implicits.global?

Since the default size of the thread pool depends on the number of CPUs, if you want a larger thread pool you have to write something like

import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8))

before creating the Future (in your code, place it before the for loop, in place of the ExecutionContext.Implicits.global import).
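Applied to the question's program, a minimal sketch might look like this. It assumes a parallelism of 100 (one worker per future, an illustrative choice), and it replaces the final Thread.sleep(10000) with Await.result on Future.sequence so the program waits exactly as long as the futures take. ForkJoinPool workers are daemon threads, so the JVM can exit without an explicit shutdown. The object and method names are hypothetical.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CustomPool {
  // Work-stealing pool with room for 100 concurrently running (sleeping) tasks
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(100))

  // Runs n futures that each sleep sleepMs, and returns the elapsed wall time in ms
  def run(n: Int, sleepMs: Long): Long = {
    val start = System.nanoTime()
    val futures = (1 to n).map { x =>
      Future { Thread.sleep(sleepMs); x }
    }
    // Wait for all futures instead of guessing with Thread.sleep
    Await.result(Future.sequence(futures), 1.minute)
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit =
    println(s"100 futures took ${run(100, 2000)} ms")
}
```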

Note that Executors.newWorkStealingPool was added in Java 8. Scala also has its own work-stealing ForkJoinPool; see scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool.

If you want one thread per Future, you can create a new single-thread executor for each Future:

implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)

The following code therefore runs all 100 futures in parallel, each on its own thread:

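import scala.concurrent._
import java.util.Calendar
import java.util.concurrent.Executors

object Main extends App {
  for (x <- 1 to 100) {
    // A fresh single-thread executor per Future, so each Future gets its own thread
    implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
    val f = Future {
      val sleepTime = 2000
      Thread.sleep(sleepTime)

      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1
    }
    // The submitted task still runs; shutdown then releases the non-daemon thread so the JVM can exit
    ec.shutdown()
  }

  Thread.sleep(10000)
}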

Besides work-stealing pools and single-thread executors, java.util.concurrent.Executors provides other kinds of executors: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html

See the documentation for details: http://docs.scala-lang.org/overviews/core/futures.html

2
votes

The default pool you get with import scala.concurrent.ExecutionContext.Implicits.global indeed has as many threads as your machine has cores. This is ideal for non-blocking code (no synchronous I/O, sleeps, etc.) but can be problematic, and can even cause deadlocks, when you use it for blocking code.

However, this pool can grow if you wrap blocking code in a scala.concurrent.blocking block. Await.result and Await.ready, which block while waiting for a Future, use the same marker.

See the API docs for blocking.

So all you have to do is update your example:

import scala.concurrent.blocking
...
val sleepTime = 2000
blocking {
  Thread.sleep(sleepTime)
}
...

Now all the futures finish after roughly 2000 ms.
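A self-contained sketch of the blocking version, still on the global context. The object and method names are illustrative; it times the run and waits with Await.result rather than a fixed sleep.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingDemo {
  // Runs n futures that each sleep sleepMs, and returns the elapsed wall time in ms
  def run(n: Int, sleepMs: Long): Long = {
    val start = System.nanoTime()
    val futures = (1 to n).map { x =>
      Future {
        // Marking the sleep as blocking lets the global pool add extra threads
        blocking { Thread.sleep(sleepMs) }
        x
      }
    }
    Await.result(Future.sequence(futures), 1.minute)
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit =
    println(s"100 futures took ${run(100, 2000)} ms")
}
```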

0
votes

You can also use

`implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(NUMBEROFTHREADSYOUWANT))`

Replace NUMBEROFTHREADSYOUWANT with the number of threads you want to start (this needs import java.util.concurrent.Executors), and declare it before creating the Future.
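A fuller sketch of the fixed-pool approach, with illustrative names. Unlike ForkJoinPool workers, threads from newFixedThreadPool are non-daemon, so the sketch shuts the pool down; otherwise the JVM would not exit. With threads = 100, all 100 two-second futures can run at once; with fewer threads they run in batches, as in the question.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FixedPool {
  // Runs `tasks` sleeping futures on a pool of `threads` threads and returns the sum of their results
  def runAll(threads: Int, tasks: Int, sleepMs: Long): Int = {
    // At most `threads` futures run at once; the rest wait in the pool's queue
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = (1 to tasks).map(x => Future { Thread.sleep(sleepMs); x })
      Await.result(Future.sequence(futures), 5.minutes).sum
    } finally {
      // Fixed-pool threads are non-daemon; shut the pool down so the JVM can exit
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(runAll(threads = 100, tasks = 100, sleepMs = 2000))
}
```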