Very recently I have started working on pthreads and trying to implement software pipelining with pthreads. To do that I have a written a toy program myself, a similar of which would be a part of my main project.
So in this program the main thread creates and input and output buffer of integer type and then creates a single master thread and passes those buffers to the master thread. The master thread in turn creates two worker threads.
The input and the output buffer that is passed from the main to the master thread is of size nxk (e.g. 5x10 of size int). The master thread iterates over a chunk of size k (i.e. 10) for n (i.e. 5) number of times. There is a loop running in the master thread for k (5 in here) number of times. In each iteration of k the master thread does some operation on a portion of input data of size n and place it in the common buffer shared between the master and the worker threads. The master thread then signals the worker threads that the data has been placed in the common buffer.
The two worker threads waits for the signal from the master thread if the common buffer is ready. The operation on the common buffer is divided into half among the worker threads. Which means one worker thread would work on the first half and the other worker thread would work on the next half of the common buffer. Once the worker threads gets the signal from the master thread, each of the worker thread does some operation on their half of the data and copy it to the output buffer. Then the worker threads informs the master thread that their operation is complete on the common buffer by setting flag values. An array of flags are created for worker threads. The master thread keeps on checking if all the flags are set which basically means all the worker threads finished their operation on the common buffer and so master thread can place the next data chunk into the common buffer safely for worker thread's consumption.
So essentially there is communication between the master and the worker threads in a pipelined fashion. In the very end I am printing the output buffer in the main thread. But I am getting no output at all. I have copy pasted my code with full comments on almost all steps.
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/time.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdbool.h>
#include <string.h>
#define MthNum 1 //Number of Master threads
#define WthNum 2 //Number of Worker threads
#define times 5 // Number of times the iteration (n in the explanation)
#define elNum 10 //Chunk size during each iteration (k in the explanation)
pthread_mutex_t mutex; // mutex variable declaration
pthread_cond_t cond_var; //conditional variarble declaration
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends
//marking the completion
int *commonBuff; //common buffer between master and worker threads
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1
// the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1)
int *commFlags_s;
int counter; // This counter used my master thread to count if all the commFlags[i] that shows
//all the threads finished their work on the common buffer
// static pthread_barrier_t barrier;
// Arguments structure passed to master thread
typedef struct{
int *input; // input buffer
int *output;// output buffer
}master_args;
// Arguments structure passed to worker thread
typedef struct{
int threadId;
int *outBuff;
}worker_args;
void* worker_func(void *arguments);
void *master_func(void *);
int main(int argc,char*argv[]){
int *ipData,*opData;
int i,j;
// allocation of input buffer and initializing to 0
ipData = (int *)malloc(times*elNum*sizeof(int));
memset(ipData,0,times*elNum*sizeof(int));
// allocation of output buffer and initializing to 0
opData = (int *)malloc(times*elNum*sizeof(int));
memset(opData,0,times*elNum*sizeof(int));
pthread_t thread[MthNum];
master_args* args[MthNum];
//creating the single master thread and passing the arguments
for( i=0;i<MthNum;i++){
args[i] = (master_args *)malloc(sizeof(master_args));
args[i]->input= ipData;
args[i]->output= opData;
pthread_create(&thread[i],NULL,master_func,(void *)args[i]);
}
//joining the master thred
for(i=0;i<MthNum;i++){
pthread_join(thread[i],NULL);
}
//printing the output buffer values
for(j =0;j<times;j++ ){
for(i =0;i<elNum;i++){
printf("%d\t",opData[i+j*times]);
}
printf("\n");
}
return 0;
}
//This is the master thread function
void *master_func(void *arguments){
//copying the arguments pointer to local variables
master_args* localMasterArgs = (master_args *)arguments;
int *indataArgs = localMasterArgs->input; //input buffer
int *outdataArgs = localMasterArgs->output; //output buffer
//worker thread declaration
pthread_t Workers[WthNum];
//worker thread arguments declaration
worker_args* wArguments[WthNum];
int i,j;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init (&cond_var, NULL);
counter =0;
commonBuff = (int *)malloc(elNum*sizeof(int));
commFlags = (int *)malloc(WthNum*sizeof(int));
memset(commFlags,0,WthNum*sizeof(int) );
commFlags_s= (int *)malloc(WthNum*sizeof(int));
memset(commFlags_s,0,WthNum*sizeof(int) );
for(i =0;i<WthNum;i++){
wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
wArguments[i]->threadId = i;
wArguments[i]->outBuff = outdataArgs;
pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]);
}
for (i = 0; i < times; i++) {
for (j = 0; j < elNum; j++)
indataArgs[i + j * elNum] = i + j;
while (counter != 0) {
counter = 0;
pthread_mutex_lock(&mutex);
for (j = 0; j < WthNum; j++) {
counter += commFlags_s[j];
}
pthread_mutex_unlock(&mutex);
}
pthread_mutex_lock(&mutex);
memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int));
pthread_mutex_unlock(&mutex);
counter = 1;
while (counter != 0) {
counter = 0;
pthread_mutex_lock(&mutex);
for (j = 0; j < WthNum; j++) {
counter += commFlags[j];
}
pthread_mutex_unlock(&mutex);
}
// printf("master broad cast\n");
pthread_mutex_lock(&mutex);
pthread_cond_broadcast(&cond_var);
//releasing the lock
pthread_mutex_unlock(&mutex);
}
pthread_mutex_lock(&mutex);
completion_flag = false;
pthread_mutex_unlock(&mutex);
for (i = 0; i < WthNum; i++) {
pthread_join(Workers[i], NULL);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond_var);
return NULL;
}
void* worker_func(void *arguments){
worker_args* localArgs = (worker_args*)arguments;
//copying the thread ID and the output buffer
int tid = localArgs->threadId;
int *localopBuffer = localArgs->outBuff;
int i,j;
bool local_completion_flag=false;
while(local_completion_flag){
pthread_mutex_lock(&mutex);
commFlags[tid] =0;
commFlags_s[tid] =1;
pthread_cond_wait(&cond_var,&mutex);
commFlags_s[tid] =0;
commFlags[tid] =1;
if (tid == 0) {
for (i = 0; i < (elNum / 2); i++) {
localopBuffer[i] = commonBuff[i] * 5;
}
} else { // Thread ID 1 operating on the other half of the common buffer data and placing on the
// output buffer
for (i = 0; i < (elNum / 2); i++) {
localopBuffer[elNum / 2 + i] = commonBuff[elNum / 2 + i] * 10;
}
}
local_completion_flag=completion_flag;
pthread_mutex_unlock(&mutex);//releasing the lock
}
return NULL;
}
But I have no idea where I have done wrong in my implementation since logically it seems to be correct. But definitely there is something wrong in my implementation. I have spent a long time trying different things to fix it but nothing worked. Sorry for this long post but I am unable to determine a section where I might have done wrong and so I couldn't concise the post. So if anybody could take a look into the problem and implementation and can suggest what changes needed to be done to run it as intended then that it would be really helpful. Thank you for your help and assistance.
countersupposed to be protected by the mutex or not? There seem to be lots of cases where you modifycounterwithout holding the mutex or setcounterto zero without broadcasting the c.v. -- those kinds of things can lead to waiting for something that has already happened. - David Schwartz