
Normally, when one uses Java 8's parallelStream(), the work is executed in the default, common fork-join pool (i.e. ForkJoinPool.commonPool()).
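A minimal sketch for observing this. The class and method names are made up for illustration. It records which threads process the elements of a parallelStream(). On a typical JDK you should see the calling thread plus common-pool workers named like ForkJoinPool.commonPool-worker-N. Thread naming is an implementation detail, so treat this as an observation tool rather than a guarantee.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CommonPoolDemo {
    // Collects the names of all threads that process elements of a parallel stream.
    static Set<String> threadsUsedByParallelStream() {
        List<Integer> data = IntStream.range(0, 10_000).boxed().collect(Collectors.toList());
        Set<String> names = ConcurrentHashMap.newKeySet();
        data.parallelStream().forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Usually the caller (e.g. "main") plus some ForkJoinPool.commonPool-worker-N threads.
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("used:   " + threadsUsedByParallelStream());
    }
}
```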

That is clearly undesirable, however, if one has work that is far from CPU-bound, e.g. work that may spend much of its time waiting on I/O. In such cases one will want to use a separate pool, sized according to other criteria (e.g. how much of the time the tasks are likely to actually be using the CPU).
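One way to pick such a size is sketched below under stated assumptions. It uses a widely used rule of thumb that sizes a pool at roughly cores * (1 + waitTime / computeTime). The helper poolSize and the 9:1 wait-to-compute ratio are illustrative assumptions, not measurements. In practice the ratio would have to be measured for the actual tasks.

```java
import java.util.concurrent.ForkJoinPool;

class IoPoolSizing {
    // ForkJoinPool rejects a parallelism above this limit (0x7fff).
    static final int MAX_PARALLELISM = 0x7fff;

    // Hypothetical helper: rule-of-thumb size cores * (1 + wait/compute), clamped to a valid range.
    static int poolSize(int cores, double waitToComputeRatio) {
        if (cores < 1 || !(waitToComputeRatio >= 0)) {
            throw new IllegalArgumentException("cores must be >= 1 and ratio >= 0");
        }
        long size = Math.round(cores * (1 + waitToComputeRatio));
        return (int) Math.max(1, Math.min(MAX_PARALLELISM, size));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumption: tasks wait on I/O about 90% of the time (ratio 9:1).
        ForkJoinPool ioPool = new ForkJoinPool(poolSize(cores, 9.0));
        try {
            System.out.println("I/O pool parallelism: " + ioPool.getParallelism());
        } finally {
            ioPool.shutdown();
        }
    }
}
```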

There's no obvious means of getting parallelStream() to use a different pool, but there is a way as detailed here.
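For reference, here is a minimal sketch of the commonly cited approach. It relies on unspecified implementation behaviour. The terminal operation runs inside a task submitted to a custom ForkJoinPool, so the stream's subtasks are forked onto that pool's workers. The class name and the thread-name bookkeeping are illustrative only. join() is called from outside the pool, so the calling thread simply waits.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomPoolStream {
    // Runs the stream's terminal operation inside `pool` and returns the names
    // of the threads that processed elements.
    static Set<String> threadsUsedInPool(ForkJoinPool pool) {
        List<Integer> data = IntStream.range(0, 10_000).boxed().collect(Collectors.toList());
        Set<String> names = ConcurrentHashMap.newKeySet();
        pool.submit(() -> data.parallelStream()
                        .forEach(i -> names.add(Thread.currentThread().getName())))
            .join(); // the calling thread blocks here until the pool finishes the work
        return names;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4); // e.g. a pool sized for I/O-heavy work
        try {
            // Expect only this pool's workers here, no commonPool workers.
            System.out.println(threadsUsedInPool(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```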

Unfortunately, that approach entails invoking the terminal operation on the parallel stream from a fork-join pool thread. The downside is that if the target fork-join pool is completely busy with existing work, the calling thread blocks, doing absolutely nothing, until a pool thread gets around to this work. Thus the pool can become a bottleneck worse than single-threaded execution. By contrast, when one uses parallelStream() in the "normal" fashion, ForkJoinPool.common.externalHelpComplete() or ForkJoinPool.common.tryExternalUnpush() are used to let the calling thread from outside the pool help in the processing.

Does anyone know of a way to both get parallelStream() to use a non-default fork-join pool and have a calling thread from outside the fork-join pool help in the processing of this work (but not the rest of the fork-join pool's work)?

I don't understand your "The downside of this is that if the target fork-join pool is completely busy with existing work". Wouldn't you be creating a new pool just for this parallel stream invocation? - Sotirios Delimanolis
It’s even worse. When you call get on your task, which is not in the common pool, it will still call ForkJoinPool.common.tryExternalUnpush(), but, of course, won’t find the task in the common pool’s queue. - Holger
To answer the question: no, I would not be creating a new thread pool just for this invocation. Rather, I'd be sharing this other thread pool across many similar invocations, some of which could be overlapping, some of which could have much longer/bigger tasks than others, etc. - Jess Holle

1 Answer

You can use awaitQuiescence on the pool to help out. However, you can’t select which task(s) you will help with: it simply takes the next pending task from the pool, so if other tasks are pending, you might end up executing them before getting to your own.

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
CountDownLatch workerBusy = new CountDownLatch(1);
// make all threads busy (the latch ensures the single worker has really taken this task):
forkJoinPool.submit(() -> { workerBusy.countDown(); LockSupport.parkNanos(Long.MAX_VALUE); });
workerBusy.await();
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone()) // zero timeout: run pending tasks, return as soon as none is found
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

will print true, i.e. the calling thread executed its own task.

whereas

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
CountDownLatch workerBusy = new CountDownLatch(1);
// make all threads busy (the latch ensures the single worker has really taken this task):
forkJoinPool.submit(() -> { workerBusy.countDown(); LockSupport.parkNanos(Long.MAX_VALUE); });
workerBusy.await();
// overload: this second blocking task is now queued ahead of ours
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone())
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

will hang forever as it attempts to execute the second blocking task.

Nevertheless, it lets the initiating thread help process the pool’s pending tasks, which raises the chance of its own task getting executed, as long as there are no infinite tasks (the example above is extreme and only chosen for demonstration).
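Packaged as a reusable method, the helping loop might look like the sketch below. helpUntilDone and callerRanOwnTask are hypothetical names. The demo replaces the endless park with a flag-controlled one so the pool can be released afterwards. There is one caveat, as far as I can tell from the JDK sources. ForkJoinTask.fork() called from a thread that is not a pool worker pushes the subtask to the common pool. So if the helping thread happens to run the task that starts a parallel stream, that stream's subtasks would go to the common pool rather than to the custom pool.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class HelpingCaller {
    // Hypothetical helper: the calling (non-pool) thread runs pending tasks of `pool`
    // until `task` is done. It cannot pick which tasks it runs, so unrelated pending
    // tasks may be executed first, and a never-ending one would block it forever.
    static <T> T helpUntilDone(ForkJoinPool pool, ForkJoinTask<T> task) {
        while (!task.isDone()) {
            // zero timeout: run pending tasks, return as soon as none is found
            pool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
        }
        return task.join(); // already complete: returns the result or rethrows its exception
    }

    // Demo: with the pool's only worker occupied, the caller ends up running its own task.
    static boolean callerRanOwnTask() {
        ForkJoinPool pool = new ForkJoinPool(1);
        AtomicBoolean started = new AtomicBoolean();
        AtomicBoolean release = new AtomicBoolean();
        try {
            pool.submit(() -> {
                started.set(true);
                while (!release.get()) {
                    LockSupport.parkNanos(1_000_000L); // hold the only worker until released
                }
            });
            while (!started.get()) {
                Thread.yield(); // make sure the worker has really taken the blocking task
            }
            ForkJoinTask<Thread> task = pool.submit(() -> Thread.currentThread());
            return helpUntilDone(pool, task) == Thread.currentThread();
        } finally {
            release.set(true);
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller ran its own task: " + callerRanOwnTask());
    }
}
```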


But note that the entire relationship between the Fork/Join framework and the Stream API is an implementation detail anyway.