Below is the code I am trying for the producer-consumer problem using POSIX threads. It uses a condition variable, together with a mutex, to wait and signal. The number of producer and consumer threads is configurable.

But when the consumer thread count is greater than the producer thread count, my program hangs and does not finish.

I think the producer threads finish before the consumer threads, having put the maximum number of values in the queue, and the waiting consumer threads never receive a signal, even though the producers signaled once for each value they added to the queue.

I need help identifying the issue in the code, so that it runs for any combination of producer and consumer thread counts.

#include<iostream>
#include<queue>
#include<pthread.h>

using namespace std;

pthread_cond_t c=PTHREAD_COND_INITIALIZER;
pthread_mutex_t m=PTHREAD_MUTEX_INITIALIZER;

long max_item=50000;
long produced=0;
long consumed=0;

const long producer_count=5;
const long consumer_count=15;

queue<long>q;

void * producer(void *)
{
    long i=1;
    while(true)
    {
        pthread_mutex_lock(&m);
        if(produced==max_item)
        {pthread_mutex_unlock(&m);break;} 

        q.push(i++);++produced;
        pthread_cond_signal(&c);
        pthread_mutex_unlock(&m);
    }
    pthread_mutex_lock(&m);
    cout<<"Producer produced:"<<i-1<<endl;
    pthread_mutex_unlock(&m);
    return NULL;
}
void *consumer(void *)
{
    long i=1;
    long val=0;
    while(true)
    {
        pthread_mutex_lock(&m);
        if(consumed==max_item){pthread_mutex_unlock(&m);break;}    
        while(q.empty())
           {pthread_cond_wait(&c,&m);}
        val=q.front();q.pop();
        ++i,consumed++;
        pthread_mutex_unlock(&m);
    }
    pthread_mutex_lock(&m);
    cout<<"Consumer consumed:"<<i-1<<endl;
    pthread_mutex_unlock(&m);
    return NULL;
}
int main()
{
   pthread_t p[producer_count],c[consumer_count];

   for(int i=0;i<producer_count;++i)
    pthread_create(&p[i],NULL,producer,NULL);

   for(int i=0;i<consumer_count;++i)
    pthread_create(&c[i],NULL,consumer,NULL);

   for(int i=0;i<producer_count;++i)
    pthread_join(p[i],NULL);

   for(int i=0;i<consumer_count;++i)
    pthread_join(c[i],NULL);

   cout<<"Total produced:"<<produced<<endl;
   cout<<"Total consumed:"<<consumed<<endl;
   cout<<"Queue size at end:"<<q.size()<<endl;

   pthread_mutex_destroy(&m);
    return 0;
}


Output:
$ ./a.out
Producer produced:12740
Consumer consumed:3351
Producer produced:16948
Consumer consumed:2512
Consumer consumed:3383
Producer produced:4892
Producer produced:7417
Consumer consumed:5374
Consumer consumed:4550
Producer produced:8003
Consumer consumed:5229
Consumer consumed:2023
Consumer consumed:1366
Consumer consumed:3346
Consumer consumed:2231

--Program hangs here-----


Output from a different run, with both producer and consumer thread counts set to 15:
$ ./a.out
Producer produced:1276
Producer produced:2162
Producer produced:1401
Producer produced:505
Producer produced:455
Producer produced:1900
Producer produced:2522
Producer produced:901
Producer produced:308
Producer produced:1461
Producer produced:755
Producer produced:102
Producer produced:21332
Consumer consumed:514
Consumer consumed:1335
Consumer consumed:4219
Consumer consumed:1422
Consumer consumed:644
Consumer consumed:231
Producer produced:14191
Consumer consumed:621
Consumer consumed:541
Consumer consumed:1505
Consumer consumed:32234
Consumer consumed:1985
Producer produced:729
Consumer consumed:2723
Consumer consumed:1012
Consumer consumed:69
Consumer consumed:945
Total produced:50000
Total consumed:50000
Queue size at end:0

---Program completes here, showing that the produced and consumed counts are equal and the queue is fully consumed.

1 Answer

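Each consumer checks consumed==max_item only before it starts waiting. With many consumers the items get divided among them, so some consumers are already blocked in pthread_cond_wait on the empty queue when the last item is consumed. No further signal arrives after that, and the while(q.empty()) loop would not let them leave even if one did, so they block forever and pthread_join never returns. You need a second exit condition. For example, the producer can set a finished boolean once production is complete and announce it on the same condition variable with pthread_cond_broadcast. The consumers then check that flag, with the mutex held, before waiting on the empty queue.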
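
A minimal sketch of that idea, reusing the names from the question where possible. The finished flag, the cv name for the condition variable, and the run() helper are my own additions for illustration, and the sketch assumes there is at least one producer (otherwise nobody ever sets the flag).

```cpp
#include <cassert>
#include <queue>
#include <vector>
#include <pthread.h>

// Shared state; every access happens with m held.
static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
static std::queue<long> q;
static const long max_item = 50000;
static long produced = 0;
static long consumed = 0;
static bool finished = false;   // assumed flag: true once all items are queued

void *producer(void *)
{
    while (true) {
        pthread_mutex_lock(&m);
        if (produced == max_item) {
            // Nothing left to produce: wake every waiting consumer so
            // each one can notice the flag and exit.
            finished = true;
            pthread_cond_broadcast(&cv);
            pthread_mutex_unlock(&m);
            break;
        }
        q.push(++produced);
        pthread_cond_signal(&cv);
        pthread_mutex_unlock(&m);
    }
    return NULL;
}

void *consumer(void *)
{
    while (true) {
        pthread_mutex_lock(&m);
        // Sleep only while there is nothing to do AND more may still come.
        while (q.empty() && !finished)
            pthread_cond_wait(&cv, &m);
        if (q.empty()) {        // finished and fully drained
            pthread_mutex_unlock(&m);
            break;
        }
        q.pop();
        ++consumed;
        pthread_mutex_unlock(&m);
    }
    return NULL;
}

// Hypothetical helper: runs one round and returns the total consumed.
long run(int producer_count, int consumer_count)
{
    assert(producer_count > 0);  // the flag is only ever set by a producer
    produced = 0;
    consumed = 0;
    finished = false;

    std::vector<pthread_t> p(producer_count), c(consumer_count);
    for (int i = 0; i < producer_count; ++i)
        pthread_create(&p[i], NULL, producer, NULL);
    for (int i = 0; i < consumer_count; ++i)
        pthread_create(&c[i], NULL, consumer, NULL);
    for (int i = 0; i < producer_count; ++i)
        pthread_join(p[i], NULL);
    for (int i = 0; i < consumer_count; ++i)
        pthread_join(c[i], NULL);
    return consumed;
}
```

Calling run(5, 15) from main should now return instead of hanging. The broadcast matters: pthread_cond_signal is only guaranteed to wake at least one waiter, while here every blocked consumer has to wake up and see the flag. Putting the flag into the wait condition itself also keeps the loop correct under spurious wakeups.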