26
votes

Here's my understanding of the Stream framework of Java 8:

  1. Something creates a source Stream
  2. The implementation is responsible for providing a BaseStream#parallel() method, which in turn returns a Stream that can run its operations in parallel.

While someone has already found a way to use a custom thread pool with the Stream framework's parallel execution, I cannot for the life of me find any mention in the Java 8 API that the default parallel Stream implementations use ForkJoinPool#commonPool(). (By those I mean Collection#parallelStream(), the methods in the StreamSupport class, and any other sources of parallel-enabled streams in the API that I don't know about.)

The only tidbits that I could glean from search results were these:


So my question is:

Where is it said that the ForkJoinPool#commonPool() is used for parallel operations on streams that are obtained from the Java 8 API?

3
The very last paragraph of here seems to state it ("Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release."), but it isn't quite satisfactory to me... I would guess that implementation details like that might have been left out to allow for future evolution, but considering that implementation details are included in so many other places, it doesn't make much sense... - awksp
There's another hint here ("With aggregate operations, the Java runtime performs this partitioning and combining of solutions for you."), but again, it's not quite as explicit as you might want... - awksp
It may not be stated in the API, for the reason mentioned in the other comment: it's an implementation detail. The most official resource I found (apart from the code - that's cheating ;-)) was jsr166-concurrency.10961.n7.nabble.com/… , where Doug Lea stated that "The ForkJoinPool common pool is used in JDK8 for all parallel Stream operations, parallel sorting, etc." - Marco13
I would still consider it to be an implementation detail. Here, Stuart Marks warns about taking too many implementation details for granted. - Holger
There is a funny example in the Spliterator documentation, where they calculate the batch size based on ForkJoinPool.getCommonPoolParallelism(). No other mention of fork/join though. - Lukas

3 Answers

14
votes

W.r.t. where is it documented that Java 8 parallel streams use FJ Framework?

Afaik (Java 1.8u5) it is not mentioned in the JavaDoc of parallel streams that a common ForkJoinPool is used.

But it is mentioned in the ForkJoin documentation at the bottom of http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

W.r.t. replacing the Thread pool

My understanding is that you can use a custom ForkJoinPool instead of the common one (see "Custom thread pool in Java 8 parallel stream"), but not a custom thread pool that differs from the ForkJoin implementation. (I have an open question on this: "How to (globally) replace the common thread pool backend of Java parallel streams?")
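A minimal sketch of the custom-pool approach mentioned above, for illustration only. It assumes the commonly reported (but, as far as I can tell, undocumented) behaviour that a parallel stream started from inside a ForkJoinPool worker thread runs its tasks in that pool rather than in the common pool. The class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomPoolSketch {

    // Sums 1..upTo with a parallel stream started from inside a dedicated pool.
    // Assumption: the stream's tasks stay in the pool that runs the terminal
    // operation. This is observed behaviour, not an API guarantee.
    static int sumInCustomPool(int parallelism, int upTo) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Typed as Callable so the intended submit(...) overload is obvious.
            Callable<Integer> task =
                    () -> IntStream.rangeClosed(1, upTo).parallel().sum();
            // join() waits for the result without checked exceptions.
            return pool.submit(task).join();
        } finally {
            pool.shutdown(); // a custom pool is not shut down automatically
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInCustomPool(4, 100)); // prints 5050
    }
}
```

Because this depends on an implementation detail, it may stop working in a future JDK without notice.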

W.r.t. replacing the Streams api

You may check out https://github.com/nurkiewicz/LazySeq, which is a more Scala-like streams implementation: very nice, very interesting.

PS (w.r.t. ForkJoin and Streams)

If you are interested, I would like to note that I stumbled across some issues with the use of the FJ pool, see, e.g.

5
votes

For what it's worth, Java 8 in Action has a chapter on Parallel data processing and performance (Chapter 7). It says:

"...the Stream interface gives you the opportunity to execute operations in parallel on a collection of data without much effort."

"...you’ll see how Java can make this magic happen or, more practically, how parallel streams work under the hood by employing the fork/join framework introduced in Java 7."

It also has a small side note in section 7.1:

"Parallel streams internally use the default ForkJoinPool...which by default has as many threads as you have processors, as returned by Runtime.getRuntime().availableProcessors()."

"you can change the size of this pool using the system property java.util.concurrent.ForkJoinPool.common.parallelism, as in the following example:"

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

As mentioned in the comments and other answers, this does not mean parallel streams will always use the fork/join framework.
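One caveat about the system property quoted above, shown as a small sketch. As far as I know, the property is only read when the common pool is first initialized, so setting it after something has already touched ForkJoinPool has no effect. The class and method names below are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolParallelism {

    static final String PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Sets the property, then reports the parallelism the common pool really has.
    // If the common pool was initialized earlier in this JVM, the returned value
    // will not reflect the requested one.
    static int requestParallelism(int requested) {
        System.setProperty(PROPERTY, Integer.toString(requested));
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + requestParallelism(12));
    }
}
```

Passing the value on the command line with -Djava.util.concurrent.ForkJoinPool.common.parallelism=12 avoids the ordering problem.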

1
votes

You can check the source code of the terminal operations on GrepCode. For example, let's take a look at ForEachOp. As you can see, the evaluateParallel method of ForEachOp creates and invokes a ForEachTask object, which is derived from CountedCompleter, which in turn is derived from ForkJoinTask.
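Besides reading the source, you can observe the behaviour at runtime. The sketch below (hypothetical class and method names) records which pool hosts each thread that processes elements of a parallel stream, using ForkJoinTask.getPool(). Whatever it reports reflects the JDK it runs on and is an observation, not a specification.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WhichPool {

    // Labels the pool hosting the current thread.
    static String currentPool() {
        // getPool() returns null when the caller is not a ForkJoinPool worker.
        ForkJoinPool pool = ForkJoinTask.getPool();
        if (pool == null) {
            return "caller thread (no pool)";
        }
        return pool == ForkJoinPool.commonPool() ? "common pool" : "other pool";
    }

    // Runs a parallel stream and collects the distinct labels seen.
    static Set<String> observedPools() {
        return IntStream.range(0, 100_000)
                .parallel()
                .mapToObj(i -> currentPool())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(observedPools());
    }
}
```

The calling thread usually takes part in the work as well, so a "no pool" label can show up alongside the pool label.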