0
votes

I am working on a pthread-based thread pool with five worker threads and one queue. All five threads compete to take jobs from the queue, and I know the basic idea: I need lock/unlock and wait/signal.

But I am not sure how many mutexes and condition variables I should have. Right now I have only one mutex and one condition variable, and all five threads use them.

3 Answers

2
votes

To elaborate on @Ivan's solution...

Instead of a mutex + condition variable, you can use a counting semaphore + atomic operations to create a very efficient queue.

semaphore dequeue_sem = 0;
semaphore enqueue_sem = 37; // or however large you want to bound your queue

The enqueue operation is just:

wait_for(enqueue_sem)
atomic_add_to_queue(element)
signal(dequeue_sem)

The dequeue operation is:

wait_for(dequeue_sem)
element = atomic_remove_from_queue()
signal(enqueue_sem)

The "atomic_add_to_queue" and "atomic_remove_from_queue" operations are typically implemented using an atomic compare-and-exchange in a tight loop.

In addition to its symmetry, this formulation bounds the maximum size of the queue; if a thread calls enqueue() on a full queue, it will block. This is almost certainly what you want for any queue in a multi-threaded environment. (Your computer has finite memory; consuming it without bound should be avoided when possible.)
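For illustration, here is a minimal sketch of the two-semaphore scheme above in C with POSIX semaphores. The names (`sem_queue`, `sem_queue_put`, `sem_queue_get`) are hypothetical, and the sketch swaps in a small mutex around the ring buffer where the pseudocode calls for atomic compare-and-exchange, so it shows the semaphore accounting rather than a lock-free queue. It also assumes unnamed semaphores (`sem_init`) are available, as on Linux.

```c
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

#define QUEUE_CAP 37  /* bound taken from the pseudocode above */

/* Hypothetical type: a bounded ring buffer guarded by two counting semaphores. */
typedef struct {
    int items[QUEUE_CAP];
    int head, tail;
    sem_t enqueue_sem;     /* counts free slots */
    sem_t dequeue_sem;     /* counts filled slots */
    pthread_mutex_t lock;  /* assumption: stands in for the atomic queue helpers */
} sem_queue;

int sem_queue_init(sem_queue *q) {
    q->head = q->tail = 0;
    if (sem_init(&q->enqueue_sem, 0, QUEUE_CAP) != 0) return -1;
    if (sem_init(&q->dequeue_sem, 0, 0) != 0) return -1;
    return pthread_mutex_init(&q->lock, NULL);
}

/* sem_wait can be interrupted by a signal handler; retry in that case. */
static void wait_for(sem_t *s) {
    while (sem_wait(s) == -1 && errno == EINTR) { }
}

void sem_queue_put(sem_queue *q, int element) {
    wait_for(&q->enqueue_sem);          /* blocks while the queue is full */
    pthread_mutex_lock(&q->lock);
    q->items[q->tail] = element;
    q->tail = (q->tail + 1) % QUEUE_CAP;
    pthread_mutex_unlock(&q->lock);
    sem_post(&q->dequeue_sem);          /* one more element to take */
}

int sem_queue_get(sem_queue *q) {
    wait_for(&q->dequeue_sem);          /* blocks while the queue is empty */
    pthread_mutex_lock(&q->lock);
    int element = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    pthread_mutex_unlock(&q->lock);
    sem_post(&q->enqueue_sem);          /* one more free slot */
    return element;
}
```

The mutex here is held only for the few instructions that touch the buffer; all the blocking happens on the semaphores.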

If you do stick with a mutex and condition variables, you want two conditions: one for enqueue to wait on (and dequeue to signal), and one for the other way around. The conditions mean "queue not full" and "queue not empty", respectively, and the enqueue/dequeue code is similarly symmetric.
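A rough sketch of that mutex-plus-two-conditions variant with pthreads might look like the following. The type and function names (`bounded_queue`, `bq_enqueue`, `bq_dequeue`) and the capacity of 8 are made up for the example.

```c
#include <pthread.h>

#define CAPACITY 8  /* hypothetical bound, pick whatever suits you */

typedef struct {
    int items[CAPACITY];
    int head, count;
    pthread_mutex_t lock;      /* the single mutex guarding all fields */
    pthread_cond_t not_full;   /* enqueue waits on this, dequeue signals it */
    pthread_cond_t not_empty;  /* dequeue waits on this, enqueue signals it */
} bounded_queue;

void bq_init(bounded_queue *q) {
    q->head = q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

void bq_enqueue(bounded_queue *q, int element) {
    pthread_mutex_lock(&q->lock);
    /* Always re-check the predicate in a loop: wakeups can be spurious. */
    while (q->count == CAPACITY)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[(q->head + q->count) % CAPACITY] = element;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

int bq_dequeue(bounded_queue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    int element = q->items[q->head];
    q->head = (q->head + 1) % CAPACITY;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return element;
}
```

Both condition variables are always used with the same mutex, and each wait sits in a `while` loop so a woken thread re-checks the queue state before touching it.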

3
votes

One mutex and at least one condition variable.

One mutex because there is one 'thing' (i.e. piece of memory) to synchronize access to: the shared state between all workers and the thread pushing the work.

One condition variable per, well, condition that one or more threads need to wait on. At the very least you need one condition variable for waiting on new jobs, the condition here being: "is there more stuff to do?" (or the converse: "is the work queue empty?").
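To make that concrete for the five-worker setup in the question, here is a minimal sketch with exactly one mutex and one condition variable. Everything named here (`pool`, `pool_submit`, `pool_finish`, the fixed `MAX_JOBS` array, the "double the job number" work) is an assumption for the example, not something from the question.

```c
#include <pthread.h>
#include <stdbool.h>

#define NUM_WORKERS 5
#define MAX_JOBS 64  /* hypothetical: total jobs this simple array can ever hold */

typedef struct {
    int jobs[MAX_JOBS];
    int head, tail;           /* head == tail means no pending jobs */
    bool shutting_down;
    long total;               /* accumulated results, stands in for real output */
    pthread_mutex_t lock;     /* the one mutex for all shared state */
    pthread_cond_t has_work;  /* the one condition: "is there more to do?" */
    pthread_t workers[NUM_WORKERS];
} pool;

static void *worker_main(void *arg) {
    pool *p = arg;
    pthread_mutex_lock(&p->lock);
    for (;;) {
        while (p->head == p->tail && !p->shutting_down)
            pthread_cond_wait(&p->has_work, &p->lock);
        if (p->head == p->tail) break;   /* empty and shutting down */
        int job = p->jobs[p->head++];
        pthread_mutex_unlock(&p->lock);
        long result = (long)job * 2;     /* placeholder work, done without the lock */
        pthread_mutex_lock(&p->lock);
        p->total += result;
    }
    pthread_mutex_unlock(&p->lock);
    return NULL;
}

int pool_start(pool *p) {
    p->head = p->tail = 0;
    p->shutting_down = false;
    p->total = 0;
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->has_work, NULL);
    for (int i = 0; i < NUM_WORKERS; i++)
        if (pthread_create(&p->workers[i], NULL, worker_main, p) != 0) return -1;
    return 0;
}

int pool_submit(pool *p, int job) {
    pthread_mutex_lock(&p->lock);
    if (p->tail == MAX_JOBS) { pthread_mutex_unlock(&p->lock); return -1; }
    p->jobs[p->tail++] = job;
    pthread_cond_signal(&p->has_work);   /* wake one waiting worker */
    pthread_mutex_unlock(&p->lock);
    return 0;
}

/* Drains remaining jobs, stops the workers, and returns the accumulated total. */
long pool_finish(pool *p) {
    pthread_mutex_lock(&p->lock);
    p->shutting_down = true;
    pthread_cond_broadcast(&p->has_work); /* wake every worker so all can exit */
    pthread_mutex_unlock(&p->lock);
    for (int i = 0; i < NUM_WORKERS; i++) pthread_join(p->workers[i], NULL);
    return p->total;
}
```

Submitting the jobs 1 through 10 and then calling `pool_finish` would return 110 with this placeholder work, regardless of which worker picked up which job.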


A somewhat more substantive answer would be that there is a one-to-many relationship between a mutex and its associated condition variables, and a one-to-one relationship between shared states and mutexes. From what you've told us, and since you're learning, I recommend using only one shared state for your design. If you ever need more than one state, I'd recommend looking for higher-level concepts (e.g. channels, futures/promises) to build on as abstractions.

In any case, don't use the same condition variable with different mutexes.

1
votes

I think you could take work from the queue without locking at all, via Interlocked operations, if you organize it as a stack (a linked list). This requires a semaphore instead of a condition variable, to prevent the problem described in the comments to this answer.

The pseudo-code looks like this:

  1. candidate = head.
  2. if (candidate == null) { wait_for_semaphore; goto 1; }
  3. if (candidate == InterlockedCompareExchange(head, candidate->next, candidate)) perform_work(candidate->data);
  4. else goto 1;

Of course, in this case adding work to the queue must also go through InterlockedCompareExchange, followed by signaling the semaphore.
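As a portable illustration, the same idea can be sketched with C11 atomics and a POSIX semaphore in place of the Windows Interlocked functions. This is a variation rather than a literal translation: the consumer waits on the semaphore first, so the head is known to be non-null when it tries the compare-and-exchange. The names (`job_node`, `job_stack_push`, `job_stack_pop`) are hypothetical, and the sketch assumes nodes are owned by the caller and are never freed or pushed again while other threads may still be popping; lifting that restriction means dealing with the ABA problem and safe memory reclamation, which this sketch does not attempt.

```c
#include <errno.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stddef.h>

/* Caller-owned node; assumption: never reused while the stack is shared. */
typedef struct job_node {
    struct job_node *next;
    int data;
} job_node;

typedef struct {
    _Atomic(job_node *) head;
    sem_t available;  /* counts nodes that have been pushed and not yet claimed */
} job_stack;

int job_stack_init(job_stack *s) {
    atomic_init(&s->head, NULL);
    return sem_init(&s->available, 0, 0);
}

void job_stack_push(job_stack *s, job_node *node) {
    job_node *old = atomic_load(&s->head);
    do {
        node->next = old;  /* on failure, 'old' is refreshed with the current head */
    } while (!atomic_compare_exchange_weak(&s->head, &old, node));
    sem_post(&s->available);  /* signal only after the node is visible */
}

job_node *job_stack_pop(job_stack *s) {
    /* Claim one node; blocks while the stack is empty. */
    while (sem_wait(&s->available) == -1 && errno == EINTR) { }
    job_node *candidate = atomic_load(&s->head);
    /* The claim guarantees head is non-null here, so candidate->next is safe. */
    while (!atomic_compare_exchange_weak(&s->head, &candidate, candidate->next)) { }
    return candidate;
}
```

Note that this hands out jobs in last-in, first-out order, which is usually fine for a pool of independent jobs but is not a FIFO queue.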