I'm writing a naive implementation of a single-producer, multiple-consumer buffer with pthreads and condition variables, using a C++ list as the buffer. I'm not too worried about how fast my code runs; I just want to get rid of the errors.

The producer thread reads a string from a file and inserts it at the end of the buffer, while each of the consumers reads from the beginning and puts the string into a different matrix. So, essentially, I have a FIFO queue with a maximum size, and the first element can only be erased once all the consumers have read it.

Here's the important part of the three functions of my code:

PRODUCER:

void *feedBuffer(void *threadproducer){
//some declarations...
while(!file->eof())
{   
    pthread_mutex_lock(&mutex);

    while(*buffer_current_size == buffer_max_size) { // full
        // wait until some elements are consumed
        pthread_cond_wait(&can_produce, &mutex);
    }

    pthread_mutex_lock(&lock_buffer); 
        *file >> temp.word;
        buffer->push_back(temp);
        (*buffer_current_size)++;
    pthread_mutex_unlock(&lock_buffer);

    pthread_cond_broadcast(&can_consume);
    pthread_mutex_unlock(&mutex);
}
file->close();
pthread_cond_broadcast(&can_consume);

pthread_mutex_lock(&lock_buffer);
    buffer_current_size->store(-1); //END OF READ SIGNAL
pthread_mutex_unlock(&lock_buffer);
pthread_exit(NULL);
}

BUFFER CONTROLLER AND WORKER THREADS CALLER:

void *main_consumer(void *threadconsumer) //consumer caller and buffer control
{  
//some declarations...
for(int j=0; j<NUMTHREADS; j++)
{  
    pthread_create(&threads[j],&attr,worker,(void *) &workerargs[j]);
}

//BUFFER CONTROLLER
pthread_mutex_lock(&lock_buffer);

while(*buffer_current_size!=-1){ //WHILE READ HASN'T ENDED

    pthread_mutex_unlock(&lock_buffer); //UNLOCK AND LOCK AGAIN TO LET OTHER THREADS HOLD THE LOCK FOR A WHILE
    pthread_mutex_lock(&lock_buffer);

    it=buffer->begin(); //GET FIRST ELEMENT OF THE BUFFER
    if(it->cnt == NUMTHREADS){
        buffer->pop_front(); //DELETE FIRST ELEMENT
        (*buffer_current_size)--; //DECREASE SIZE

        pthread_cond_signal(&can_produce); //PRODUCER CAN PRODUCE
    }
}
pthread_mutex_unlock(&lock_buffer);
for(int i=0; i<NUMTHREADS; i++)
{
    pthread_join(threads[i],NULL);
}
}

WORKER:

void *worker(void *threadwoker)
{
//some declarations...
pthread_mutex_lock(&lock_buffer); //LOCK TO GET BEGIN
it=buffer->begin();
while(!(*buffer_current_size==-1 && it==args->buffer->end())) {
    pthread_mutex_unlock(&lock_buffer);
    //insert into matrix...

    pthread_mutex_lock(&lock_buffer); //UNIFIED LOCK FOR IT AND CNT, SOLVING ISSUE
        (it->cnt)++;
        it++;
    pthread_mutex_unlock(&lock_buffer);

    pthread_mutex_lock(&mutex);
    while (*buffer_current_size==0) { //WAIT IF BUFFER EMPTY

        pthread_cond_wait(&can_consume, &mutex);
    }
    pthread_mutex_unlock(&mutex);


    pthread_mutex_lock(&lock_buffer); //LOCKING FOR WHILE ARGUMENTS
}
pthread_mutex_unlock(&lock_buffer);
pthread_exit(NULL);
}

As you can see, I used an int counter on each element of the buffer to check whether all the worker threads have already read it. When this condition becomes true, the buffer controller erases the first element from the queue. Everything is guarded by locks to guarantee data integrity.

The problem is that this code doesn't work: I get either a segfault or a mutex error. Does anyone have any idea why?

Try a simpler version without the limit on buffer size (get rid of can_produce). Once that works you can add that section. - marom
Without the can_produce condition variable it works just fine on a test database. But since I'm dealing with large amounts of data (3.5 GB plain text files), I can't have an unlimited buffer. - Victor Jorge

1 Answer

Firstly, it is not clear exactly which data structures are protected by each mutex. I suggest that, for the initial implementation at least, you simplify down to one mutex protecting all of the shared state: the buffer size counter, the buffer itself, and the counter in the work items.
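
As a rough illustration of the single-mutex idea, here is a minimal sketch rather than a drop-in fix. All names in it (Item, SharedBuffer, buffer_push, buffer_close, buffer_read, head_seq, done) are hypothetical and not taken from your code. It also makes two assumptions that differ from your design: each consumer tracks its position with a sequence number instead of a list iterator, and the last consumer to read an item erases it, so no separate controller thread is needed.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <string>

struct Item {
    std::string word;
    int cnt;  // how many consumers have read this item so far
};

// Hypothetical container: ONE mutex guards every other field.
struct SharedBuffer {
    pthread_mutex_t mutex;
    pthread_cond_t can_produce;
    pthread_cond_t can_consume;
    std::list<Item> items;
    std::size_t max_size;
    int num_consumers;
    std::size_t head_seq;  // sequence number of items.front()
    bool done;             // set once the producer has finished

    SharedBuffer(std::size_t max, int consumers)
        : max_size(max), num_consumers(consumers), head_seq(0), done(false) {
        pthread_mutex_init(&mutex, nullptr);
        pthread_cond_init(&can_produce, nullptr);
        pthread_cond_init(&can_consume, nullptr);
    }
    ~SharedBuffer() {
        pthread_cond_destroy(&can_consume);
        pthread_cond_destroy(&can_produce);
        pthread_mutex_destroy(&mutex);
    }
};

// Producer side: block while the buffer is full, then append.
void buffer_push(SharedBuffer& b, const std::string& word) {
    pthread_mutex_lock(&b.mutex);
    while (b.items.size() == b.max_size)  // re-test after every wakeup
        pthread_cond_wait(&b.can_produce, &b.mutex);
    b.items.push_back(Item{word, 0});
    pthread_cond_broadcast(&b.can_consume);
    pthread_mutex_unlock(&b.mutex);
}

// Producer side: mark end of input and wake every waiting consumer.
void buffer_close(SharedBuffer& b) {
    pthread_mutex_lock(&b.mutex);
    b.done = true;
    pthread_cond_broadcast(&b.can_consume);
    pthread_mutex_unlock(&b.mutex);
}

// Consumer side: next_seq is the caller's private position (start at 0).
// Returns false once the producer is done and this consumer has seen all items.
bool buffer_read(SharedBuffer& b, std::size_t& next_seq, std::string& out) {
    pthread_mutex_lock(&b.mutex);
    while (next_seq == b.head_seq + b.items.size() && !b.done)
        pthread_cond_wait(&b.can_consume, &b.mutex);
    if (next_seq == b.head_seq + b.items.size()) {  // done and drained
        pthread_mutex_unlock(&b.mutex);
        return false;
    }
    // An item is erased only after every consumer has read it,
    // so no consumer can be behind the head.
    assert(next_seq >= b.head_seq);
    auto it = std::next(b.items.begin(),
                        static_cast<std::ptrdiff_t>(next_seq - b.head_seq));
    out = it->word;
    ++next_seq;
    if (++it->cnt == b.num_consumers) {  // last reader: this is the front item
        b.items.pop_front();
        ++b.head_seq;
        pthread_cond_signal(&b.can_produce);
    }
    pthread_mutex_unlock(&b.mutex);
    return true;
}
```

Each worker would loop on buffer_read() with its own next_seq and do the matrix insertion after the call returns, outside the lock. Walking the list with std::next is linear in the buffer size, which should be acceptable given that speed is not the concern here.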

As for specific issues:

  • the Producer should re-test the condition after pthread_cond_wait() (it should be a while () loop rather than an if () statement);
  • when the Producer finishes, it accesses *buffer_current_size without a lock held and doesn't signal any waiting consumers;
  • the Buffer Controller accesses *buffer_current_size without a lock held;
  • the Worker accesses buffer->begin() and buffer->end() without a lock held;
  • the Worker accesses *buffer_current_size without a lock held;
  • the Worker calls pthread_mutex_trylock(&mutex) without checking the result, which means it could access shared state and unlock the mutex without having it locked;
  • the Worker needs to re-check the condition it's waiting for after calling pthread_cond_wait();
  • the Worker accesses the iterator it without a lock held, which is problematic because other threads modify the underlying std::list. Since this thread has already incremented the counter, the Buffer Controller could have already deleted the item that the iterator points to, which means you can't increment the iterator.
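
To make the last point concrete, here is a small sketch of one way to order the two steps. The names WorkItem and mark_read_and_advance are hypothetical, and the sketch assumes the caller already holds the mutex that protects the list. It relies on the fact that erasing a std::list element invalidates only iterators to that element, so a successor taken beforehand stays usable.

```cpp
#include <iterator>
#include <list>
#include <string>

struct WorkItem {
    std::string word;
    int cnt;  // how many workers have read this item so far
};

// Assumption: the caller holds the lock guarding the list, and `it`
// points at a live element (not end()).
std::list<WorkItem>::iterator
mark_read_and_advance(std::list<WorkItem>::iterator it) {
    auto next = std::next(it);  // take the successor while *it is still alive
    ++it->cnt;                  // from here on, *it may be erased by the controller
    return next;                // never touch the old iterator again
}
```

This does not cover the case where the returned iterator is end(): elements pushed later are inserted before end(), so the worker would still need some other way to find the next element once the producer adds one.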