I'm writing a naive implementation of a single producer multiple consumer buffer with pthreads and condition variables, using C++ list as a buffer. I'm not too much worried about how fast my code runs, I just want to get rid of the errors.
The producer thread reads a string from a file and insert it into the end of a buffer while each one of the consumers reads from the beginning and put it on a different matrix. So, essencialy, I have a FIFO queue which has a max size and the first element can only be erased when all the consumers have already read it.
Here's the important part of the three functions of my code:
PRODUCER:
void *feedBuffer(void *threadproducer){
//some declarations...
while(!file->eof())
{
pthread_mutex_lock(&mutex);
while(*buffer_current_size == buffer_max_size) { // full
// wait until some elements are consumed
pthread_cond_wait(&can_produce, &mutex);
}
pthread_mutex_lock(&lock_buffer);
*file >> temp.word;
buffer->push_back(temp);
(*buffer_current_size)++;
pthread_mutex_unlock(&lock_buffer);
pthread_cond_broadcast(&can_consume);
pthread_mutex_unlock(&mutex);
}
file->close();
pthread_cond_broadcast(&can_consume);
pthread_mutex_lock(&lock_buffer);
buffer_current_size->store(-1); //END OF READ SIGNAL
pthread_mutex_unlock(&lock_buffer);
pthread_exit(NULL);
}
BUFFER CONTROLLER AND WORKER THREADS CALLER:
void *main_consumer(void *threadconsumer) //consumer caller and buffer controll
{
//some declarations...
for(int j=0; j<NUMTHREADS; j++)
{
pthread_create(&threads[j],&attr,worker,(void *) &workerargs[j]);
}
//BUFFER CONTROLLER
pthread_mutex_lock(&lock_buffer);
while(*buffer_current_size!=-1){ //WHILE READ HASN'T ENDED
pthread_mutex_unlock(&lock_buffer); //UNLOCK AND LOCK AGAIN TO LET OTHER THREADS HOLD THE LOCK FOR A WHILE
pthread_mutex_lock(&lock_buffer);
it=buffer->begin(); //GET FIRST ELEMENT OF THE BUFFER
if(it->cnt == NUMTHREADS){
buffer->pop_front(); //DELETE FIRST ELEMENT
(*buffer_current_size)--; //DECREASE SIZE
pthread_cond_signal(&can_produce); //PRODUCER CAN PRODUCE
}
}
pthread_mutex_unlock(&lock_buffer);
for(int i=0; i<NUMTHREADS; i++)
{
pthread_join(threads[i],NULL);
}
}
WORKER:
void *worker(void *threadwoker)
{
//some declarations...
pthread_mutex_lock(&lock_buffer); //LOCK TO GET BEGIN
it=buffer->begin();
while(!(*buffer_current_size==-1 && it==args->buffer->end())) {
pthread_mutex_unlock(&lock_buffer);
//insert into matrix...
pthread_mutex_lock(&lock_buffer); //UNIFIED LOCK FOR IT AND CNT, SOLVING ISSUE
(it->cnt)++;
it++;
pthread_mutex_unlock(&lock_buffer);
pthread_mutex_lock(&mutex);
while (*buffer_current_size==0) { //WAIT IF BUFFER EMPTY
pthread_cond_wait(&can_consume, &mutex);
}
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&lock_buffer); //LOCKING FOR WHILE ARGUMENTS
}
pthread_mutex_unlock(&lock_buffer);
pthread_exit(NULL);
}
As you can see, I used an int counter on each element of the buffer to check if all the worker threads have already read it. When this condition becomes true, the buffer controller erases the first element from the queue. All bounded by locks, to guarantee data integrity.
The problem is, this code doesn't work, I either get a seg fault or mutex error. Can anyone enlighten with any ideas why?