I'm trying to create a producer-consumer program in which the consumers keep running until all the producers have finished, then consume whatever is left in the queue (if anything) and exit. You can see my code below. I think I know where the problem (probably a deadlock) is, but I don't know how to make it work properly.

  #include <iostream>
  #include <cstdlib>
  #include <queue>
  #include <vector>
  #include <thread>
  #include <mutex>
  #include <condition_variable>

  using namespace std;

  class Company{
    public:
        Company() : producers_done(false) {}
        void start(int n_producers, int n_consumers); // start consumer and producer threads
        void stop(); // join all threads
        void consumer();
        void producer();
        /* some other stuff */
    private:
        condition_variable cond;
        mutex mut;
        bool producers_done;
        queue<int> products;
        vector<thread> producers_threads;
        vector<thread> consumers_threads;
        /* some other stuff */
  };

void Company::consumer(){
    while(!products.empty()){
        unique_lock<mutex> lock(mut);
        while(products.empty() && !producers_done){
            cond.wait(lock); // <- I think this is where the deadlock happens
        }
        if (products.empty()){
            break;
        }
        products.pop();
        cout << "Removed product " << products.size() << endl;
    }
}

void Company::producer(){
    while(true){
        if((rand()%10) == 0){
          break;
        }
        unique_lock<mutex> lock(mut);
        products.push(1);
        cout << "Added product " << products.size() << endl;
        cond.notify_one();
    }
}

void Company::stop(){
    for(auto &producer_thread : producers_threads){
        producer_thread.join();
    }
    unique_lock<mutex> lock(mut);
    producers_done = true;
    cout << "producers done" << endl;
    cond.notify_all();
    for(auto &consumer_thread : consumers_threads){
        consumer_thread.join();
    }
    cout << "consumers done" << endl;
}

void Company::start(int n_producers, int n_consumers){
  for(int i = 0; i<n_producers; ++i){
    producers_threads.push_back(thread(&Company::producer, this));
  }

  for(int i = 0; i<n_consumers; ++i){
    consumers_threads.push_back(thread(&Company::consumer, this));
  }
}

int main(){
  Company c;
  c.start(2, 2);
  c.stop();

  return 0;
}

I know there are a lot of producer-consumer questions here, and I've scrolled through at least 10 of them, but none of them answered my issue.

It's generally a good idea to isolate the synchronization bits with a good abstraction. What you're looking for here is a bounded buffer. - Peter Ruderman
Now that you sneakily integrated my changes, does it still deadlock? - Maxim Egorushkin
Your code would not compile. Post a Minimal, Complete, and Verifiable example. - Maxim Egorushkin
I don't want to get too far off topic, but in general we don't want to allow an algorithm to eat up all system resources. We should think about and set a suitable upper limit (which can be quite large, if appropriate). But my main point is that the bounded buffer abstraction isolates all the locking and waiting. All the producer has to do is make items and call push(). All the consumer has to do is call pop() and deal with the items. (I was also hoping that the OP would look up bounded buffer to see what it is.) - Peter Ruderman
I've included the minimal, complete and verifiable example. There are a few simplifications, for example the condition in the producer is different (it's supposed to be triggered by a different class), but the deadlock (or some other issue) remains. - adamlowlife

1 Answer


When people use std::atomic along with std::mutex and std::condition_variable, the result is almost always a deadlock. Modifications to the atomic variable are not protected by the mutex, so a condition variable notification can get lost: if the variable is updated and the notification sent after the consumer has locked the mutex and checked the condition, but before it blocks in wait, the consumer misses the notification and may sleep forever.

A fix would be to not use std::atomic and only modify and read producers_done while the mutex is held. E.g.:

void Company::consumer(){
    for(;;){
        unique_lock<mutex> lock(mut);
        while(products.empty() && !producers_done)
            cond.wait(lock);
        if(products.empty())
            break;
        products.pop();
    }
}

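Another error in the code is that the loop condition while(!products.empty()) calls products.empty() without holding the mutex, which is a data race. It also makes a consumer exit immediately if it starts before any product has been added, because the queue is still empty at that point.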
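The wait loop in the fixed consumer can also be written with the predicate overload of std::condition_variable::wait, which re-checks the condition under the lock after every wakeup. Here is a minimal standalone sketch of that; the function drain_with_predicate_wait and its single-consumer setup are made up for illustration and are not part of the Company class:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical standalone demo, not part of the original Company class.
// Returns the number of items the consumer thread popped.
int drain_with_predicate_wait(int n_items) {
    std::mutex mut;
    std::condition_variable cond;
    std::queue<int> products;
    bool producers_done = false;
    int consumed = 0;

    std::thread consumer([&] {
        for (;;) {
            std::unique_lock<std::mutex> lock(mut);
            // Same as: while (products.empty() && !producers_done) cond.wait(lock);
            cond.wait(lock, [&] { return !products.empty() || producers_done; });
            if (products.empty())
                break; // nothing left and producers_done is set
            products.pop();
            ++consumed; // written only by this thread, read after join()
        }
    });

    // The calling thread plays the producer.
    for (int i = 0; i < n_items; ++i) {
        {
            std::lock_guard<std::mutex> lock(mut);
            products.push(i);
        }
        cond.notify_one();
    }
    {
        std::lock_guard<std::mutex> lock(mut);
        producers_done = true; // modified only while the mutex is held
    }
    cond.notify_all();

    consumer.join();
    return consumed;
}
```

Both the queue and producers_done are read and written only under the mutex, so the consumer cannot miss the final notification.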


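The next error is that stop() keeps the mutex locked while waiting for the consumer threads to terminate. A consumer has to reacquire the mutex before it can return from cond.wait, so it never wakes up and join() blocks forever. Fix: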

{
    unique_lock<mutex> lock(mut);
    producers_done = true;
    // mutex gets unlocked here.
}
cond.notify_all();

for(auto &consumer_thread : consumers_threads)
    consumer_thread.join();
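Putting the fixes together, here is a sketch of what the whole class could look like. It makes some assumptions: the items_per_producer parameter is hypothetical and replaces the rand()-based exit from the question so that the run is deterministic, the consumed counter and the run_company helper are added only so the result can be checked, and the console output is left out.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class Company {
public:
    // items_per_producer is a hypothetical parameter; it replaces the
    // rand()-based exit of the original producer.
    void start(int n_producers, int n_consumers, int items_per_producer) {
        for (int i = 0; i < n_producers; ++i)
            producers_threads.emplace_back(&Company::producer, this, items_per_producer);
        for (int i = 0; i < n_consumers; ++i)
            consumers_threads.emplace_back(&Company::consumer, this);
    }

    // Joins all threads and returns the total number of products consumed.
    int stop() {
        for (auto& t : producers_threads)
            t.join();
        {
            std::lock_guard<std::mutex> lock(mut);
            producers_done = true;
        } // mutex released before notifying and joining
        cond.notify_all();
        for (auto& t : consumers_threads)
            t.join();
        return consumed; // all threads are joined, no lock needed
    }

private:
    void producer(int n_items) {
        for (int i = 0; i < n_items; ++i) {
            {
                std::lock_guard<std::mutex> lock(mut);
                products.push(i);
            }
            cond.notify_one();
        }
    }

    void consumer() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mut);
            while (products.empty() && !producers_done)
                cond.wait(lock);
            if (products.empty())
                break; // queue drained and producers finished
            products.pop();
            ++consumed; // protected by mut
        }
    }

    std::mutex mut;
    std::condition_variable cond;
    std::queue<int> products;
    bool producers_done = false;
    int consumed = 0;
    std::vector<std::thread> producers_threads;
    std::vector<std::thread> consumers_threads;
};

// Hypothetical helper that mirrors main() from the question.
int run_company(int n_producers, int n_consumers, int items_per_producer) {
    Company c;
    c.start(n_producers, n_consumers, items_per_producer);
    return c.stop();
}
```

With this version, run_company(2, 2, 100) should return once all 200 products have been consumed, regardless of how the threads are scheduled.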