0
votes

Following code is only to show how to use condition variable to synchronize threads(one producer and many consumers) as exercising. See the line for code 'usleep(100);'.When I comment this line,two consumer threads seem got the mutex locked at the same time after the producer thread exit,and then wait on 'cond',this raise race condition. But if I uncomment it.everything goes well(seems).

My question is how can two consumer threads lock one mutex at the same time?This demo should also work even after I don't call usleep()?Thanks for your time in advance.

This is the output after usleep removed from loops in producer.Please note the last two 'lock' in the output.

$ ./pthread_cond 
Producer 3062414192 beginning...
Producer locked mutex self:3062414192
Producer is creating work:1
Producer finished creating work:1
Producer unlock self:3062414192
Producer locked mutex self:3062414192
Producer is creating work:2
Producer finished creating work:2
Producer unlock self:3062414192
Producer locked mutex self:3062414192
Producer is creating work:3
Producer finished creating work:3
Producer unlock self:3062414192
Producer locked mutex self:3062414192
Producer is creating work:4
Producer finished creating work:4
Producer unlock self:3062414192
Producer 3062414192 exit after creating 4 works...
produce joined,but 4 work remained
Consumer 3070806896 beginning...
Consumer locked mutex self:3070806896
to wait on cond self:3070806896
Consumer 3079199600 beginning...
Consumer locked mutex self:3079199600
to wait on cond self:3079199600

Implemented:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

#define MAX_COUSUMER 2

#define TOTAL_WORK 4

int g_work_counter=0;

pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

void *producer_thread(void* arg)
{
    int i;

    printf("Producer %lu beginning...\n",pthread_self());
    for(i=0;i<TOTAL_WORK;i++)
    {
        assert(pthread_mutex_lock(&mut)==0);
        printf("Producer locked mutex self:%lu\n",pthread_self());
        printf("Producer is creating work:%d\n",g_work_counter+1);
        g_work_counter++;
        printf("Producer finished creating work:%d\n",g_work_counter);
        pthread_cond_broadcast(&cond);
        assert(pthread_mutex_unlock(&mut)==0);
        printf("Producer unlock self:%lu\n",pthread_self());

        //usleep(100);
    }

    printf("Producer self:%lu exit after creating %d works...\n",pthread_self(),i);//counter starts from 0
    pthread_exit(NULL);
}

void *consumer_thread(void *arg)
{
    printf("Consumer %lu beginning...\n",pthread_self());
    //use pthread_cancel in main
    pthread_detach(pthread_self());

    while(1)
    {
        assert(pthread_mutex_lock(&mut)==0);
        printf("Consumer locked mutex self:%lu\n",pthread_self());
        printf("to wait on cond self:%lu\n",pthread_self());
        assert(pthread_cond_wait(&cond,&mut)==0);
        if(g_work_counter)
        {
            printf("Consumer %lu is performing work:%d\n",pthread_self(),g_work_counter);
            g_work_counter--;
            printf("Consumer %lu finished performing work:%d\n",pthread_self(),g_work_counter+1);
        }
        assert(pthread_mutex_unlock(&mut)==0);
        printf("Consumer unlock self:%lu\n",pthread_self());
    }

    //no output (pthread_cancel is called)
    printf("Consumer %lu exit...\n",pthread_self());
    pthread_exit(NULL);
}

int main(int argc,char* argv[])
{
    pthread_t producer;
    pthread_t consumers[MAX_COUSUMER];
    int i;

    for(i=0;i<MAX_COUSUMER;i++)
    {
        if(pthread_create(&consumers[i],NULL,consumer_thread,NULL)!=0)
        {
            printf("pthread_create failed for consumer_thread %d\n",i);
        }
    }

    pthread_create(&producer,NULL,producer_thread,NULL);

    if(pthread_join(producer,NULL)!=0)
    {
        printf("pthread_join failed for producer_thread %lu\n",consumers[i]);
    }
    printf("producer joined,but %d work remained\n",g_work_counter);

    //wait for the consumers
    while(g_work_counter>0)
        ;

    //cancel the consumer,for they are detached
    for(i=0;i<MAX_COUSUMER;i++)
    {
        if(pthread_cancel(consumers[i])!=0)
        {
            printf("pthread_cancel failed for consumer_thread %d\n",i);
        }
    }

    pthread_mutex_destroy(&mut);
    pthread_cond_destroy(&cond);
    return 0;
}
3
I like the indentation of your code!Praveen S

3 Answers

3
votes

When a thread waits on a condition, it releases the lock. When it is woken up, it reacquires it. In this code, a consumer should only wait if the buffer is empty.

Another problem lies in main, actually, with this line: while(g_work_counter>0). You don't have the lock at that point, so it's not safe to check g_work_counter. I'm also not too sure about pthread_detach(pthread_self());. Shouldn't that be called by main on its own child?

As a general note, if you want to check for deadlocks and otherwise debug your pthreads code, you should use the pthread_mutexattr_foo functions to set up an error-checking mutex, and check the return value with more than just an assert(==0).

3
votes

As Borealid says, the lock is released while the thread is waiting on the condition variable. Your consumer function should look like this:

    /* Wait for some work to be available */
    pthread_mutex_lock(&mut);
    while (g_work_counter == 0)
    {
        pthread_cond_wait(&cond, &mut);
    }

    /* Have lock, g_work_counter is > 0, so do work */
    while (g_work_counter > 0)
    {
        g_work_counter--;
    }

    pthread_mutex_unlock(&mut);

(pthread_cond_wait() should always be used inside a spinning while loop like that).

1
votes

I would propose that you stay clear of pthread_cancel() API. Its very hard to use pthread_cancel() without introducing resource leaks, deadlocks and inconsistencies in your program.

As a rule of thump: Use detached threads for "one-shot/i don't care about the results/the completion of something" kind of things. For other things use "normal threads" which shut down in a cooperative way (by checking a flag).

So in your example you want all the work to be consumed before exiting the main program. Don't create the consumers as detached and replace the loop which canceled all the consumers with a loop doing a pthread_join() for all consumers.