I am currently working on an educational assignment in which i have to implement a Semaphore only thread-safe thread pool.
I mustn't use during my assignment: Synchronize
wait
notify
sleep
or any thread-safe API's.
firstly without getting too much into the code i have:
- Implemented a Thread-safe queue (no two threads can queue\dequeue at the same time) (i have tested this problem with
ConcurrentLinkedQueue
and the problem persists)
The design itself:
Shared:
Tasks
semaphore = 0Available
semaphore = 0Tasks_Queue
queueAvailable_Queue
queue
Worker Threads:
Blocked
semaphore = 0
General Info:
Only manager(single thread) can dequeue
Tasks_Queue
andAvailable_Queue
Only App-Main(single thread) can enqueue tasks is
Tasks_Queue
Each worker thread can enqueue themselves in
Available_Queue
So we have a mix of a single producer, a single manager and several consumers.
- When the app first starts each of the worker threads gets started and immediately enqueues itself in
Available_Queue
, releasesAvailable
semaphore and gets blocked acquiring it's personalBlocked
semaphore. - Whenever App-Main queues a new task it releases
Task
Semaphore - Whenever Manager wishes to execute a new task it must first acquire both
Tasks
andAvailable
semaphores.
My question:
during the app's runtime the function dequeue_worker()
returns a null worker, even though a semaphore is placed to protect access to the queue when it is known that there are no available worker threads.
i have "solved" the problem by recursively calling dequeue_worker()
if it draws a null thread, BUT doing so is suppose to make an acquisition of a semaphore permit lost forever. yet when i limit the amount of workers to 1 the worker does not get blocked forever.
1) what's the break-point of my original design?
2) why doesn't my "solution" break the design even further?!
Code snippets:
// only gets called by Worker threads: enqueue_worker(this);
private void enqueue_worker(Worker worker) {
available_queue.add(worker);
available.release();
}
// only gets called by App-Main (a single thread)
public void enqueue_task(Query query) {
tasks_queue.add(query);
tasks.release();
}
// only gets called by Manager(a single Thread)
private Worker dequeue_worker() {
Worker worker = null;
try {
available.acquire();
worker = available_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
} // **** the solution: ****
if (worker==null) worker = dequeue_worker(); // TODO: find out why
return worker;
}
// only gets called by Manager(a single Thread)
private Query dequeue_task() {
Query query = null;
try {
tasks.acquire();
query = tasks_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
}
return query;
}
// gets called by Manager (a single thread)
private void execute() { // check if task is available and executes it
Worker worker = dequeue_worker(); // available.down()
Query query = dequeue_task(); //task.down()
worker.setData(query);
worker.blocked.release();
}
And finally Worker's Run()
method:
while (true) { // main infinite loop
enqueue_worker(this);
acquire(); // blocked.acquire();
<C.S>
available.release();
}