
SHORT VERSION

I'm looking for a way to set, once and for all, which pool is used globally when I call the .par method of a collection.

So far I have only found how to set the number of threads in the global ExecutionContext, not how to change the pool that is used by default.

I simply want to specify the ForkJoinPool explicitly, so that the ExecutionContext behind the parallel collections does not depend on the Scala version I use.


LONG VERSION

This requirement came up after we ran into issues because Scala 2.10 doesn't support JDK 1.8.

Scala simply didn't recognize the Java version and assumed we were still on 1.5, so the pool was of a different type and the number of threads wasn't limited to the number of processors.

The problem is caused by this code:

    // Choice of the default task support for parallel collections
    if (scala.util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport
    else new ThreadPoolTaskSupport

    // From scala.util.Properties (2.10.2): "1.8" is not in any of the lists
    def isJavaAtLeast(version: String) = {
      val okVersions = version match {
        case "1.5"    => List("1.5", "1.6", "1.7")
        case "1.6"    => List("1.6", "1.7")
        case "1.7"    => List("1.7")
        case _        => Nil
      }
      okVersions exists (javaVersion startsWith _)
    }
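To make the failure concrete, here is a small standalone copy of the check quoted above. It is only a sketch: the object name JavaVersionCheck and the explicit javaVersion parameter are additions for illustration (the original reads the running JVM's version), and the version strings in the usage note are examples.

```scala
object JavaVersionCheck {
  // Same matching logic as the quoted isJavaAtLeast, but the JVM version
  // string is passed in explicitly (an addition for illustration only).
  def isJavaAtLeast(javaVersion: String, version: String): Boolean = {
    val okVersions = version match {
      case "1.5" => List("1.5", "1.6", "1.7")
      case "1.6" => List("1.6", "1.7")
      case "1.7" => List("1.7")
      case _     => Nil
    }
    // True only if the JVM version starts with one of the listed prefixes.
    okVersions exists (javaVersion startsWith _)
  }
}
```

With this copy, JavaVersionCheck.isJavaAtLeast("1.7.0_80", "1.6") is true, while JavaVersionCheck.isJavaAtLeast("1.8.0_45", "1.6") is false, because no listed prefix matches a 1.8 version string. That is why the else branch (ThreadPoolTaskSupport) gets picked on JDK 1.8.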

Because thread management is critical in our application and we don't want surprises from a mere version change, I wondered whether it is possible to force Scala to use a ForkJoinPool with a preset number of threads decided by us GLOBALLY. (I don't want the per-instance solution described in "Scala Parallel Collections: How to know and configure the number of threads".)

Hope it's clear enough!

"Scala 2.10 doesn't support JDK 1.8" — eh? it doesn't? (are you thinking of 2.9?) - Seth Tisue
It looks like there is no good solution, so I suggest one more trick. It works well only if you have a few calls to par that you can easily track and change: add your own implicit wrapper class for parallelizable collections with a customPar method that calls par and assigns a fixed, configurable tasksupport object. - SergGr
@SethTisue, I'm not thinking of 2.9; the code I pasted comes from scala.util.Properties in version 2.10.2. To answer your second comment: yes, I've seen the link you pasted, but I don't like relying on reflection for this, since it depends on certain fields being present across versions, which is quite fragile. - gmcontessa
@SergGr, thanks for the suggestion. I would prefer another approach, as I'm trying to find a solution that prevents errors in future code. A wrapper works only as long as the entire team (including new members) remembers to use it instead of the standard method. Anyway, I wonder whether an implicit wrapper could override the par method. I may give it a go and see if that is a way forward. - gmcontessa
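For reference, the wrapper idea from the comments could look roughly like the sketch below. It assumes Scala 2.10 or 2.11, where ForkJoinTaskSupport takes a scala.concurrent.forkjoin.ForkJoinPool (on 2.12 the pool type is java.util.concurrent.ForkJoinPool). The names CustomPar, CustomParOps and sharedTaskSupport, and the pool size of 4, are hypothetical.

```scala
import scala.collection.parallel.{ForkJoinTaskSupport, ParIterable}
import scala.concurrent.forkjoin.ForkJoinPool

object CustomPar {
  // One task support shared by every customPar call.
  // The pool size (4) is an illustrative value, not a recommendation.
  val sharedTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

  // Adds customPar to any Iterable. The result is typed as ParIterable,
  // so the more specific parallel type (e.g. ParSeq) is lost in this sketch.
  implicit class CustomParOps[A](val coll: Iterable[A]) {
    def customPar: ParIterable[A] = {
      val parallel = coll.par
      parallel.tasksupport = sharedTaskSupport // replace the default task support
      parallel
    }
  }
}
```

After import CustomPar._, a call such as List(1, 2, 3).customPar.map(_ * 2) runs on the shared pool. Note that an implicit wrapper cannot override par itself: the compiler only looks for an implicit conversion when the collection has no matching member, and par is already one.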

1 Answer


From my point of view, your question contains two different requirements:

One is: "I merely want to explicitly specify the ForkJoinPool to make the parallel collections ExecutionContext independent from the Scala version I use."

I'm not aware that this is possible. What makes me skeptical above all is the constructor class ForkJoinTaskSupport(val environment: ForkJoinPool). It is called with the ForkJoinPool backing the execution context that .par currently uses, which is the global one if I'm not mistaken. A few layers further down, that pool turns out to be created here, in ExecutionContextImpl:

def createExecutorService: ExecutorService = {

    [...]

    val desiredParallelism = range(
      getInt("scala.concurrent.context.minThreads", "1"),
      getInt("scala.concurrent.context.numThreads", "x1"),
      getInt("scala.concurrent.context.maxThreads", "x1"))

    val threadFactory = new DefaultThreadFactory(daemonic = true)

    try {
      new ForkJoinPool(
        desiredParallelism,
        threadFactory,
        uncaughtExceptionHandler,
        true) // Async all the way baby
    } catch {
      [...]
    }
  }
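How the three properties combine can be sketched as follows. This is a standalone illustration, not the library code: it assumes that a value prefixed with "x" is a multiplier of the number of available processors (which is what the "x1" defaults suggest) and that range clamps the desired value between the minimum and the maximum. The object and method names are hypothetical.

```scala
object DesiredParallelism {
  // "x<factor>" scales the processor count; anything else is an absolute number.
  def resolve(value: String, processors: Int): Int =
    if (value.startsWith("x")) math.ceil(processors * value.substring(1).toDouble).toInt
    else value.toInt

  // Assumed clamping behaviour: never below floor, never above ceiling.
  def range(floor: Int, desired: Int, ceiling: Int): Int =
    math.min(math.max(floor, desired), ceiling)

  // Combines the min / num / max settings the way the quoted code appears to.
  def desired(min: String, num: String, max: String, processors: Int): Int =
    range(resolve(min, processors), resolve(num, processors), resolve(max, processors))
}
```

Under these assumptions, on an 8-core machine the defaults ("1", "x1", "x1") give 8 threads, and setting only numThreads to "16" still gives 8, because maxThreads defaults to "x1". To go above the processor count, maxThreads would have to be raised as well.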

So it's not exactly a pool you can swap out, but it is definitely a pool you can configure, which would satisfy the other formulation of your requirement: "I wondered if it was possible to force Scala to use ForkJoinPool with a preset number of threads decided by us GLOBALLY".
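A minimal sketch of such a configuration, using the property names from the code above. The object name GlobalPoolConfig and the method configure are hypothetical. Whether .par actually picks up this pool depends on the Scala version, as discussed in the question, so treat it as something to verify rather than a guaranteed fix.

```scala
object GlobalPoolConfig {
  // Pins min, desired and max parallelism of the global pool to one value.
  // Must run before anything touches scala.concurrent.ExecutionContext.global,
  // because the properties are read once, when the pool is created.
  def configure(threads: Int): Unit = {
    require(threads > 0, "thread count must be positive")
    val value = threads.toString
    System.setProperty("scala.concurrent.context.minThreads", value)
    System.setProperty("scala.concurrent.context.numThreads", value)
    System.setProperty("scala.concurrent.context.maxThreads", value)
  }
}
```

Calling GlobalPoolConfig.configure(4) as the first statement of main would be one way to use it. The same properties can also be passed on the command line, e.g. -Dscala.concurrent.context.numThreads=4, which avoids any ordering issues inside the application.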

Full disclaimer: I have never tried this myself, since I haven't needed it so far, but your question made me want to investigate a bit!