2
votes

As described in the question linked below:-

How to get the ThreadPoolExecutor to increase threads to max before queueing?

I changed the queue implementation so that offer() returns false (see the code below). As a result, whenever a new task is submitted, a new thread is created for it.

But when I ran the implementation below on a large scale (Bis System Testing) with logging in place, a new problem appeared.

When a task arrives it is offered to the queue, and because the queue returns false a new thread is created to execute it. Idle threads that are already in the pool are not used. The reason is that idle threads receive tasks through the getTask() method, which takes tasks from the queue. So my question is: how can I change this behavior so that, when idle threads are available, tasks are assigned to them rather than to newly created threads?

The output below makes this clearer:-

Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0

The code files are as follows:-

The ThreadPoolExecutor source is from Java 1.5, as we are using 1.5 on the server machine and cannot upgrade it.

ThreadPoolExecutor:-

 public void execute(Runnable command) {
    System.out.println("Active Count: " + getActiveCount()
            + " Pool Size : " + getPoolSize() + " Idle Count: "
            + (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
          if (command == null)
              throw new NullPointerException();
          for (;;) {
              if (runState != RUNNING) {
                  reject(command);
                  return;
              }
              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                  return;
              if (workQueue.offer(command))
                  return;
              int status = addIfUnderMaximumPoolSize(command);
              if (status > 0)      // created new thread
                  return;
              if (status == 0) {   // failed to create thread
                  reject(command);
                  return;
              }
              // Retry if created a new thread but it is busy with another task
          }
      }

LinkedBlockingQueue:-

public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> 
{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public CustomBlockingQueue() {
    super(Integer.MAX_VALUE);
    }

    public boolean offer(E e) {
       return false;
    }

}

In the rejection handler we call the queue's put method, which we haven't overridden.

Calling the executor:-

   final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());

private static final int TASK_COUNT = 100;

 for (int i = 0; i < TASK_COUNT; i++) {

......
tpe.execute(new Task(i));
.....
}

We create the executor with a core pool size of 3, a maximum pool size of 8, and an unbounded LinkedBlockingQueue for tasks.
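The behaviour described above can be reproduced in isolation. The following is a minimal sketch, not the production code: AlwaysRejectingQueueDemo, AlwaysFalseQueue and poolSizeAfterSequentialTasks are hypothetical names, and a 60 second keep-alive is assumed so that idle non-core threads stay in the pool while the size is measured. It runs tasks strictly one after another, so an idle thread is always available, yet the pool should still grow by one thread per task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AlwaysRejectingQueueDemo {

    // Same idea as CustomBlockingQueue above: offer() never accepts a task.
    static class AlwaysFalseQueue<E> extends LinkedBlockingQueue<E> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean offer(E e) {
            return false;
        }
    }

    // Runs `tasks` trivial tasks one at a time and returns the pool size afterwards.
    // Assumes tasks <= max, so the rejection handler is never reached.
    static int poolSizeAfterSequentialTasks(int core, int max, int tasks) {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS, new AlwaysFalseQueue<Runnable>());
        try {
            for (int i = 0; i < tasks; i++) {
                final CountDownLatch done = new CountDownLatch(1);
                tpe.execute(new Runnable() {
                    public void run() {
                        done.countDown();
                    }
                });
                // Wait until this task has run before submitting the next one.
                done.await();
            }
            return tpe.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            tpe.shutdownNow();
        }
    }
}
```

With a core size of 1 and a maximum of 4, three sequential tasks are expected to leave three threads in the pool, even though the earlier threads were idle when the later tasks arrived.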


4 Answers

1
votes

The easiest way to achieve the “start before queuing but prefer existing threads” behavior is to use a SynchronousQueue. It will accept offered items if and only if there’s already a waiting receiver. So idle threads will get items, and once there are no idle threads the ThreadPoolExecutor will start new threads.
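The hand-off rule can be checked directly on the queue, without an executor. This is only an illustrative sketch: SynchronousQueueOfferDemo and its two methods are hypothetical names, and the consumer thread stands in for an idle worker blocked in take().

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class SynchronousQueueOfferDemo {

    // offer() with no thread waiting to receive: the element is refused.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("task");
    }

    // offer() while a consumer is blocked in take(): the element is handed over.
    static boolean offerWithWaitingConsumer() {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.take(); // plays the role of an idle worker thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            // Retry for a while, because the consumer needs a moment to reach take().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (System.nanoTime() < deadline) {
                if (queue.offer("task")) {
                    return true;
                }
                Thread.sleep(1);
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            consumer.interrupt(); // harmless if the consumer has already finished
        }
    }
}
```

Inside a ThreadPoolExecutor the same rule means that a refused offer() is the signal to start a new thread, while an accepted one means an idle worker took the task.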

The only disadvantage is that once all threads are started, you can’t simply put the pending item into the queue as it has no capacity. So you either have to accept that the submitter gets blocked or you need another queue for putting pending tasks to it and another background thread which tries to put these pending items to the synchronous queue. This additional thread won’t hurt the performance as it is blocked in either of these two queues most of the time.

class QueuingRejectionHandler implements RejectedExecutionHandler {

  final ExecutorService processPending=Executors.newSingleThreadExecutor();

  public void rejectedExecution(
      final Runnable r, final ThreadPoolExecutor executor) {
    processPending.execute(new Runnable() {
      public void run() {
        executor.execute(r);
      }
    });
  }
}

ThreadPoolExecutor e=new ThreadPoolExecutor(
  corePoolSize, maximumPoolSize, keepAliveTime, unit,
  new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());
1
votes

I believe that your problem is in the following:

public boolean offer(E e) {
   return false;
}

This will always return false to the TPE which will cause it to start another thread, regardless of how many threads are currently idle. This is not what my code sample on this answer recommends. I had to correct an early problem with it after feedback.

My answer says to make your offer(...) method look something like:

public boolean offer(Runnable e) {
    /*
     * Offer it to the queue if there is 1 or 0 items already queued, else
     * return false so the TPE will add another thread.
     */
    if (size() <= 1) {
        return super.offer(e);
    } else {
        return false;
    }
}

So if there are 2 or more things already in the queue, it will fork another thread; otherwise it will enqueue the task in the queue, where it should be picked up by the idle threads. You might also play with the 1 value. Trying it with 0 or more than 1 may be appropriate for your application. Injecting that value into your CustomBlockingQueue might be in order.
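One way to inject that value is sketched below. The class names ThresholdBlockingQueue and PutOnRejectHandler and the field maxQueuedBeforeNewThread are hypothetical; the handler follows the put-on-rejection approach mentioned in the question, and this is an illustration under those assumptions rather than the exact code from the linked answer.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Queue that accepts tasks only while few are waiting, so that the executor
// starts another thread once the backlog exceeds the configured threshold.
class ThresholdBlockingQueue extends LinkedBlockingQueue<Runnable> {
    private static final long serialVersionUID = 1L;

    private final int maxQueuedBeforeNewThread;

    ThresholdBlockingQueue(int maxQueuedBeforeNewThread) {
        this.maxQueuedBeforeNewThread = maxQueuedBeforeNewThread;
    }

    @Override
    public boolean offer(Runnable task) {
        if (size() <= maxQueuedBeforeNewThread) {
            return super.offer(task); // idle threads will pick it up from the queue
        }
        return false; // tells the executor to try adding a thread
    }
}

// Once the pool is at its maximum size, rejected tasks are queued with put(),
// which does not go through the overridden offer().
class PutOnRejectHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("executor is shut down");
        }
        try {
            executor.getQueue().put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
        }
    }
}
```

Both objects would be passed to the ThreadPoolExecutor constructor in place of the queue and handler used in the question, with the threshold chosen per application.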

0
votes

The solution given by Gray here is awesome, but I faced the same problem as you, i.e. idle threads were not used to pick up newly arriving tasks; instead a new thread was created whenever poolSize was less than maxPoolSize.

So I tried to tweak the functionality of ThreadPoolExecutor itself by copying the complete class (not a good idea, but I couldn't find any other solution), extending ThreadPoolExecutor with it, and overriding the execute method.

Below is the method :

public void execute(Runnable command)
{
    System.out.println("ActiveCount : " + this.getActiveCount()
            + " PoolSize : " + this.getPoolSize() + " QueueSize : "
            + this.getQueue().size());

    if (command == null)
        throw new NullPointerException();
    for (;;)
    {
        if (runState != RUNNING)
        {
            reject(command);
            return;
        }
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
            return;
        // The task is no longer offered to the queue unconditionally;
        // execution continues to the idle-thread check and thread creation.
        //if (workQueue.offer(command))
        //return;

        // This check is introduced to use idle threads instead of creating
        // a new thread for incoming tasks.
        // Example: coreSize = 3, maxPoolSize = 8.
        // activeCount = 4 and poolSize = 5, so 1 thread is idle; the queue is empty.
        // When a new task comes, it is offered to the queue, and getTask()
        // will take it from there and execute it.
        // But if another task comes before the idle thread takes the first one:
        // activeCount = 4 and poolSize = 5, so 1 thread is idle; queue size = 1.
        // The check fails, and a new thread is created if the pool size is under
        // the maximum, or the task is added to the queue through the rejection handler.
        if ((this.getPoolSize() - this.getActiveCount()) > 0 &&
            (this.getPoolSize() - this.getActiveCount() - workQueue.size()) > 0)
        {
            workQueue.offer(command);
            return;
        }
        int status = addIfUnderMaximumPoolSize(command);
        if (status > 0) // created new thread
            return;
        if (status == 0)
        { // failed to create thread
            reject(command);
            return;
        }
        // Retry if created a new thread but it is busy with another task
    }
}

In the rejection handler I am using the put method to put the task in the (unbounded) queue, as suggested by Gray. :)

Note : I am not overriding behavior of Queue in my code.
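The idle-thread condition from the execute method above can be isolated as a small pure function, which makes the two cases from the comments easy to check. IdleThreadCheck and shouldQueue are hypothetical names used only for this sketch. Note also that getActiveCount() is documented as an approximate value, so in a live pool the check is a heuristic rather than a guarantee.

```java
class IdleThreadCheck {

    // Queue the task only when there are more idle threads than tasks
    // already waiting in the queue; otherwise try to add a thread.
    static boolean shouldQueue(int poolSize, int activeCount, int queuedTasks) {
        int idleThreads = poolSize - activeCount;
        return idleThreads > 0 && (idleThreads - queuedTasks) > 0;
    }
}
```

For the example in the comments, poolSize = 5 and activeCount = 4 with an empty queue gives true (the idle thread takes the task), while the same counts with one task already queued gives false.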

0
votes

So my question is how to change this behavior so that if threads are idle how to make sure that idle threads are assigned tasks for execution rather than creating new threads ??

Things have improved a lot in the last couple of years. Your problem has a simple solution with the Java 8 Executors newWorkStealingPool API.

newWorkStealingPool

public static ExecutorService newWorkStealingPool()

Creates a work-stealing thread pool using all available processors as its target parallelism level.

ExecutorService executorService = Executors.newWorkStealingPool();

will do the required magic for you. newWorkStealingPool returns an ExecutorService of type ForkJoinPool. In a ForkJoinPool, idle threads steal tasks from busy threads' queues, which is what you are looking for.
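A minimal usage sketch follows. WorkStealingDemo and sumOfTaskResults are hypothetical names. It requires Java 8 or later, so it does not apply to the Java 1.5 constraint mentioned in the question, and the pool is sized by target parallelism rather than by a core/maximum pair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WorkStealingDemo {

    // Submits taskCount trivial tasks and returns the sum of their results.
    static int sumOfTaskResults(int taskCount) {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                tasks.add(() -> id); // each task simply returns its own index
            }
            int sum = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                sum += result.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller never configures a queue or a rejection handler here; distributing tasks among the worker threads is left to the pool.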