4
votes

Assume a lambda expression consumes a certain amount of a limited resource (like memory), so that the number of concurrent executions has to be limited (example: if the lambda temporarily consumes 100 MB of local memory and we would like to limit total usage to 1 GB, we must not allow more than 10 concurrent evaluations).

What is the best way to limit the number of concurrent executions, say for example in

IntStream.range(0, numberOfJobs).parallel().forEach( i -> { /*...*/ });

?

Note: An obvious option is to perform a nesting like

    double jobsPerThread = (double)numberOfJobs / numberOfThreads;
    IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
        IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));

Is this the only way? It is not that elegant. Actually I would like to have a

IntStream.range(0, numberOfJobs).parallel(numberOfThreads).forEach( i -> { /*...*/ });
Why can't you use a shared fixed thread pool? What about the parallel streams complicates things? – Gray

I did use a shared fixed thread pool in Java 6, but the Java 8 code is much more concise. With the thread pool I have to define an ArrayList of Futures, submit the workers to the executor, and collect the results from the Futures. While this is the main motivation, I had the impression that the streams are more efficient (by about 10%). – Christian Fries

@AlexeiKaigorodov Thanks for the hint about creating a ForkJoinPool with a given target parallelism level or changing the current target parallelism level. This is a good solution too. – Christian Fries

2 Answers

4
votes

Streams use a ForkJoinPool for parallel operations. By default they use ForkJoinPool.commonPool(), which does not allow changing the concurrency afterwards. However, you can use your own ForkJoinPool instance. When you execute the stream code within the context of your own ForkJoinPool, that pool will be used for the stream operations. The following example illustrates this by executing the same operation once using the default behavior and once using a custom pool with a fixed concurrency of 2:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class InterfaceStaticMethod {
    public static void main(String[] arg) throws Exception {
      Runnable parallelCode = () -> {
        // use a thread-safe set, since it is filled from multiple threads
        Set<String> allThreads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1_000_000).parallel().filter(i -> {
          allThreads.add(Thread.currentThread().getName()); return false; }
        ).min();
        System.out.println("executed by " + allThreads);
      };
      System.out.println("default behavior: ");
      parallelCode.run();
      System.out.println("specialized pool:");
      ForkJoinPool pool = new ForkJoinPool(2);
      pool.submit(parallelCode).get();
    }
}
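Applied to the use case from the question, this might look as follows. This is a sketch under the stated assumptions (10 concurrent jobs of ~100 MB each, so at most 1 GB in use); the class and method names are hypothetical, and the job counter is only there to make the result observable:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LimitedParallelism {

    // Runs numberOfJobs jobs with (approximately) at most maxConcurrency running at once.
    static int runJobs(int numberOfJobs, int maxConcurrency) throws Exception {
        AtomicInteger completed = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(maxConcurrency);
        try {
            pool.submit(() ->
                IntStream.range(0, numberOfJobs).parallel().forEach(i -> {
                    // job i: would temporarily allocate ~100 MB here
                    completed.incrementAndGet();
                })
            ).get(); // blocks until every job has finished
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJobs(100, 10)); // all 100 jobs ran, at most ~10 at a time
    }
}
```

Note that the parallelism of the custom pool is a target, not a hard guarantee; for a hard upper bound on resource use, a Semaphore acquired inside the job would be the stricter tool.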
2
votes

Depending on your use case, using the CompletableFuture utility methods may be easier:

import static java.util.concurrent.CompletableFuture.runAsync;

ExecutorService executor = Executors.newFixedThreadPool(10); // max 10 threads
for (int i = 0; i < numberOfJobs; i++) {
    final int job = i; // the lambda may only capture (effectively) final variables
    runAsync(() -> /* do something with job */, executor);
}

//or with a stream:
IntStream.range(0, numberOfJobs)
         .forEach(i -> runAsync(() -> /* do something with i */, executor));

The main difference from your code is that the parallel forEach only returns after the last job is over, whereas runAsync returns as soon as all the jobs have been submitted. There are various ways to change that behaviour if required.
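One such way is to keep the futures returned by runAsync and block on CompletableFuture.allOf until every job has completed. A minimal sketch, with hypothetical class and method names and a counter standing in for the actual work:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class WaitForAsyncJobs {

    // Submits numberOfJobs jobs to a bounded pool and blocks until all are done.
    static int runAll(int numberOfJobs, int maxThreads) {
        ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
        AtomicInteger done = new AtomicInteger();
        // keep one CompletableFuture per job so completion can be awaited
        CompletableFuture<?>[] futures = IntStream.range(0, numberOfJobs)
            .mapToObj(i -> CompletableFuture.runAsync(done::incrementAndGet, executor))
            .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join(); // returns only after the last job
        executor.shutdown();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(100, 10)); // prints 100
    }
}
```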