I wrote code for multiple producers and consumers using condition variables. It doesn't work even with only one producer and one consumer. Both producers and consumers should run in a while(true) loop. When I run the code, it gets stuck in around 50% of runs. I guess it ends up in a deadlock caused by an extra wait. I haven't managed to debug where it gets stuck or how to release the threads waiting on the condition variable. As a requirement, I must write the code with wait, signal, and broadcast.

If the queue is full, the producer waits. If the queue is empty, the consumer waits.

void WaitableQueue::enqueue(size_t a_item)
{
    (m_cond.getMutex()).lock();

    while(m_itemsCounter==m_capacity && !m_isBeingDestroyed)
    {
        ++m_numberOfWaiting;
        m_cond.wait();
        --m_numberOfWaiting;
    }

    std::cout<<"enqueue "<<a_item<<"\n";

    m_queue.push(a_item);
    ++m_itemsCounter;
    ++m_numbOfProduced;
    if(m_isBeingDestroyed)
    {
        m_cond.broadcast(); 
    }

    (m_cond.getMutex()).unlock();
    m_cond.broadcast();
}

void WaitableQueue::dequeue()
{
    (m_cond.getMutex()).lock();

    while(m_itemsCounter==0 && !m_isBeingDestroyed)
    {
        ++m_numberOfWaiting;
        std::cout<<"Waiting\n";
        m_cond.wait();
        std::cout<<"Done waiting\n";
        --m_numberOfWaiting;
    }

    if (m_isBeingDestroyed)
    {
        (m_cond.getMutex()).unlock();
        m_cond.broadcast();
        return;
    }
    std::cout<<"dequeue "<<m_queue.front()<<"\n";
    m_queue.pop();
    --m_itemsCounter;
    ++m_numbOfConsumed;
    (m_cond.getMutex()).unlock();
    m_cond.broadcast();
}

void WaitableQueue::destroy()
{
    (m_cond.getMutex()).lock();
    m_isBeingDestroyed=true;
    (m_cond.getMutex()).unlock();
}



void Producer::run()
{
    for(size_t i=0;i<m_numOfItemsToProduce;++i)
    {
        usleep(m_delay);
        size_t item=produce();
        m_wq.enqueue(item);
    }
}


Producer::produce() const
{
    return rand()%m_numOfItemsToProduce;
}

void Consumer::run()
{
    m_numOfProducersMutex.lock();
    while(m_numOfProducers>0)
    {
        m_numOfProducersMutex.unlock();
        usleep(m_delay);
        m_wq.dequeue();
        m_numOfProducersMutex.lock();
    }
    m_numOfProducersMutex.unlock();
}


int main()
{
    size_t numProducers=1;
    size_t numConsumers=3;
    Mutex mutex;
    ConditionalVariable cond(mutex);

    WaitableQueue<size_t> wq(NUM_OF_ITEMS,cond);
    std::vector<Producer<size_t>*> producerArray;
    std::vector<Consumer<size_t>*> consumerArray;
    Mutex numOfProducersMutex;

    for(size_t i=0;i<numProducers;++i)
    {
        Producer<size_t>* tempP=new Producer<size_t>(wq,NUM_OF_ITEMS,DELAY);
        producerArray.push_back(tempP);
    }

    for(size_t i=0;i<numConsumers;++i)
    {
        Consumer<size_t>* tempC=new Consumer<size_t>(wq,numProducers,numOfProducersMutex,DELAY);
        consumerArray.push_back(tempC);
    }

    for(size_t i=0;i<numProducers;++i)
    {
        producerArray[i]->start();
    }

    for(size_t i=0;i<numConsumers;++i)
    {
        consumerArray[i]->start();
    }

    for(size_t i=0;i<numProducers;++i)
    {
        producerArray[i]->join();
        numOfProducersMutex.lock();
        --numProducers;
        numOfProducersMutex.unlock();
    }
    usleep(100);

    //tell the consumers stop waiting
    wq.destroy();
   for(size_t i=0;i<numConsumers;++i)
    {
        consumerArray[i]->join();
    }

   for(size_t i=0;i<numProducers;++i)
   {
        delete producerArray[i];
   }

    for(size_t i=0;i<numConsumers;++i)
   {
        delete consumerArray[i];
   }
}

It works in around 50% of runs. In the other 50% it gets stuck.

Recommendation (that probably won't fix your current problem, but might fix a future one): Don't lock and unlock the mutex yourself. Ensure that the mutex is unlocked with a RAII helper like std::lock_guard. – user4581301
Looking over the code a bit more, I think you should take a look at the example here that shows a better way to use condition variables. – user4581301
You are not unlocking the mutex after locking it when you're waiting for an empty/full queue. You need to release the mutex before you go waiting. If you debug the code, you will find that one thread is waiting on the CV while holding the mutex, all while the other thread is waiting to get the mutex. Unlock the mutex before you call CV.wait. This may have some race conditions, so be careful. – Everyone
@Everyone Actually the mutex should always be locked before calling CV wait. In this case it looks like there is a custom class wrapper around the CV that associates a mutex with the CV. (This is fine as long as you always protect the CV loop-waiting condition with that lock, which is the case here.) – Humphrey Winnebago
Please provide the rest of the code. Note that if you pause the debugger while it's deadlocked, it will show you where the deadlock is. – Humphrey Winnebago
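
The two points raised in the comments (RAII locking, and holding the mutex when calling wait) can be sketched with the standard library primitives. This is only an illustration under assumptions: it uses std::mutex and std::condition_variable rather than the question's Mutex and ConditionalVariable wrappers, and the names Gate and wait_for_ready are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper type: a flag guarded by a mutex, plus a condition variable.
struct Gate {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
};

// Starts a thread that sets the flag, waits for the flag, and returns its value.
bool wait_for_ready() {
    Gate g;

    std::thread setter([&g] {
        {
            // RAII: the mutex is released when `lock` goes out of scope.
            std::lock_guard<std::mutex> lock(g.m);
            g.ready = true;
        }
        g.cv.notify_all();
    });

    bool result = false;
    {
        // The mutex must be held when calling wait(). wait() releases it
        // while the thread is blocked and re-acquires it before returning,
        // which is why the setter thread can still take the lock.
        std::unique_lock<std::mutex> lock(g.m);
        g.cv.wait(lock, [&g] { return g.ready; });
        result = g.ready;
    }

    setter.join();
    return result;
}
```

The predicate overload of wait re-checks the condition after every wakeup, which is equivalent to the explicit while loops in the question.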

2 Answers

-1
votes

To solve the producer-consumer problem using condition variables, you first need to understand the bounded buffer problem.

An implementation of a thread-safe buffer queue using condition variables in C++ is available here: https://codeistry.wordpress.com/2018/03/08/buffer-queue-handling-in-multithreaded-environment/

You can use this buffer queue as the building block for solving the multiple-producer, multiple-consumer problem. This post shows how the thread-safe buffer queue is used to solve the producer-consumer problem in C++: https://codeistry.wordpress.com/2018/03/09/unordered-producer-consumer/
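
As a rough idea of what such a building block can look like, here is a minimal sketch of a bounded queue. It is not the code from the linked posts, and it uses std::mutex and std::condition_variable instead of the question's own wrappers. The names BoundedQueue, close, and run_demo are hypothetical. One difference from the question's destroy() worth noting: close() here notifies all waiters after setting the flag, so threads blocked in wait get a chance to observe the shutdown.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : m_capacity(capacity) {}

    // Blocks while the queue is full. Returns false if the queue was closed.
    bool enqueue(T item) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notFull.wait(lock, [this] { return m_queue.size() < m_capacity || m_closed; });
        if (m_closed) return false;
        m_queue.push(std::move(item));
        lock.unlock();
        m_notEmpty.notify_one();  // wake one waiting consumer
        return true;
    }

    // Blocks while the queue is empty. Returns false once closed and drained.
    bool dequeue(T& out) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notEmpty.wait(lock, [this] { return !m_queue.empty() || m_closed; });
        if (m_queue.empty()) return false;  // closed and nothing left
        out = std::move(m_queue.front());
        m_queue.pop();
        lock.unlock();
        m_notFull.notify_one();  // wake one waiting producer
        return true;
    }

    // Sets the shutdown flag and wakes every waiter so none stays blocked.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_closed = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_notFull;
    std::condition_variable m_notEmpty;
    std::queue<T> m_queue;
    std::size_t m_capacity;
    bool m_closed = false;
};

// Hypothetical driver: runs the given number of producers and consumers
// and returns how many items were consumed in total.
std::size_t run_demo(std::size_t producers, std::size_t consumers, std::size_t itemsEach) {
    BoundedQueue<std::size_t> q(4);
    std::atomic<std::size_t> consumed{0};
    std::vector<std::thread> prod, cons;
    for (std::size_t i = 0; i < consumers; ++i)
        cons.emplace_back([&] { std::size_t v; while (q.dequeue(v)) ++consumed; });
    for (std::size_t i = 0; i < producers; ++i)
        prod.emplace_back([&q, itemsEach] { for (std::size_t n = 0; n < itemsEach; ++n) q.enqueue(n); });
    for (auto& t : prod) t.join();
    q.close();  // producers are done: let consumers drain the queue and exit
    for (auto& t : cons) t.join();
    return consumed.load();
}
```

Calling run_demo(1, 3, 100) mirrors the question's setup of one producer and three consumers. Using two condition variables (one for "not full", one for "not empty") is a design choice that avoids waking producers when only consumers can make progress; a single condition variable with broadcast, as in the question, also works as long as every state change that waiters depend on is followed by a notification.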

-7
votes

You have discovered another example of how C++ makes a difficult problem out of a conceptually simple problem.

It appears that you want one or more producers to produce an identical number of values and have a set of consumers read and process those values. It also appears that you want the number of producers to equal the number of consumers, while allowing that number (producers and consumers) to be configurable.

This problem is very simple using Ada, which was designed with concurrency in mind.

The first file is the Ada package specification defining our producer and consumer task types.

generic
   Items_To_Handle : Positive;
package Integer_Prod_Con is

   task type Producer;

   task type Consumer;

end Integer_Prod_Con;

The generic parameter is much like a template parameter. In this case the value passed as a generic parameter must be a positive integer. The implementation of the package follows.

with Ada.Containers.Synchronized_Queue_Interfaces;
with Ada.Containers.Unbounded_Synchronized_Queues;
with Ada.Text_IO; use Ada.Text_IO;

package body Integer_Prod_Con is
   package Int_Interface is new Ada.Containers.Synchronized_Queue_Interfaces(Integer);
   package Int_Queue is new Ada.Containers.Unbounded_Synchronized_Queues(Queue_Interfaces =>Int_Interface);
   use Int_Queue;

   The_Queue : Queue;

   --------------
   -- Producer --
   --------------

   task body Producer is
   begin
      for Num in 1..Items_To_Handle loop
         The_Queue.Enqueue(Num);
         delay 0.010;
      end loop;
   end Producer;

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      Value : Integer;
   begin
      for Num in 1..Items_To_Handle loop
         The_Queue.Dequeue(Value);
         Put_Line(Value'Image);
         delay 0.010;
      end loop;
   end Consumer;

end Integer_Prod_Con;

The package employs a pre-defined generic package implementing an unbounded queue as the buffer. This allows the queue to grow and shrink as needed by the program. Each producer task enqueues the integer values from 1 through Items_To_Handle and each consumer dequeues and outputs the same number of elements from the queue.

The main procedure for this program is:

with Integer_Prod_Con;

procedure Int_Queue_Main is
   PC_Count : constant := 3;

   package Short_List is new Integer_Prod_Con(10);
   use Short_List;

   Producers : Array(1..PC_Count) of Producer;
   Consumers : Array(1..PC_Count) of Consumer;
begin
   null;
end Int_Queue_Main;

The output of this program is:

 1
 1
 1
 2
 2
 2
 3
 3
 3
 4
 4
 4
 5
 5
 5
 6
 6
 6
 7
 7
 7
 8
 8
 8
 9
 9
 9
 10
 10
 10