
I am implementing a producer-consumer problem using pthreads on Ubuntu 16.04, in a virtual machine with 2 processors. There are n producers and a single consumer that reads data from each producer through a buffer; each producer has its own buffer. The problem is that at some point my code deadlocks and I cannot find out why. Maybe I am misusing the condition variables and mutex locks.

typedef struct
{
    Student*        buf;         // the buffer
    int             producer_id;
    size_t          len;         // number of items in the buffer
    pthread_mutex_t mutex;       // needed to add/remove data from the buffer
    pthread_cond_t  can_produce; // signaled when items are removed
    pthread_cond_t  can_consume; // signaled when items are added
    bool            isDone;
} buffer_t;

This is the buffer struct used to transfer data between the consumer and the producers.

void* producer(void *param)
{
    buffer_t* buffer = (buffer_t*)param;
    char*     line   = NULL;
    size_t    len    = 0;
    ssize_t   read;
    FILE*     inFileReader;

    inFileReader = fopen(inFile, "r");
    while ((read = getline(&line, &len, inFileReader)) != -1)
    {
        pthread_mutex_lock(&buffer->mutex);
        if (buffer->len == BUFFER_SIZE)
        {// buffer full wait
            pthread_cond_wait(&buffer->can_produce, &buffer->mutex);
        }

        //buffer filled up with the read data from the file.
        if (conditionValid)
        {
            //add item to buffer
            ++buffer->len;
        }

        pthread_cond_signal(&buffer->can_consume);
        pthread_mutex_unlock(&buffer->mutex);
    }

    fclose(inFileReader);
    buffer->isDone = true;
    pthread_exit(NULL);

    return NULL;
}

This is the producer. It reads data from the given file and fills the buffer; when the file ends, the producer thread exits.

void* consumer(void *param)
{
    buffer_t* buffer_array = (buffer_t*)param;
    sleep(rand() % 3);
    int i =0;

    while (!buffer_array[i%N].isDone || buffer_array[i%N].len != 0)
    {
        pthread_mutex_lock(&buffer_array[i%N].mutex);

        if (buffer_array[i%N].len == 0)
        {
            pthread_cond_wait(&buffer_array[i%N].can_consume, 
                              &buffer_array[i%N].mutex);
        }

        while (buffer_array[i%N].len != 0)
        {
            buffer_array[i%N].len--;
            //read from buffer
        }

        pthread_cond_signal(&buffer_array[i%N].can_produce);
        pthread_mutex_unlock(&buffer_array[i%N].mutex);
        i++;
    }

    fclose(outFileWriter);
    pthread_exit(NULL);

    return NULL;
} 

The consumer reads the buffer of each producer; when all of them are done, the consumer exits the loop and the thread terminates.

Any help is appreciated.

It would appear you at least need some inner while loops for your predicate checks. Just because a condition variable fired doesn't mean the predicate it is wrapping is necessarily true upon returning from pthread_cond_wait. At the risk of sounding grossly self-serving, see this answer for some advice, caveats, and potential pitfalls of pthreads with mutex+cvar usage. - WhozCraig
Yeah, never if (!cond) { pthread_cond_wait }. Always while (!cond) { pthread_cond_wait }. - ikegami
Instead of doing i%N all over the place, you can replace i++; and i%N with i = (i+1) % N; and i. - ikegami
Changing to while did not help at all. I observed that even though the producer thread exits, it does not signal the consumer, so the consumer waits forever. - Dogukan Altay
Could you please delete all lines that don't matter so that we can have an easier look? - meaning-matters

1 Answer

The deadlock occurs because the producer, when it finishes reading, only sets a flag:

buffer->isDone = true;

So if, at that time, the consumer is waiting on producer 1's condition variable:

    // say i == 1
    if (buffer_array[i%N].len == 0)
    {
        pthread_cond_wait(&buffer_array[i%N].can_consume, 
                          &buffer_array[i%N].mutex);
    }

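The condition will never be signaled. The consumer makes no progress, so everything stalls.

The solution might be to signal can_consume after isDone is set. For that to be reliable, isDone should be set while holding the buffer's mutex, and the consumer's wait should also check isDone; otherwise a consumer that locks the buffer after the final signal still sees len == 0 and waits forever.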
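A minimal sketch of that idea, not the asker's actual program. The file reading is replaced by a counter, and `to_produce`, `drained`, `consumer_arg_t`, `run_demo` and the value of `BUFFER_SIZE` are hypothetical names introduced only for illustration. It assumes the producer sets isDone under the mutex and then signals can_consume, and that both waits use a while loop, as the comments suggest.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

#define BUFFER_SIZE 4  /* assumed capacity; the question does not state one */

typedef struct {
    size_t          len;         /* number of items in the buffer */
    int             to_produce;  /* hypothetical: stands in for the file's lines */
    bool            drained;     /* hypothetical: touched only by the consumer */
    pthread_mutex_t mutex;
    pthread_cond_t  can_produce; /* signaled when items are removed */
    pthread_cond_t  can_consume; /* signaled when items are added or isDone is set */
    bool            isDone;
} buffer_t;

typedef struct { buffer_t *buffers; int n; long consumed; } consumer_arg_t;

static void *producer(void *param)
{
    buffer_t *buffer = param;
    for (int k = 0; k < buffer->to_produce; ++k) {
        pthread_mutex_lock(&buffer->mutex);
        while (buffer->len == BUFFER_SIZE)   /* while, not if: re-check after wakeup */
            pthread_cond_wait(&buffer->can_produce, &buffer->mutex);
        ++buffer->len;                       /* "add item to buffer" */
        pthread_cond_signal(&buffer->can_consume);
        pthread_mutex_unlock(&buffer->mutex);
    }
    /* The fix: set isDone under the mutex, then wake a waiting consumer. */
    pthread_mutex_lock(&buffer->mutex);
    buffer->isDone = true;
    pthread_cond_signal(&buffer->can_consume);
    pthread_mutex_unlock(&buffer->mutex);
    return NULL;
}

static void *consumer(void *param)
{
    consumer_arg_t *arg = param;
    int finished = 0;
    for (int i = 0; finished < arg->n; i = (i + 1) % arg->n) {
        buffer_t *b = &arg->buffers[i];
        if (b->drained)
            continue;                        /* this producer is already finished */
        pthread_mutex_lock(&b->mutex);
        while (b->len == 0 && !b->isDone)    /* the predicate also covers isDone */
            pthread_cond_wait(&b->can_consume, &b->mutex);
        arg->consumed += (long)b->len;       /* "read from buffer" */
        b->len = 0;
        if (b->isDone) {                     /* nothing more will ever arrive */
            b->drained = true;
            ++finished;
        }
        pthread_cond_signal(&b->can_produce);
        pthread_mutex_unlock(&b->mutex);
    }
    return NULL;
}

/* Runs n producers and one consumer; returns the number of items consumed. */
long run_demo(int n, int items_each)
{
    buffer_t  *buffers = calloc((size_t)n, sizeof *buffers);
    pthread_t *threads = calloc((size_t)n, sizeof *threads);
    consumer_arg_t arg = { buffers, n, 0 };
    pthread_t cons;

    for (int i = 0; i < n; ++i) {
        buffers[i].to_produce = items_each;
        pthread_mutex_init(&buffers[i].mutex, NULL);
        pthread_cond_init(&buffers[i].can_produce, NULL);
        pthread_cond_init(&buffers[i].can_consume, NULL);
        pthread_create(&threads[i], NULL, producer, &buffers[i]);
    }
    pthread_create(&cons, NULL, consumer, &arg);

    for (int i = 0; i < n; ++i)
        pthread_join(threads[i], NULL);
    pthread_join(cons, NULL);

    for (int i = 0; i < n; ++i) {
        pthread_mutex_destroy(&buffers[i].mutex);
        pthread_cond_destroy(&buffers[i].can_produce);
        pthread_cond_destroy(&buffers[i].can_consume);
    }
    free(buffers);
    free(threads);
    return arg.consumed;
}
```

Calling `run_demo(3, 100)` from a `main` (compiled with `-pthread`) should return once every producer has finished. Note that the consumer here tracks which buffers are finished instead of reusing the question's loop condition, which only looks at the current buffer; that is a design choice of this sketch, not something the answer prescribes.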