0
votes

I'm working on a project that solves the classic problem of producer / consumer scheduling. Linux Open Suse 42.3 Leep, API System V, C language

The project consists of three programs: producer, consumer and scheduler. The purpose of schedulers is to create 3 semaphores, shared memory in which there is a buffer (array) in which write (producer) and read (consumer) and to run n producer and m consumer processes.

Each producer must perform k write cycles to the buffer, and the consumer must perform k read cycles.

3 semaphores were used: mutex, empty and full. The value of the full semaphore is used in the program as an index in the array.

The problem is that: for example, when the buffer size is 3, producers write 4 portions of data, when the buffer size is 4 - 5 portions of data (although there should be 4) ...

Consumers read normally.

In addition, the program does not behave predictably when calling get_semVal fucntion.

Please help, I will be very, very grateful for the answer.

screenshot of results (2 producer processes, 2 consumer processes, buffer size 3, each processes must perform 3 iteration):

producer

#define BUFFER_SIZE 3
#define MY_RAND_MAX 99      // Highest integer for random number generator
#define LOOP 3              //the number of write / read cycles for each process
#define DATA_DIMENSION 4    // size of portion of data for 1 iteration 
struct Data {
int buf[DATA_DIMENSION];
};
typedef struct Data buffer_item;

buffer_item buffer[BUFFER_SIZE];

void P(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = -1;
op.sem_flg = 0;
semop(semid,&op,1);
}

void V(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = +1;
op.sem_flg = 0;
semop(semid,&op,1);
}

void Init(int semid,int index,int value)
{
    semctl(semid,index,SETVAL,value);
}

int get_semVal(int sem_id)
{
    int value = semctl(sem_id,0,GETVAL,0);
    return value;
}

int main()
{
    sem_mutex = semget(KEY_MUTEX,1,0);
    sem_empty = semget(KEY_EMPTY,1,0);
    sem_full = semget(KEY_FULL,1,0);

    srand(time(NULL));

const int SIZE = sizeof(buffer[BUFFER_SIZE]);

shm_id = shmget(KEY_SHARED_MEMORY,SIZE, 0);

int i=0;
buffer_item *adr;
do {
    buffer_item nextProduced;
   
    P(sem_empty);

    P(sem_mutex);

    //prepare portion of data
    for(int j=0;j<DATA_DIMENSION;j++)
    {
        nextProduced.buf[j]=rand()%5;
    }

    adr = (buffer_item*)shmat(shm_id,NULL,0);

    int full_value = get_semVal(sem_full);//get index of array
    printf("-----%d------\n",full_value-1);//it’s for test the index of array in buffer

   // write the generated portion of data by index full_value-1
    adr[full_value-1].buf[0] = nextProduced.buf[0];
    adr[full_value-1].buf[1] = nextProduced.buf[1];
    adr[full_value-1].buf[2] = nextProduced.buf[2];
    adr[full_value-1].buf[3] = nextProduced.buf[3];

    shmdt(adr);


    printf("producer %d produced %d %d %d %d\n", getpid(), nextProduced.buf[0],nextProduced.buf[1],nextProduced.buf[2],nextProduced.buf[3]);

    V(sem_mutex);
    V(sem_full);

    i++;
    } while (i<LOOP);

V(sem_empty); 
sleep(1); 
 }

consumer

 …
int main()
{

sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);

srand(time(NULL));

const int SIZE = sizeof(buffer[BUFFER_SIZE]);

shm_id = shmget(KEY_SHARED_MEMORY,SIZE,0);

int i=0;
buffer_item *adr;
do
{
    buffer_item nextConsumed;

    P(sem_full);
    P(sem_mutex);

    int full_value = get_semVal(sem_full);

    adr = (buffer_item*)shmat(shm_id,NULL,0);

    for(int i=0;i<BUFFER_SIZE;i++)
    {
        printf("--%d %d %d %d\n",adr[i].buf[0],adr[i].buf[1],adr[i].buf[2],adr[i].buf[3]);
    }

    for(int i=0;i<BUFFER_SIZE;i++)
    {
        buffer[i].buf[0] = adr[i].buf[0];
        buffer[i].buf[1] = adr[i].buf[1];
        buffer[i].buf[2] = adr[i].buf[2];
        buffer[i].buf[3] = adr[i].buf[3];
    }

    tab(nextConsumed);

        nextConsumed.buf[0]=buffer[full_value-1].buf[0];
        nextConsumed.buf[1]=buffer[full_value-1].buf[1];
        nextConsumed.buf[2]=buffer[full_value-1].buf[2];
        nextConsumed.buf[3]=buffer[full_value-1].buf[3];



    // Set buffer to 0 since we consumed that item
    for(int j=0;j<DATA_DIMENSION;j++)
    {
        buffer[full_value-1].buf[j]=0;
    }

    for(int i=0;i<BUFFER_SIZE;i++)
    {
        adr[i].buf[0]=buffer[i].buf[0];
        adr[i].buf[1]=buffer[i].buf[1];
        adr[i].buf[2]=buffer[i].buf[2];
        adr[i].buf[3]=buffer[i].buf[3];

    }

    shmdt(adr);

    printf("consumer %d consumed %d %d %d %d\n", getpid() ,nextConsumed.buf[0],nextConsumed.buf[1],nextConsumed.buf[2],nextConsumed.buf[3]);

    V(sem_mutex);
    // increase empty
    V(sem_empty);

    i++;
    } while (i<LOOP);

V(sem_full);
sleep(1);
}

Scheduler

…
struct Data {
   int buf[DATA_DIMENSION];
 };
typedef struct Data buffer_item;

buffer_item buffer[BUFFER_SIZE];

struct TProcList
{
  pid_t processPid;
};
typedef struct TProcList ProcList;
 …

ProcList createProcess(char *name)
{
   pid_t pid;
   ProcList a;

   pid = fork();
   if (!pid){
      kill(getpid(),SIGSTOP);
      execl(name,name,NULL);
      exit(0);
   }
   else if(pid){
       a.processPid=pid;
   }
   else
       cout<<"error forking"<<endl;
   return a;
   }

   int main()
   {
      sem_mutex = semget(KEY_MUTEX,1,IPC_CREAT|0600);
      sem_empty = semget(KEY_EMPTY,1,IPC_CREAT|0600);
      sem_full = semget(KEY_FULL,1,IPC_CREAT|0600);

Init(sem_mutex,0,1);//unlock mutex
Init(sem_empty,0,BUFFER_SIZE); 
Init(sem_full,0,0);//unlock empty

const int SIZE = sizeof(buffer[BUFFER_SIZE]);

shm_id = shmget(KEY_SHARED_MEMORY,SIZE,IPC_CREAT|0600);

buffer_item *adr;
adr = (buffer_item*)shmat(shm_id,NULL,0);

for(int i=0;i<BUFFER_SIZE;i++)
{
    buffer[i].buf[0]=0;
    buffer[i].buf[1]=0;
    buffer[i].buf[2]=0;
    buffer[i].buf[3]=0;
}

for(int i=0;i<BUFFER_SIZE;i++)
{
    adr[i].buf[0] = buffer[i].buf[0];
    adr[i].buf[1] = buffer[i].buf[1];
    adr[i].buf[2] = buffer[i].buf[2];
    adr[i].buf[3] = buffer[i].buf[3];

}

int consumerNumber = 2;
int produserNumber = 2;

ProcList producer_pids[produserNumber];
ProcList consumer_pids[consumerNumber];

for(int i=0;i<produserNumber;i++)
{
    producer_pids[i]=createProcess("/home/andrey/build-c-unknown-Debug/c");//create sleeping processes 
}

for(int i=0;i<consumerNumber;i++)
{
    consumer_pids[i]=createProcess("/home/andrey/build-p-unknown-Debug/p");
}

    sleep(3);

for(int i=0;i<produserNumber;i++)
{
   kill(producer_pids[i].processPid,SIGCONT);//continue processes
   sleep(1);
}
for(int i=0;i<consumerNumber;i++)
{
   kill(consumer_pids[i].processPid,SIGCONT);
   sleep(1);
}


for(int i=0;i<produserNumber;i++)
    {
        waitpid(producer_pids[i].processPid,&stat,WNOHANG);//wait
    }
for(int i=0;i<consumerNumber;i++)
    {
        waitpid(consumer_pids[i].processPid,&stat,WNOHANG);
    }

shmdt(adr);

semctl(sem_mutex,0,IPC_RMID);
semctl(sem_full,0,IPC_RMID);
semctl(sem_empty,0,IPC_RMID);

}
1
Why semaphores and not a mutex and some condition variables? Also, cout<< is C++, and not C. They are two different programming languages, and there are separate c and c++ tags for them for that reason. - None
I need to do this with semaphores and processes exactly. Concerning with с and с++ I agree. - Andrii
Does each consumer consume just one element from the buffer, or the entire buffer? Does each producer produce just one element to the buffer, or the entire buffer? In other words, what is the "job unit": one element, some elements, or the entire buffer? It matters a lot, when using semaphores, you see. (And do replace cout << "error forking" << endl; with the C printf("Cannot fork: %s.\n", strerror(errno)); that also describes the error. fork() error case is when pid == -1, 0 is only returned in the child, and all other values are returned in the parent as the child process ID.) - None
In my task, a portion of data is data for tabulation of a function (start, end, step and function ). The portion of data is presented as a struct Data. At the same time there are several portion of data can be in buffer (array of structures). By one cycle the producer write one portion of data (start, end, step and function) and consumer read also one portion (start, end, step and function). The screenshot shows (see ----- 1 ------) semaphore value full: 1 0 1 2. Buffer size 3. Question why an additional piece of data appears in the buffer. - Andrii
I use POSIX, including POSIX semaphores on Linux (and other POSIXy systems), and you're using System V ones; I'm afraid I can't help you with those. - None

1 Answers

1
votes

It is not fun to try and unravel uncommented code someone else has written, so instead, I'll explain a verified working scheme.

(Note that comments should always explain programmer intent or idea, and never what the code does; we can read the code to see what it does. The problem is, we need to first understand the programmer idea/intent first, before we can compare that to the implementation. Without comments, I would need to first read the code to try and guess at the intent, then compare that to the code itself; it's like double the work.)

(I suspect OP's underlying problem is trying to use semaphore values as buffer indexes, but didn't pore through all of the code to be 100% certain.)

Let's assume the shared memory structure is something like the following:

struct shared {
    sem_t       lock;            /* Initialized to value 1 */
    sem_t       more;            /* Initialized to 0 */
    sem_t       room;            /* Initialized to MAX_ITEMS */
    size_t      num_items;       /* Initialized to 0 */
    size_t      next_item;       /* Initialized to 0 */
    item_type   item[MAX_ITEMS];
};

and we have struct shared *mem pointing to the shared memory area.

Note that you should, at runtime, include <limits.h>, and verify that MAX_ITEMS <= SEM_VALUE_MAX. Otherwise MAX_ITEMS is too large, and this semaphore scheme may fail. (SEM_VALUE_MAX on Linux is usually INT_MAX, so big enough, but it may vary. And, if you use -O to optimize when compiling, the check will be optimized completely away. So it is a very cheap and reasonable check to have.)

The mem->lock semaphore is used like a mutex. That is, to lock the structure for exclusive access, a process waits on it. When it is done, it posts on it.

Note that while sem_post(&(mem->lock)) will always succeed (ignoring bugs like mem being NULL or pointing to uninitialized memory or having been overwritten with garbage), technically, sem_wait() can be interrupted by a signal delivery to an userspace handler installed without SA_RESTART flag. This is why I recommend using a static inline helper function instead of sem_wait():

static inline int  semaphore_wait(sem_t *const s)
{
    int  result;
    do {
        result = sem_wait(s);
    } while (result == -1 && errno == EINTR);
    return result;
}

static inline int  semaphore_post(sem_t *const s)
{
    return sem_post(s);
}

In cases where signal delivery should not interrupt waiting on the semaphore, you use semaphore_wait(). If you do want a signal delivery to interrupt waiting on a semaphore, you use sem_wait(); if it returns -1 with errno == EINTR, the operation was interrupted due to signal delivery, and the semaphore wasn't actually decremented. (Many other low-level functions, like read(), write(), send(), recv(), can be interrupted in the exact same way; they can also just return a short count, in case the interruption occurred part way.)

The semaphore_post() is just a wrapper, so that you can use "matching` post and wait operations. Doing that sort of "useless" wrappers does help understand the code, you see.

The item[] array is used as a circular queue. The num_items indicates the number of items in it. If num_items > 0, the next item to be consumed is item[next_item]. If num_items < MAX_ITEMS, the next item to be produced is item[(next_item + num_items) % MAX_ITEMS].

The % is the modulo operator. Here, because next_item and num_items are always positive, (next_item + num_items) % MAX_ITEMS is always between 0 and MAX_ITEMS - 1, inclusive. This is what makes the buffer circular.

When a producer has constructed a new item, say item_type newitem;, and wants to add it to the shared memory, it basically does the following:

    /* Omitted: Initialize and fill in 'newitem' members */

    /* Wait until there is room in the buffer */
    semaphore_wait(&(mem->room));

    /* Get exclusive access to the structure members */
    semaphore_wait(&(mem->lock));

    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
    mem->num_items++;
    sem_post(&(mem->more));

    semaphore_post(&(mem->lock));

The above is often called enqueue, because it appends an item to a queue (which happends to be implemented via a circular buffer).

When a consumer wants to consume an item (item_type nextitem;) from the shared buffer, it does the following:

    /* Wait until there are items in the buffer */
    semaphore_wait(&(mem->more));

    /* Get exclusive access to the structure members */
    semaphore_wait(&(mem->lock));

    nextitem = mem->item[mem->next_item];
    mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
    mem->num_items = mem->num_items - 1;

    semaphore_post(&(mem->room));

    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
    mem->num_items++;
    sem_post(&(mem->more));

    semaphore_post(&(mem->lock));

    /* Omitted: Do work on 'nextitem' here. */

This is often called dequeue, because it obtains the next item from the queue.

I would recommend you first write a single-process test case, which enqueues MAX_ITEMS, then dequeues them, and verifies the semaphore values are back to initial values. That is not a guarantee of correctness, but it takes care of the most typical bugs.

In practice, I would personally write the queueing functions as static inline helpers in the same header file that describes the shared memory structure. Pretty much

static inline int  shared_get(struct shared *const mem, item_type *const into)
{
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for the next item in the buffer. */
    do {
        err = sem_wait(&(mem->more));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Copy item to caller storage. */
    *into = mem->item[mem->next_item];

    /* Update queue state. */
    mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
    mem->num_items--;

    /* Account for the newly freed slot. */
    sem_post(&(mem->room));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}       

and

static inline int  shared_put(struct shared *const mem, const item_type *const from)
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for room in the buffer. */
    do {
        err = sem_wait(&(mem->room));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Copy item to queue. */
    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;

    /* Update queue state. */
    mem->num_items++;

    /* Account for the newly filled slot. */
    sem_post(&(mem->more));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}       

but note that I wrote these from memory, and not copy-pasted from my test program, because I want you to learn and not to just copy-paste code from others without understanding (and being suspicious of) it.

Why do we need separate counters (first_item, num_items) when we have the semaphores, with corresponding values?

Because we cannot capture the semaphore value at the point where sem_wait() succeeded/continued/stopped blocking.

For example, initially the room semaphore is initialized to MAX_ITEMS, so up to that many producers can run in parallel. Any one of them running sem_getvalue() immediately after sem_wait() will get some later value, not the value or transition that caused sem_wait() to return. (Even with SysV semaphores you cannot obtain the semaphore value that caused wait to return for this process.)

So, instead of indexes or counters to the buffer, we think of the more semaphore as having the value of how many times one can dequeue from the buffer without blocking, and room as having the value of how many times one can enqueue to the buffer without blocking. The lock semaphore grants exclusive access, so that we can modify the shared memory structures (well, next_item and num_items) atomically, without different processes trying to change the values at the same time.

I am not 100% certain that this is the best or optimum pattern, this is one of the most commonly used ones. It is not as robust as I'd like: for each increment (of one) in num_items, one must post on more exactly once; and for each decrement (of one) in num_items, one must increment next_item by exactly one and post on room exactly once, or the scheme falls apart.

There is one final wrinkle, though:

How do producers indicate they are done? How would the scheduler tell producers and/or consumers to stop?

My preferred solution is to add a flag into the shared memory structure, say unsigned int status;, with specific bit masks telling the producers and consumers what to do, that is examined immediately after waiting on the lock:

#define  STOP_PRODUCING  (1 << 0)
#define  STOP_CONSUMING  (1 << 1)

static inline int  shared_get(struct shared *const mem, item_type *const into)
{
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for the next item in the buffer. */
    do {
        err = sem_wait(&(mem->more));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Need to stop consuming? */
    if (mem->state & STOP_CONSUMING) {
        /* Ensure all consumers see the state immediately */
        sem_post(&(mem->more));
        sem_post(&(mem->lock));
        /* ENOMSG == please stop. */
        return errno = ENOMSG;
    }

    /* Copy item to caller storage. */
    *into = mem->item[mem->next_item];

    /* Update queue state. */
    mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
    mem->num_items--;

    /* Account for the newly freed slot. */
    sem_post(&(mem->room));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}       

static inline int  shared_put(struct shared *const mem, const item_type *const from)
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for room in the buffer. */
    do {
        err = sem_wait(&(mem->room));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Time to stop? */
    if (mem->state & STOP_PRODUCING) {
        /* Ensure all producers see the state immediately */
        sem_post(&(mem->lock));
        sem_post(&(mem->room));
        /* ENOMSG == please stop. */
        return errno = ENOMSG;
    }

    /* Copy item to queue. */
    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;

    /* Update queue state. */
    mem->num_items++;

    /* Account for the newly filled slot. */
    sem_post(&(mem->more));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}

which return ENOMSG to the caller if the caller should stop. When the state is changed, one should of course be holding the lock. When adding STOP_PRODUCING, one should also post on the room semaphore (once) to start a "cascade" so all producers stop; and when adding STOP_CONSUMING, post on the more semaphore (once) to start the consumer stop cascade. (Each of them will post on it again, to ensure each producer/consumer sees the state as soon as possible.)

There are other schemes, though; for example signals (setting a volatile sig_atomic_t flag), but it is generally hard to ensure there are no race windows: a process checking the flag just before it is changed, and then blocking on a semaphore.

In this scheme, it would be good to verify that both MAX_ITEMS + NUM_PRODUCERS <= SEM_VALUE_MAX and MAX_ITEMS + NUM_CONSUMERS <= SEM_VALUE_MAX, so that even during the stop cascades, the semaphore value will not overflow.