1 vote

I have a large batched parallel computation that I run with a parallel map in Scala. I have noticed that there appears to be a gradual downstepping of CPU usage as the workers finish. It all comes down to a call inside the Map object:

scala.collection.parallel.thresholdFromSize(length, tasksupport.parallelismLevel)

Looking at the code, I see this:

def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
  val p = parallelismLevel
  if (p > 1) 1 + sz / (8 * p)
  else sz
}

My calculation works great on a large number of cores, and now I understand why.

thresholdFromSize(1000000, 24) = 5209
thresholdFromSize(1000000, 4) = 31251

If I have an array of length 1000000 on 24 CPUs, it will partition all the way down to 5209 elements. If I pass that same array into the parallel collections on my 4-CPU machine, it will stop partitioning at 31251 elements.

It should be noted that the runtime of my calculations is not uniform. Runtime per unit can be as much as 0.1 seconds. At 31251 items, that's about 3125 seconds, or 52 minutes, during which the other workers could be stepping in and grabbing work, but are not. I have observed exactly this behavior while monitoring CPU utilization during the parallel computation. Obviously I'd love to run on a large machine, but that's not always possible.
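The arithmetic above can be double-checked with a standalone copy of the library function quoted earlier:

```scala
// Copied from scala.collection.parallel: the minimum chunk size
// below which a parallel collection stops splitting its work.
def thresholdFromSize(sz: Int, parallelismLevel: Int): Int = {
  val p = parallelismLevel
  if (p > 1) 1 + sz / (8 * p)
  else sz
}

val on24Cores = thresholdFromSize(1000000, 24) // 1 + 1000000 / 192 = 5209
val on4Cores  = thresholdFromSize(1000000, 4)  // 1 + 1000000 / 32  = 31251
```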

My question is this: is there any way to influence the parallel collections to use a smaller threshold number that is better suited to my problem? The only thing I can think of is to write my own implementation of the Map class, but that seems like a very inelegant solution.

3 Answers

0 votes

You want to read up on Configuring Scala parallel collections. In particular, you probably need to provide a custom implementation of the TaskSupport trait.

0 votes

I think all you need to do is something like this:

yourCollection.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(24))

The parallelism parameter defaults to the number of CPU cores that you have, but you can override it as shown above (on Scala 2.12 and later, the pool class is java.util.concurrent.ForkJoinPool rather than scala.concurrent.forkjoin.ForkJoinPool). This is shown in the source for ParIterableLike as well.
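As a sketch of how this applies to the asker's problem (assuming Scala 2.12, where ForkJoinTaskSupport takes a java.util.concurrent.ForkJoinPool; the collection and pool size here are made up for illustration), oversubscribing the pool on a small machine forces a smaller split threshold:

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool

val pc = (1 to 100).toVector.par
// Declaring 24 workers raises parallelismLevel, so thresholdFromSize
// returns 1 + n/192 instead of 1 + n/32: chunks stay small even on
// a 4-core machine, and idle threads can keep stealing work.
pc.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(24))
val doubled = pc.map(_ * 2)
```

Whether oversubscription actually helps depends on the workload; for CPU-bound units it mainly buys finer-grained work splitting, at the cost of some scheduling overhead.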

0 votes

0.1 seconds is long enough to make it worth handling each unit separately. Wrap the processing of each unit (or each batch of 10 units) in a separate Runnable and submit all of them to a FixedThreadPool. Another approach is to use a ForkJoinPool, which makes it easier to detect when all the computations have finished.
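A minimal sketch of the FixedThreadPool approach (processUnit is a hypothetical stand-in for the asker's 0.1-second-per-unit computation, and the pool size and item count are made up):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

val total = new AtomicLong(0)

// Hypothetical unit of work; replace with the real per-element computation.
def processUnit(i: Int): Unit = total.addAndGet(i.toLong)

val pool = Executors.newFixedThreadPool(4)
(0 until 1000).foreach { i =>
  // One Runnable per unit: any idle thread picks up the next pending task,
  // so no worker sits idle while another grinds through a large chunk.
  pool.submit(new Runnable { def run(): Unit = processUnit(i) })
}
pool.shutdown()
pool.awaitTermination(1, TimeUnit.MINUTES)
```

This sidesteps the threshold formula entirely: the work queue itself provides the load balancing, at the cost of one task-submission overhead per unit.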