As mentioned in the link below:-
How to get the ThreadPoolExecutor to increase threads to max before queueing?
I changed the queue implementation to return false after entering element. As a result of which whenever a new task is inserted into the queue a new thread is created for it.
But when i ran the below implementation on a large scale (Bis System Testing) with loggers placed a new problem is generated.
When a task comes for execution it gets inserted into the queue and as queue returns false a new thread is created for its execution. Idle threads which are currently there in the pool are not picked up. Reason being Tasks are assigned to idle threads from getTask()
method which picks tasks from queue. So my question is how to change this behavior so that if threads are idle how to make sure that idle threads are assigned tasks for execution rather than creating new threads ??
Below output will make it more clear:-
Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
The code files are as follows:-
ThreadPoolExecutor is of version java 1.5 as we are using 1.5 on server machine and cannot upgrade it.
ThreadPoolExecutor:-
public void execute(Runnable command) {
System.out.println("Active Count: " + getActiveCount()
+ " Pool Size : " + getPoolSize() + " Idle Count: "
+ (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
if (command == null)
throw new NullPointerException();
for (;;) {
if (runState != RUNNING) {
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
if (workQueue.offer(command))
return;
int status = addIfUnderMaximumPoolSize(command);
if (status > 0) // created new thread
return;
if (status == 0) { // failed to create thread
reject(command);
return;
}
// Retry if created a new thread but it is busy with another task
}
}
LinkedBlockingQueue:-
public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E>
{
/**
*
*/
private static final long serialVersionUID = 1L;
public CustomBlockingQueue() {
super(Integer.MAX_VALUE);
}
public boolean offer(E e) {
return false;
}
}
In rejection handler we are calling put method of queue which we haven't overriden
Callingexecutor
final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());
private static final int TASK_COUNT = 100;
for (int i = 0; i < TASK_COUNT; i++) {
......
tpe.execute(new Task(i));
.....
}
We are calling the executor with core pool size as 3 max pool size as 8 and using unbounded linked blocking queue for tasks.