0
votes

I have a single producer thread and two consumer threads trying to access a shared buffer. Mutex locks are used between the consumers and the producer. The consumers are supposed to run in parallel. If the buffer is empty, a consumer sleeps and the producer has to wake it. If the buffer is full, the producer does nothing. Below are the code snippets I am working on.

Producer thread:

void *writer(void*)

{
     // Initialising the seed
         srand(time(NULL));
     while(1)
     {
         pthread_mutex_lock(&rallow);
         if (Q.size() < MAX && item < MAX)
         {
            // Getting the random number
            int num = rand() % 10 + 1;
            // Pushing the number into queue
            Q.push(num);
            
            item++;
            cout << "Produced: " << num << " item: "<<item<<endl;
            pthread_cond_broadcast(&dataNotProduced); 
         }
         else if (item == MAX) {
            pthread_mutex_unlock(&rallow);
            continue;
        }
        pthread_mutex_unlock(&rallow);
    }
}

Consumer 1:

void *reader1(void*)

{
    while(1)
    {
         pthread_mutex_lock(&mread);

         rc++;

         if(rc==1)
            pthread_mutex_lock(&rallow);

         pthread_mutex_unlock(&mread);


         if (Q.size() > 0) {
            // Get the data from the front of queue
            int data = Q.front();
      
            // Pop the consumed data from queue
            Q.pop();

            item--;
            cout << "B thread consumed: " << data <<endl;


            pthread_cond_signal(&dataNotConsumed);
        }
        else
        {
            cout << "B is in wait.." << endl;
            pthread_cond_wait(&dataNotProduced, &rallow);
            cout<<"B woke up"<<endl;
        }

         pthread_mutex_lock(&mread);

         rc--;

         if(rc==0)
            pthread_mutex_unlock(&rallow);

         pthread_mutex_unlock(&mread);
        sleep(1);

    } 
}

Consumer 2:


void *reader2(void*)

{
    while(1)
    {
         pthread_mutex_lock(&mread);

         rc++;

         if(rc==1)
            pthread_mutex_lock(&rallow);

         pthread_mutex_unlock(&mread);


         if (Q.size() > 0) {
            // Get the data from the front of queue
            int data = Q.front();
  
            // Pop the consumed data from queue
            Q.pop();

            item--;
            cout << "C thread consumed: " << data <<endl;
            pthread_cond_signal(&dataNotConsumed);
        }
        else
        {
            cout << "C is in wait.." << endl;
            pthread_cond_wait(&dataNotProduced, &rallow);
            cout<<"C woke up"<<endl;
        }

         pthread_mutex_lock(&mread);

         rc--;

         if(rc==0)
            pthread_mutex_unlock(&rallow);

         pthread_mutex_unlock(&mread);
        sleep(1);

    }
}

The output looks something like this:

C is in wait..
B is in wait..
Produced: 8 item: 1
Produced: 4 item: 2
Produced: 2 item: 3
Produced: 4 item: 4
Produced: 2 item: 5
Produced: 8 item: 6
Produced: 5 item: 7
Produced: 2 item: 8
Produced: 10 item: 9
Produced: 3 item: 10
>> Producer is in wait..
B woke up
B thread consumed: 8
B thread consumed: 4
B thread consumed: 2
B thread consumed: 4
B thread consumed: 2
B thread consumed: 8
B thread consumed: 5
B thread consumed: 2
B thread consumed: 10
B thread consumed: 3
B is in wait..
C woke up
C is in wait..
Producer woke up

My question is why threads B and C are not executing in parallel. Also, why does the producer fill the buffer with 10 values at a time, rather than producing a few, letting the consumers consume them, and then producing a few more? Any leads would be highly appreciated.

2
Consider that the operating system gives slices of time to each thread. The producer could put multiple items in the queue before one of the consumers gets a time slice. Similarly, a single consumer might dequeue multiple items before the other consumer gets a chance. Even with each thread running on a separate core, there is no guarantee they all run at exactly the same pace. – G. Sliepen
By the way, if you have two functions that are almost identical, consider factoring the common part out into one function that both can call. Also, which C++ version are you compiling against? Since C++11 there is std::thread, which doesn't require you to touch those scary void*s. – 463035818_is_not_a_number
@G.Sliepen I understand what you have said. But looking at the output the program gives me, I see that once a thread locks the mutex, it gets hold of the mutex the next few times as well. The current thread acquires the mutex a significant number of times before it passes to any other thread. How can I overcome this? – Shreyas

2 Answers

1
votes
     else if (item == MAX) {
        pthread_mutex_unlock(&rallow);
        cout << ">> Producer is in wait.." << endl;
        pthread_cond_wait(&dataNotConsumed, &rallow);

You unlock the mutex and then wait. You can't do that. That creates a window during which the thing you are waiting for can occur before you wait. You must call pthread_cond_wait while holding the mutex to ensure that the thing you are waiting for doesn't happen after you've decided to wait but before you've started waiting.
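As a rough illustration of the point above, here is a minimal sketch of a producer step that keeps the mutex held across both the check and the wait. The names rallow, dataNotProduced, dataNotConsumed, Q and MAX come from the question; their definitions are not shown there, so the declarations below (including MAX = 10 and the std::queue<int> type) are assumptions, and produce() is a hypothetical helper.

```cpp
#include <cstddef>
#include <pthread.h>
#include <queue>

// Assumed definitions; the question does not show them.
static const std::size_t MAX = 10;
static std::queue<int> Q;
static pthread_mutex_t rallow = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t dataNotProduced = PTHREAD_COND_INITIALIZER;
static pthread_cond_t dataNotConsumed = PTHREAD_COND_INITIALIZER;

// Hypothetical helper: blocks while the queue is full, then pushes one item.
void produce(int num) {
    pthread_mutex_lock(&rallow);

    // The mutex stays locked between the check and the wait.
    // pthread_cond_wait releases it atomically while sleeping and
    // reacquires it before returning, so no wakeup can be missed.
    // The loop re-checks the condition to cope with spurious wakeups.
    while (Q.size() >= MAX)
        pthread_cond_wait(&dataNotConsumed, &rallow);

    Q.push(num);

    // Wake one consumer that may be waiting for data.
    pthread_cond_signal(&dataNotProduced);
    pthread_mutex_unlock(&rallow);
}
```

The while loop instead of an if is deliberate: a waiter should re-test its condition after every wakeup, because another thread may have changed the queue in the meantime.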

You have another huge bug in your consumer. One thread can lock rallow and then another thread can try to unlock it. That's not allowed -- the thread that acquires the mutex must be the one to release it. You don't need two mutexes -- just use one that protects all state.

0
votes

First of all, there is no guarantee that all threads will run concurrently all the time. If they run on a single core, the operating system will give time slices of tens of milliseconds to each thread. And if they are running on different cores, then there is a latency between one thread calling pthread_cond_broadcast() and another thread waking up from a pthread_cond_wait(). This easily explains the writer thread being able to push 10 items to the queue before another thread wakes up.

The next issue is, why does B consume all the items, and C gets nothing? The problem is because of this:

pthread_mutex_lock(&mread);

rc++;

if(rc == 1)
    pthread_mutex_lock(&rallow);

pthread_mutex_unlock(&mread);

Consider threads B and C each executing this block right after each other. Each will lock mread in turn and each will increment rc, but only the first one (the one that sees rc == 1) will lock rallow. What happens next is undefined, because they both try to access the queue, even though one of them will not be holding the lock.

There should be no need to have two mutexes. Both consumer threads should just lock rallow unconditionally, check if there is something in the queue, and if not call pthread_cond_wait().
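A minimal sketch of what such a single-mutex consumer step might look like with pthreads is shown here. The names rallow, dataNotProduced, dataNotConsumed and Q are taken from the question; the declarations (including the std::queue<int> type) are assumed because the question does not show them, and consume() is a hypothetical helper that both reader threads could call.

```cpp
#include <pthread.h>
#include <queue>

// Assumed definitions; the question does not show them.
static std::queue<int> Q;
static pthread_mutex_t rallow = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t dataNotProduced = PTHREAD_COND_INITIALIZER;
static pthread_cond_t dataNotConsumed = PTHREAD_COND_INITIALIZER;

// Hypothetical helper: blocks until an item is available, then returns it.
int consume() {
    // Every consumer locks the same mutex unconditionally,
    // and the thread that locks it is the one that unlocks it.
    pthread_mutex_lock(&rallow);

    // Re-check the condition after every wakeup.
    while (Q.empty())
        pthread_cond_wait(&dataNotProduced, &rallow);

    int data = Q.front();
    Q.pop();

    // Tell the producer that there is room in the queue again.
    pthread_cond_signal(&dataNotConsumed);
    pthread_mutex_unlock(&rallow);
    return data;
}
```

With this shape, the rc counter and the mread mutex are no longer needed, and printing the consumed value can happen after the mutex is released.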

Since you are using C++, you should really use C++11's thread support instead of using the C pthread functions. Your code should then look like:

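#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Q and MAX are not shown in the question; these definitions are assumed.
const std::size_t MAX = 10;
std::queue<int> Q;

std::mutex rallow;
std::condition_variable dataProduced;
std::condition_variable dataConsumed;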

void writer() {
    while(true) {
        // Generate the random number
        int num = rand() % 10 + 1;
        std::cout << "Produced: " << num << "\n";

        // Push it to the queue
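        {
            // wait() needs a std::unique_lock so it can release and reacquire the mutex
            std::unique_lock<std::mutex> lock(rallow);
            dataConsumed.wait(lock, [](){return Q.size() < MAX;});
            Q.push(num);
        }

        // Wake one consumer that may be waiting for data
        dataProduced.notify_one();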
    }
}

void reader(int id) {
    while(true) {
        int data;

        // Pop an item from the queue
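        {
            // wait() needs a std::unique_lock so it can release and reacquire the mutex
            std::unique_lock<std::mutex> lock(rallow);
            dataProduced.wait(lock, [](){return Q.size() > 0;});
            data = Q.front();
            Q.pop();
        }

        // Tell the producer that there is room in the queue again
        dataConsumed.notify_one();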

        // Process the data
        std::cout << "Consumer thread " << id << " consumed: " << data << "\n";
    }
}

You could even create a thread-safe queue class that handles the mutexes and condition variables itself, so the producer and consumer code would reduce to:

void writer() {
    while(true) {
        int num = rand() % 10 + 1;
        std::cout << "Produced: " << num << "\n";
        Q.push(num);
    }
}

void reader(int id) {
    while(true) {
        int data = Q.pop();
        std::cout << "Consumer thread " << id << " consumed: " << data << "\n";
    }
}
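One possible shape for such a thread-safe queue class is sketched below. The class name BoundedQueue, its member names and the capacity parameter are all hypothetical; the answer only says that the class would handle the mutex and condition variables itself. The sketch also assumes T is default-constructible and movable.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bounded, blocking queue; name and interface are illustrative.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks while the queue is full, then appends the value.
    void push(T value) {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            notFull_.wait(lock, [this] { return items_.size() < capacity_; });
            items_.push(std::move(value));
        }
        notEmpty_.notify_one();  // wake one waiting consumer
    }

    // Blocks while the queue is empty, then removes and returns the front item.
    T pop() {
        T value;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            notEmpty_.wait(lock, [this] { return !items_.empty(); });
            value = std::move(items_.front());
            items_.pop();
        }
        notFull_.notify_one();  // wake the producer if it is waiting for room
        return value;
    }

private:
    std::size_t capacity_;
    std::queue<T> items_;
    std::mutex mutex_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
};
```

Declaring something like BoundedQueue<int> Q(10); would then let writer() and reader() above call Q.push(num) and Q.pop() without touching any mutex directly. Notifying after the lock is released is a design choice that avoids waking a thread only for it to block on the mutex immediately.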