
I have 10 consumer threads and 1 producer thread. The producer thread generates a random integer and inserts it into a buffer. Each consumer thread removes an item from this buffer and prints it.

Everything works fine (I mean, there is no endless loop or blocking). But it looks like only one consumer thread is working: the 10th (last) one. The other 9 consumer threads do not seem to do anything. I noticed this when I printed the consumer ID in the consumer thread function. Why don't the other 9 consumer threads work, and what can I do about this kind of problem?

Below is my code:

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
typedef int buffer_item;
#define BUFFER_SIZE 5

#define RAND_DIVISOR 100000000
#define TRUE 1

pthread_mutex_t mutex;

sem_t full, empty;

buffer_item buffer[BUFFER_SIZE];

int counter;

pthread_t tid;      
pthread_attr_t attr; 

void *producer(void *param); 
void *consumer(void *param); 

int insert_item(buffer_item item) {

   if(counter < BUFFER_SIZE) {
      buffer[counter] = item;
      counter++;
      return 0;
   }
   else { 
      return -1;
   }
}

int remove_item(buffer_item *item) {
   if(counter > 0) {
      *item = buffer[(counter-1)];
      counter--;
      return 0;
   }
   else {
      return -1;
   }
}

void initializeData() {

   pthread_mutex_init(&mutex, NULL);

   sem_init(&full, 0, 0);

   sem_init(&empty, 0, BUFFER_SIZE);

   pthread_attr_init(&attr);

   counter = 0;
}

void *producer(void *param) {
   buffer_item item;

   while(TRUE) {
      int rNum = rand() / RAND_DIVISOR;
      sleep(1);

      item = rand()%100;

      sem_wait(&empty);
      pthread_mutex_lock(&mutex);

      if(insert_item(item)) {
         fprintf(stderr, " Producer report error condition\n");
      }
      else {
         printf("producer produced %d\n", item);
      }
      pthread_mutex_unlock(&mutex);
      sem_post(&full);
   }
}

void *consumer(void *param) {
   buffer_item item;
   int* consumerID=(int*)param;

   while(TRUE) {
      int rNum = rand() / RAND_DIVISOR;
      sleep(1);

      sem_wait(&full);
      pthread_mutex_lock(&mutex);
      if(remove_item(&item)) {
         fprintf(stderr, "Consumer report error condition\n");
      }
      else {
         printf("consumer %d consumed %d\n" ,*consumerID, item);

      }
      pthread_mutex_unlock(&mutex);
      sem_post(&empty);
   }
}

int main(int argc, char *argv[]) {
   /* Loop counter */
   int i;


   int numProd = 1; /* Number of producer threads */
   int numCons = 10; /* Number of consumer threads */

   /* Initialize the app */
   initializeData();

   /* Create the producer threads */
   for(i = 0; i < numProd; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,producer,NULL);
    }

   /* Create the consumer threads */
   for(i = 0; i < numCons; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,consumer,(void*)&i);
   }

   /* Sleep for 10 seconds so the threads have time to run */
   sleep(10);

   /* Exit the program */
   printf("Exit the program\n");
   exit(0);
}

My output is :

producer produced 27
consumer 10 consumed 27
producer produced 63
consumer 10 consumed 63
producer produced 26
consumer 10 consumed 26
producer produced 11
consumer 10 consumed 11
producer produced 29
consumer 10 consumed 29
producer produced 62
consumer 10 consumed 62
producer produced 35
consumer 10 consumed 35
producer produced 22
consumer 10 consumed 22
producer produced 67
consumer 10 consumed 67
Exit the program
You're really using way too many comments. None of the comments adds any value. Remember that "well commented code" does not imply anything about the amount of comments. - klutt
What's the purpose of sem_wait(&full)%100;? The %100 has no effect. You should try compiling with -Wall if you are not already. - Shawn
@Shawn Sorry, I edited it. - javac
@Broman I edited the code, sorry for that. - javac
It's for your own sake too. Excess comments take up more space on the screen, take longer to write, and make the code harder to maintain. - klutt

3 Answers

3
votes

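Shawn has beaten me to it, but yes, he is correct: every consumer receives a pointer to the same loop variable i, so they all end up printing the same ID. Give each consumer its own ID storage instead. See the following implementation: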

/* Consumer Thread */
void *consumer(void *param) {
    buffer_item item;
    /* points at this thread's own entry in taskids, not at the loop variable */
    int *consumerID = (int *)param;

    printf("consumer %d created\n", *consumerID);

    while(TRUE) {
        /* pause before trying to consume */
        sleep(1);

        /* wait until the buffer holds at least one item */
        sem_wait(&full);
        /* acquire the mutex lock */
        pthread_mutex_lock(&mutex);
        if(remove_item(&item)) {
            /* buffer was unexpectedly empty; error report left disabled for this run */
        }
        else {
            printf("consumer %d consumed %d\n", *consumerID, item);
        }
        /* release the mutex lock */
        pthread_mutex_unlock(&mutex);
        /* signal that one slot is free again */
        sem_post(&empty);
    }
}

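/* int, not long: consumer() reads the ID through an int pointer */
int taskids[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

    /* Create the consumer threads */
    for(i = 0; i < numCons; i++) {
        taskids[i] = i;
        /* Each thread gets a pointer to its own array element */
        pthread_create(&tid, &attr, consumer, taskids + i);
    }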

Which results in:

consumer 0 created
consumer 1 created
consumer 2 created
consumer 5 created
consumer 3 created
consumer 6 created
consumer 9 created
consumer 7 created
consumer 8 created
consumer 4 created
producer produced 65
consumer 9 consumed 65
producer produced 57
consumer 6 consumed 57
producer produced 33
consumer 5 consumed 33
producer produced 57
consumer 1 consumed 57
producer produced 3
consumer 3 consumed 3
producer produced 81
consumer 9 consumed 81
producer produced 1
consumer 5 consumed 1
1
votes

In:

for(i = 0; i < numCons; i++) {
   /* Create the thread */
   pthread_create(&tid,&attr,consumer,(void*)&i);
}

You're passing each thread a pointer to i, and each thread keeps that pointer, so whatever it reads through the pointer follows the current value of i.

You need to copy the value to a new variable at the beginning of the thread function, but it's important to know that the thread function will not necessarily start running as soon as pthread_create is called. It's likely that the threads won't start until the loop has finished and i = 10. So what's likely happening is that you do in fact have 10 working consumers, but they all report the same number.

If you want to use i as the ID, the creating thread should wait on a semaphore before creating the next thread; the new thread posts that semaphore right after it has copied the value from *param.

If you want a very simple test for this, you can add a sleep(1) call after each pthread_create call. That should give each thread time to start and copy its ID, although it is only a diagnostic and does not guarantee the ordering.

0
votes
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConsumeProduce {
    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
    public static void main(String[] args) {
        Thread p = new Thread() {
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println(getName() + " produced :" + i);
                        queue.put(i);
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }

        };
        Thread c = new Thread() {

            public void run() {

                try {
                    while (true) {
                        System.out.println(getName() + " consumed :" + queue.take());
                    }

                } catch (InterruptedException e) {

                    e.printStackTrace();
                }
            }

        };

        p.start();
        c.start();

    }

}