50
votes

I'm looking for the simplest, most straightforward way to implement the following:

  • The main program instantiates worker threads to do a task.
  • Only n tasks can be running at once.
  • When n is reached, no more workers are started until the count of running threads drops back below n.

7 Answers

56
votes

I think that Executors.newFixedThreadPool fits your requirements. There are several ways to use the resulting ExecutorService, depending on whether you want results returned to the main thread or the tasks are entirely self-contained, and on whether you have a collection of tasks up front or tasks are queued in response to some event.

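  // YourTask must implement Callable<YourResultType> for invokeAll to accept it
  Collection<YourTask> tasks = new ArrayList<YourTask>();
  YourTask yt1 = new YourTask();
  ...
  tasks.add(yt1);
  ...
  ExecutorService exec = Executors.newFixedThreadPool(5);
  // invokeAll blocks until every task has finished and can throw InterruptedException
  List<Future<YourResultType>> results = exec.invokeAll(tasks);
  exec.shutdown(); // let the pool threads exit once the work is done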
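
For reference, here is a minimal self-contained sketch of the invokeAll approach. SquareTask is a hypothetical stand-in for YourTask, and the class and method names are made up for illustration; only the java.util.concurrent calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllSketch {
    // Hypothetical stand-in for YourTask: returns the square of its input.
    static final class SquareTask implements Callable<Integer> {
        private final int value;

        SquareTask(int value) {
            this.value = value;
        }

        @Override
        public Integer call() {
            return value * value;
        }
    }

    // Squares 1..count on a pool of poolSize threads and sums the results.
    static int sumOfSquares(int count, int poolSize) {
        List<SquareTask> tasks = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            tasks.add(new SquareTask(i));
        }
        ExecutorService exec = Executors.newFixedThreadPool(poolSize);
        try {
            int sum = 0;
            // invokeAll blocks until all tasks are done; futures are returned in task order.
            for (Future<Integer> future : exec.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10, 5)); // prints 385
    }
}
```

At most poolSize tasks run at any moment; the remaining tasks wait in the pool's queue until a thread becomes free.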

Alternatively, if you have a new asynchronous task to perform in response to some event, you probably just want to use the ExecutorService's simple execute(Runnable) method.

25
votes
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
    /* ...execute the task to run concurrently as a runnable: */
    exec.execute(new Runnable() {
        public void run() {
            /* do the work to be done in its own thread */
            System.out.println("Running in: " + Thread.currentThread());
        }
    });
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
    /* The tasks are now running concurrently. We wait until all work is done, 
     * with a timeout of 50 seconds: */
    boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
    /* If the execution timed out, false is returned: */
    System.out.println("All done: " + b);
} catch (InterruptedException e) {
    /* Restore the interrupt flag so that callers can see the interruption: */
    Thread.currentThread().interrupt();
}
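
If you want to convince yourself that the pool really caps concurrency, a rough sketch like the following counts how many tasks are running at the same moment. The class name, method name, and the 5 ms sleep are assumptions for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolLimitSketch {
    // Runs taskCount short tasks on a pool of poolSize threads and returns
    // the highest number of tasks observed running at the same moment.
    static int maxConcurrent(int poolSize, int taskCount) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService exec = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            exec.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the largest count seen
                try {
                    Thread.sleep(5); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        exec.shutdown();
        try {
            if (!exec.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + maxConcurrent(5, 100));
    }
}
```

The reported peak can never exceed the pool size, because a task only starts once one of the pool's threads picks it up.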
7
votes

Executors.newFixedThreadPool(int)

Executor executor = Executors.newFixedThreadPool(n);

Runnable runnable = new Runnable() {
    public void run() {
        // do your thing here
    }
};

executor.execute(runnable);
2
votes

Use the Executor framework, namely Executors.newFixedThreadPool(N).

1
votes
  1. If your task queue will not grow without bound and your tasks complete quickly, you can use Executors.newFixedThreadPool(n), as others have suggested.

    The only drawback of this solution is that the task queue is unbounded and you have no control over its size. If tasks are submitted faster than they complete, the backlog will degrade application performance and may eventually cause an OutOfMemoryError.

  2. If you want work stealing, where idle worker threads take tasks from the queues of busy worker threads, use Executors.newWorkStealingPool. It returns an ExecutorService backed by a ForkJoinPool.

    public static ExecutorService newWorkStealingPool(int parallelism)

    Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively engaged in, or available to engage in, task processing. The actual number of threads may grow and shrink dynamically. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.

  3. I prefer ThreadPoolExecutor because its API exposes many parameters that control how tasks are executed.

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    

In your case, set both corePoolSize and maximumPoolSize to N. You can then bound the task queue, supply your own thread factory, and choose a rejection policy.
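
A minimal sketch of option 3, assuming a bounded ArrayBlockingQueue and the built-in CallerRunsPolicy; both are illustrative choices rather than the only reasonable ones, and the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPoolSketch {
    // Builds a pool with exactly n threads and room for at most queueCapacity waiting tasks.
    static ThreadPoolExecutor newBoundedPool(int n, int queueCapacity) {
        return new ThreadPoolExecutor(
                n, n,                      // corePoolSize == maximumPoolSize == n
                0L, TimeUnit.MILLISECONDS, // no effect here: there are no threads beyond the core size
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                Executors.defaultThreadFactory(),
                // When the queue is full, the submitting thread runs the task itself,
                // which slows the producer down instead of throwing.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits taskCount trivial tasks and returns how many of them completed.
    static int runAll(int n, int queueCapacity, int taskCount) {
        ThreadPoolExecutor pool = newBoundedPool(n, queueCapacity);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> completed.incrementAndGet());
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed: " + runAll(4, 10, 1000));
    }
}
```

Note that with CallerRunsPolicy the submitting thread occasionally runs a task too, so up to n + 1 tasks can be in progress at once. If the limit of n must be strict, a different handler is needed, for example the default AbortPolicy combined with handling RejectedExecutionException in the caller.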

To control the pool size dynamically, have a look at this related SE question:

Dynamic Thread Pool
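
As a rough illustration of resizing at runtime (this is not necessarily what the linked question recommends), ThreadPoolExecutor lets you change both limits after construction. The helper names below are hypothetical; the point of the sketch is the order of the two setter calls, which keeps corePoolSize from ever exceeding maximumPoolSize.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ResizePoolSketch {
    // Creates a fixed-size pool directly so that the setters are available.
    static ThreadPoolExecutor newPool(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Changes both limits to newSize without ever leaving core above maximum.
    static void resize(ThreadPoolExecutor pool, int newSize) {
        if (newSize > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newSize); // growing: raise the ceiling first
            pool.setCorePoolSize(newSize);
        } else {
            pool.setCorePoolSize(newSize);    // shrinking: lower the core first
            pool.setMaximumPoolSize(newSize);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(2);
        resize(pool, 4);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        resize(pool, 1);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Shrinking does not interrupt running tasks; excess threads are terminated when they next become idle.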

0
votes

If you want to roll your own:

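private static final int MAX_WORKERS = n;
private final List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);

// Waits until a slot is free, then registers and starts a new worker.
// The check and the add happen under one lock, so the limit cannot be overshot.
private void addWorker() throws InterruptedException {
    synchronized (workers) {
        while (workers.size() >= MAX_WORKERS) {
            workers.wait(); // sleep instead of spinning in a busy loop
        }
        Worker worker = new Worker(this);
        workers.add(worker);
        worker.start();
    }
}

// Called by each worker when it has finished.
public void removeWorker(Worker worker) {
    synchronized (workers) {
        workers.remove(worker);
        workers.notifyAll(); // wake the thread waiting in addWorker()
    }
}

public Example() throws InterruptedException {
    while (true) {
        addWorker();
    }
}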

Where Worker is your class that extends Thread. Each worker will call this class's removeWorker method, passing itself in as a parameter, when it's finished doing its thing.

With that said, the Executor framework looks a lot better.
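
For anyone who still wants a hand-rolled limiter, here is a sketch of a different technique than the list-based code above: a java.util.concurrent.Semaphore that hands out at most maxWorkers permits. All class and method names are hypothetical, and demoPeak exists only to exercise the limiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreLimiterSketch {
    // Starts one thread per task but never lets more than maxWorkers run at once.
    // Returns the peak number of tasks that were running simultaneously.
    static int runLimited(List<Runnable> tasks, int maxWorkers) {
        Semaphore permits = new Semaphore(maxWorkers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        try {
            for (Runnable task : tasks) {
                permits.acquire(); // blocks while maxWorkers tasks are running
                Thread thread = new Thread(() -> {
                    try {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        task.run();
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // free the slot even if the task throws
                    }
                });
                threads.add(thread);
                thread.start();
            }
            for (Thread thread : threads) {
                thread.join(); // wait for the last workers to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    // Hypothetical driver: runs taskCount short sleeping tasks and reports the peak.
    static int demoPeak(int taskCount, int maxWorkers) {
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(() -> {
                try {
                    Thread.sleep(5); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        return runLimited(tasks, maxWorkers);
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + demoPeak(20, 3));
    }
}
```

Unlike a thread pool, this creates a new thread for every task, so it only limits concurrency and does not reuse threads. That is one more reason to prefer the Executor framework.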

Edit: Anyone care to explain why this is so bad, instead of just downmodding it?

0
votes

As others here have mentioned, your best bet is to make a thread pool with the Executors class.

However, if you want to roll your own, this code should give you an idea of how to proceed. Basically, just add every new thread to a thread group and make sure that you never have more than N active threads in the group:

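Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup("tasks"); // ThreadGroup has no no-arg constructor; it needs a name
int i = 0;
// activeCount() is documented as an estimate, so treat N as an approximate limit
while (i < tasks.length || group.activeCount() > 0) {
    if (group.activeCount() < N && i < tasks.length) {
        new TaskThread(group, tasks[i]).start();
        i++;
    } else {
        Thread.sleep(100); // throws InterruptedException; declare or catch it
    }
}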