I'm working on a project that solves the classic producer/consumer scheduling problem. Platform: Linux (openSUSE Leap 42.3), System V IPC API, C language.
The project consists of three programs: a producer, a consumer and a scheduler. The scheduler's job is to create three semaphores and a shared-memory segment holding the buffer (an array) that producers write to and consumers read from, and to start n producer and m consumer processes.
Each producer must perform k write cycles to the buffer, and the consumer must perform k read cycles.
3 semaphores were used: mutex, empty and full. The value of the full semaphore is used in the program as an index in the array.
The problem is that the producers write one portion of data too many: for example, when the buffer size is 3 they write 4 portions, and when the buffer size is 4 they write 5 portions (although there should be 4), and so on.
Consumers read normally.
In addition, the program behaves unpredictably when the get_semVal function is called.
Please help, I will be very, very grateful for the answer.
producer
/* Number of slots in the buffer. */
#define BUFFER_SIZE 3
/* Highest integer for the random number generator. */
#define MY_RAND_MAX 99
/* Number of write / read cycles each process performs. */
#define LOOP 3
/* Number of integers in the portion of data handled by one iteration. */
#define DATA_DIMENSION 4

/* One portion of data: DATA_DIMENSION integers moved as a unit. */
typedef struct Data {
    int buf[DATA_DIMENSION];
} buffer_item;

/* File-scope array of BUFFER_SIZE items, shaped like the buffer. */
buffer_item buffer[BUFFER_SIZE];
/*
 * P (wait/down): subtract one from semaphore 0 of the set `semid`.
 * sem_flg is 0, so the request is issued without IPC_NOWAIT or SEM_UNDO.
 * The result of semop() is not examined.
 */
void P(int semid)
{
    struct sembuf down = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };

    semop(semid, &down, 1);
}
/*
 * V (signal/up): add one to semaphore 0 of the set `semid`.
 * sem_flg is 0, so the request is issued without IPC_NOWAIT or SEM_UNDO.
 * The result of semop() is not examined.
 */
void V(int semid)
{
    struct sembuf up = { .sem_num = 0, .sem_op = 1, .sem_flg = 0 };

    semop(semid, &up, 1);
}
/*
 * Set semaphore number `index` of the set `semid` to `value`.
 *
 * semctl() reads its optional fourth argument as a `union semun`, so the
 * value is wrapped in one instead of being passed as a bare int.  The union
 * is declared locally because Linux headers leave its definition to the
 * caller; a block-scope declaration cannot clash with a file-scope one.
 *
 * The result of semctl() is not examined.
 */
void Init(int semid,int index,int value)
{
    union semun {
        int val;
        struct semid_ds *buf;
        unsigned short *array;
    } arg;

    arg.val = value;
    semctl(semid, index, SETVAL, arg);
}
/*
 * Query semaphore 0 of the set `sem_id` with GETVAL and hand back exactly
 * what semctl() returned (the current value, or -1 when the call fails).
 */
int get_semVal(int sem_id)
{
    return semctl(sem_id, 0, GETVAL, 0);
}
int main()
{
sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);
srand(time(NULL));
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE, 0);
int i=0;
buffer_item *adr;
do {
buffer_item nextProduced;
P(sem_empty);
P(sem_mutex);
//prepare portion of data
for(int j=0;j<DATA_DIMENSION;j++)
{
nextProduced.buf[j]=rand()%5;
}
adr = (buffer_item*)shmat(shm_id,NULL,0);
int full_value = get_semVal(sem_full);//get index of array
printf("-----%d------\n",full_value-1);//it’s for test the index of array in buffer
// write the generated portion of data by index full_value-1
adr[full_value-1].buf[0] = nextProduced.buf[0];
adr[full_value-1].buf[1] = nextProduced.buf[1];
adr[full_value-1].buf[2] = nextProduced.buf[2];
adr[full_value-1].buf[3] = nextProduced.buf[3];
shmdt(adr);
printf("producer %d produced %d %d %d %d\n", getpid(), nextProduced.buf[0],nextProduced.buf[1],nextProduced.buf[2],nextProduced.buf[3]);
V(sem_mutex);
V(sem_full);
i++;
} while (i<LOOP);
V(sem_empty);
sleep(1);
}
consumer
…
int main()
{
sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);
srand(time(NULL));
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE,0);
int i=0;
buffer_item *adr;
do
{
buffer_item nextConsumed;
P(sem_full);
P(sem_mutex);
int full_value = get_semVal(sem_full);
adr = (buffer_item*)shmat(shm_id,NULL,0);
for(int i=0;i<BUFFER_SIZE;i++)
{
printf("--%d %d %d %d\n",adr[i].buf[0],adr[i].buf[1],adr[i].buf[2],adr[i].buf[3]);
}
for(int i=0;i<BUFFER_SIZE;i++)
{
buffer[i].buf[0] = adr[i].buf[0];
buffer[i].buf[1] = adr[i].buf[1];
buffer[i].buf[2] = adr[i].buf[2];
buffer[i].buf[3] = adr[i].buf[3];
}
tab(nextConsumed);
nextConsumed.buf[0]=buffer[full_value-1].buf[0];
nextConsumed.buf[1]=buffer[full_value-1].buf[1];
nextConsumed.buf[2]=buffer[full_value-1].buf[2];
nextConsumed.buf[3]=buffer[full_value-1].buf[3];
// Set buffer to 0 since we consumed that item
for(int j=0;j<DATA_DIMENSION;j++)
{
buffer[full_value-1].buf[j]=0;
}
for(int i=0;i<BUFFER_SIZE;i++)
{
adr[i].buf[0]=buffer[i].buf[0];
adr[i].buf[1]=buffer[i].buf[1];
adr[i].buf[2]=buffer[i].buf[2];
adr[i].buf[3]=buffer[i].buf[3];
}
shmdt(adr);
printf("consumer %d consumed %d %d %d %d\n", getpid() ,nextConsumed.buf[0],nextConsumed.buf[1],nextConsumed.buf[2],nextConsumed.buf[3]);
V(sem_mutex);
// increase empty
V(sem_empty);
i++;
} while (i<LOOP);
V(sem_full);
sleep(1);
}
Scheduler
…
/* One portion of data: DATA_DIMENSION integers moved as a unit. */
typedef struct Data {
    int buf[DATA_DIMENSION];
} buffer_item;

/* File-scope array of BUFFER_SIZE items, shaped like the buffer. */
buffer_item buffer[BUFFER_SIZE];
/* Record kept for each started child: currently just its process id. */
typedef struct TProcList {
    pid_t processPid;
} ProcList;
…
/*
 * Fork a child that stops itself with SIGSTOP and, once it is continued,
 * replaces itself with the program at `name`.
 *
 * Returns a ProcList whose processPid is the child's pid in the parent.
 * When fork() fails, "error forking" is printed and processPid is -1;
 * callers must not pass that value to kill() or waitpid().
 */
ProcList createProcess(char *name)
{
    ProcList a;
    pid_t pid = fork();

    a.processPid = pid;

    /* fork() reports failure with -1; that value is non-zero, so it has to
       be tested before the parent/child split. */
    if (pid == -1) {
        std::cout << "error forking" << std::endl;
        return a;
    }

    if (pid == 0) {
        /* Child: wait for SIGCONT from the parent, then exec. */
        kill(getpid(),SIGSTOP);
        execl(name,name,(char *)NULL);
        /* Only reached when execl() failed.  Leave with a non-zero status
           and without running the parent's exit handlers or flushing its
           duplicated stdio buffers. */
        _exit(127);
    }

    return a;
}
int main()
{
sem_mutex = semget(KEY_MUTEX,1,IPC_CREAT|0600);
sem_empty = semget(KEY_EMPTY,1,IPC_CREAT|0600);
sem_full = semget(KEY_FULL,1,IPC_CREAT|0600);
Init(sem_mutex,0,1);//unlock mutex
Init(sem_empty,0,BUFFER_SIZE);
Init(sem_full,0,0);//unlock empty
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE,IPC_CREAT|0600);
buffer_item *adr;
adr = (buffer_item*)shmat(shm_id,NULL,0);
for(int i=0;i<BUFFER_SIZE;i++)
{
buffer[i].buf[0]=0;
buffer[i].buf[1]=0;
buffer[i].buf[2]=0;
buffer[i].buf[3]=0;
}
for(int i=0;i<BUFFER_SIZE;i++)
{
adr[i].buf[0] = buffer[i].buf[0];
adr[i].buf[1] = buffer[i].buf[1];
adr[i].buf[2] = buffer[i].buf[2];
adr[i].buf[3] = buffer[i].buf[3];
}
int consumerNumber = 2;
int produserNumber = 2;
ProcList producer_pids[produserNumber];
ProcList consumer_pids[consumerNumber];
for(int i=0;i<produserNumber;i++)
{
producer_pids[i]=createProcess("/home/andrey/build-c-unknown-Debug/c");//create sleeping processes
}
for(int i=0;i<consumerNumber;i++)
{
consumer_pids[i]=createProcess("/home/andrey/build-p-unknown-Debug/p");
}
sleep(3);
for(int i=0;i<produserNumber;i++)
{
kill(producer_pids[i].processPid,SIGCONT);//continue processes
sleep(1);
}
for(int i=0;i<consumerNumber;i++)
{
kill(consumer_pids[i].processPid,SIGCONT);
sleep(1);
}
for(int i=0;i<produserNumber;i++)
{
waitpid(producer_pids[i].processPid,&stat,WNOHANG);//wait
}
for(int i=0;i<consumerNumber;i++)
{
waitpid(consumer_pids[i].processPid,&stat,WNOHANG);
}
shmdt(adr);
semctl(sem_mutex,0,IPC_RMID);
semctl(sem_full,0,IPC_RMID);
semctl(sem_empty,0,IPC_RMID);
}

`cout<<` is C++, and not C. They are two different programming languages, and there are separate c and c++ tags for them for that reason. - None

Replace `cout << "error forking" << endl;` with the C `printf("Cannot fork: %s.\n", strerror(errno));` that also describes the error. (The `fork()` error case is when `pid == -1`; `0` is only returned in the child, and all other values are returned in the parent as the child process ID.) - None