1
votes

I am currently working on an educational assignment in which i have to implement a Semaphore only thread-safe thread pool.

I mustn't use during my assignment: Synchronize wait notify sleep or any thread-safe API's.

firstly without getting too much into the code i have:

  • Implemented a Thread-safe queue (no two threads can queue\dequeue at the same time) (i have tested this problem with ConcurrentLinkedQueue and the problem persists)

The design itself:

Shared:

  • Tasks semaphore = 0

  • Available semaphore = 0

  • Tasks_Queue queue

  • Available_Queue queue

Worker Threads:

  • Blocked semaphore = 0

General Info:

  • Only manager(single thread) can dequeue Tasks_Queue and Available_Queue

  • Only App-Main(single thread) can enqueue tasks is Tasks_Queue

  • Each worker thread can enqueue themselves in Available_Queue

So we have a mix of a single producer, a single manager and several consumers.

  • When the app first starts each of the worker threads gets started and immediately enqueues itself in Available_Queue, releases Available semaphore and gets blocked acquiring it's personal Blocked semaphore.
  • Whenever App-Main queues a new task it releases Task Semaphore
  • Whenever Manager wishes to execute a new task it must first acquire both Tasks and Available semaphores.

My question:

during the app's runtime the function dequeue_worker() returns a null worker, even though a semaphore is placed to protect access to the queue when it is known that there are no available worker threads.

i have "solved" the problem by recursively calling dequeue_worker() if it draws a null thread, BUT doing so is suppose to make an acquisition of a semaphore permit lost forever. yet when i limit the amount of workers to 1 the worker does not get blocked forever.

1) what's the break-point of my original design?

2) why doesn't my "solution" break the design even further?!

Code snippets:

// only gets called by Worker threads: enqueue_worker(this);
    private void enqueue_worker(Worker worker) {
       available_queue.add(worker);
       available.release();
    }

// only gets called by App-Main (a single thread)
    public void enqueue_task(Query query) {
        tasks_queue.add(query);
        tasks.release();
    }

// only gets called by Manager(a single Thread) 
    private Worker dequeue_worker() {
        Worker worker = null;
        try {
            available.acquire();
            worker = available_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } // **** the solution: ****
        if (worker==null) worker = dequeue_worker(); // TODO: find out why
        return worker;
    }

// only gets called by Manager(a single Thread) 
    private Query dequeue_task() {
        Query query = null;
        try {
            tasks.acquire();
            query = tasks_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } 
        return query;
    }

// gets called by Manager (a single thread)
    private void execute() { // check if task is available and executes it
        Worker worker = dequeue_worker(); // available.down()
        Query query = dequeue_task(); //task.down()
        worker.setData(query);
        worker.blocked.release();
    }

And finally Worker's Run() method:

while (true) { // main infinite loop

                enqueue_worker(this);
                acquire(); // blocked.acquire();
                <C.S>
                available.release();
            }
1

1 Answers

3
votes

You are calling available.release() twice, once in enqueue_worker, second time in a main loop.