1
votes

Very recently I have started working with pthreads and am trying to implement software pipelining with them. To do that I have written a toy program, a similar version of which will be part of my main project.

So in this program the main thread creates an input and an output buffer of integer type, then creates a single master thread and passes those buffers to it. The master thread in turn creates two worker threads.

The input and output buffers passed from the main thread to the master thread are each of size n x k (e.g. 5 x 10 ints). The master thread processes one chunk of size k (i.e. 10) at a time, n (i.e. 5) times; that is, a loop in the master thread runs n (5 here) times. In each iteration the master thread does some operation on a k-element portion of the input data and places it in the common buffer shared between the master and the worker threads. The master thread then signals the worker threads that the data has been placed in the common buffer.

The two worker threads wait for the signal from the master thread that the common buffer is ready. The work on the common buffer is split in half between the worker threads: one worker thread works on the first half and the other on the second half. Once the worker threads get the signal from the master thread, each of them does some operation on its half of the data and copies the result to the output buffer. The worker threads then inform the master thread that their operation on the common buffer is complete by setting flag values; an array of flags is created for the worker threads. The master thread keeps checking whether all the flags are set, which means that all the worker threads have finished their operation on the common buffer, so the master thread can safely place the next data chunk into the common buffer for the worker threads to consume.

So essentially there is communication between the master and the worker threads in a pipelined fashion. At the very end I print the output buffer in the main thread, but I get no output at all. I have pasted my code below, with comments on almost all steps.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/time.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdbool.h>
#include <string.h>

#define MthNum 1 //Number of Master threads
#define WthNum 2 //Number of Worker threads
#define times 5 // Number of times the iteration (n in the explanation)
#define elNum 10 //Chunk size during each iteration (k in the explanation)

pthread_mutex_t mutex; // mutex variable declaration
pthread_cond_t cond_var; //condition variable declaration
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends
                             //marking the completion
int *commonBuff; //common buffer between master and worker threads
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1
                // the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1)
int *commFlags_s;
int counter; // This counter is used by the master thread to sum the commFlags[i] values, which shows
             // whether all the threads have finished their work on the common buffer
// static pthread_barrier_t barrier;
// Arguments structure passed to master thread
typedef struct{
    int *input; // input buffer
    int *output;// output buffer
}master_args;

// Arguments structure passed to worker thread
typedef struct{
    int threadId;
    int *outBuff;
}worker_args;

void* worker_func(void *arguments);
void *master_func(void *);

int main(int argc,char*argv[]){

    int *ipData,*opData;
    int i,j;

    // allocation of input buffer and initializing to 0
    ipData = (int *)malloc(times*elNum*sizeof(int));
    memset(ipData,0,times*elNum*sizeof(int));

    // allocation of output buffer and initializing to 0
    opData = (int *)malloc(times*elNum*sizeof(int));
    memset(opData,0,times*elNum*sizeof(int));

    pthread_t thread[MthNum];
    master_args* args[MthNum];


    //creating the single master thread and passing the arguments
    for( i=0;i<MthNum;i++){
        args[i] = (master_args *)malloc(sizeof(master_args));
        args[i]->input= ipData;
        args[i]->output= opData;
        pthread_create(&thread[i],NULL,master_func,(void *)args[i]);
    }

    //joining the master thread
    for(i=0;i<MthNum;i++){
        pthread_join(thread[i],NULL);
    }

    //printing the output buffer values
    for(j =0;j<times;j++ ){
        for(i =0;i<elNum;i++){
            printf("%d\t",opData[i+j*times]);
        }
      printf("\n");
    }

    return 0;
}

//This is the master thread function
void *master_func(void *arguments){

    //copying the arguments pointer to local variables
    master_args* localMasterArgs = (master_args *)arguments;
    int *indataArgs = localMasterArgs->input; //input buffer
    int *outdataArgs = localMasterArgs->output; //output buffer

    //worker thread declaration
    pthread_t Workers[WthNum];
    //worker thread arguments declaration
    worker_args* wArguments[WthNum];
    int i,j;

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init (&cond_var, NULL);
    counter =0;

    commonBuff = (int *)malloc(elNum*sizeof(int));

    commFlags = (int *)malloc(WthNum*sizeof(int));
    memset(commFlags,0,WthNum*sizeof(int) );
    commFlags_s= (int *)malloc(WthNum*sizeof(int));
    memset(commFlags_s,0,WthNum*sizeof(int) );

    for(i =0;i<WthNum;i++){

        wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
        wArguments[i]->threadId = i;
        wArguments[i]->outBuff = outdataArgs;

        pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]);
    }

    for (i = 0; i < times; i++) {
        for (j = 0; j < elNum; j++)
            indataArgs[i + j * elNum] = i + j;

        while (counter != 0) {
            counter = 0;

            pthread_mutex_lock(&mutex);
            for (j = 0; j < WthNum; j++) {
                counter += commFlags_s[j];
            }
            pthread_mutex_unlock(&mutex);

        }
        pthread_mutex_lock(&mutex);
        memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int));
        pthread_mutex_unlock(&mutex);
        counter = 1;
        while (counter != 0) {
            counter = 0;

            pthread_mutex_lock(&mutex);
            for (j = 0; j < WthNum; j++) {
                counter += commFlags[j];
            }
            pthread_mutex_unlock(&mutex);


        }
        // printf("master broad cast\n");
        pthread_mutex_lock(&mutex);
        pthread_cond_broadcast(&cond_var);
         //releasing the lock
        pthread_mutex_unlock(&mutex);

    }

    pthread_mutex_lock(&mutex);
     completion_flag = false;
    pthread_mutex_unlock(&mutex);

    for (i = 0; i < WthNum; i++) {
        pthread_join(Workers[i], NULL);
    }

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond_var);

    return NULL;
}


void* worker_func(void *arguments){

    worker_args* localArgs = (worker_args*)arguments;

    //copying the thread ID and the output buffer
    int tid = localArgs->threadId;
    int *localopBuffer = localArgs->outBuff;
    int i,j;
    bool local_completion_flag=false;

    while(local_completion_flag){

        pthread_mutex_lock(&mutex);
        commFlags[tid] =0;
        commFlags_s[tid] =1;
        pthread_cond_wait(&cond_var,&mutex);
        commFlags_s[tid] =0;
        commFlags[tid] =1;
        if (tid == 0) {
            for (i = 0; i < (elNum / 2); i++) {
                localopBuffer[i] = commonBuff[i] * 5;
            }
        } else { // Thread ID 1 operating on the other half of the common buffer data and placing on the
                 // output buffer
            for (i = 0; i < (elNum / 2); i++) {
                localopBuffer[elNum / 2 + i] = commonBuff[elNum / 2 + i] * 10;
            }
        }
         local_completion_flag=completion_flag;
        pthread_mutex_unlock(&mutex);//releasing the lock

    }

    return NULL;
}

I have no idea where I have gone wrong in my implementation, since logically it seems to be correct, but there is definitely something wrong with it. I have spent a long time trying different things to fix it, but nothing worked. Sorry for this long post; I am unable to pin down the section where I might have gone wrong, so I couldn't make the post more concise. If anybody could take a look at the problem and the implementation and suggest what changes are needed to make it run as intended, that would be really helpful. Thank you for your help and assistance.

2
Is counter supposed to be protected by the mutex or not? There seem to be lots of cases where you modify counter without holding the mutex or set counter to zero without broadcasting the c.v. -- those kinds of things can lead to waiting for something that has already happened. - David Schwartz
Also, I suggest using a multiple of 6 as the time quantum for a sleeping thread; instead of 30 use, for example, 18 - Hamit YILDIRIM
@DavidSchwartz Sorry for my late reply. The counter counts the flags that have been turned on by the worker threads. The master thread checks which flags the worker threads have set to 1 and increases the counter accordingly. If the counter value equals the number of threads, the while loop exits and the master proceeds. So the counter is not modified by the worker threads; it is modified by the master thread only. It broadcasts to the worker threads, as the worker threads are waiting for the counter value to be 0. I am not sure if I need a mutex - duttasankha
@HamitYıldırım I am sorry, but I didn't actually understand your point. Can you elaborate a bit more? Thanks - duttasankha
Think of a thread that waits in the background for 6 ms; once it starts to wait, the processor looks at it again only after handling another thread. For this reason, look at how it is released at 6 and its multiples. This is just a detail, not a solution to all your problems. For the full solution in your case I suggest looking at producer-consumer samples - Hamit YILDIRIM
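
The "waiting for something that has already happened" hazard raised in the first comment is usually avoided by pairing the condition variable with a predicate that is only read and written under the mutex. The following is a minimal sketch of that pattern, not a drop-in fix for the question's code; the names `lock`, `cv`, `data_ready`, `publish` and `await_data` are hypothetical and chosen only for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical names for illustration; not taken from the question's code. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv   = PTHREAD_COND_INITIALIZER;
static bool data_ready = false;   /* the predicate, protected by lock */

/* Signaller: change the predicate and broadcast while holding the mutex. */
void publish(void)
{
    pthread_mutex_lock(&lock);
    data_ready = true;
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&lock);
}

/* Waiter: re-check the predicate in a loop. If publish() has already run,
 * the loop body is skipped and the thread does not block at all, so a
 * broadcast that happened "too early" is not lost. The loop also covers
 * spurious wakeups. */
void await_data(void)
{
    pthread_mutex_lock(&lock);
    while (!data_ready)
        pthread_cond_wait(&cv, &lock);
    assert(data_ready);           /* holds here: we own the mutex */
    pthread_mutex_unlock(&lock);
}
```

In the question's worker, `pthread_cond_wait` is called unconditionally, so a broadcast issued before the worker reaches the wait would simply be missed; a predicate loop of this shape is one way to rule that out.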

2 Answers

0
votes

There are several errors in this code.

  1. Start by fixing the creation of the worker threads:

    wArguments[i] = (worker_args* )malloc(sizeof(worker_args));
    wArguments[i]->threadId = i;
    wArguments[i]->outBuff = outdataArgs;

    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments);

You initialize the worker_args structs, but then pass a pointer to the whole array, (void *)wArguments, instead of the element wArguments[i] you just initialized. Pass the element:

    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments[i]);
    //                                                             ^^^

  2. Initialize counter before starting the threads that use its value:

    void *master_func(void *arguments)
    {
    /* (...) */
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init (&cond_var, NULL);
    counter = WthNum;

  3. When starting the master thread, you incorrectly pass a pointer to a pointer:

    pthread_create(&thread[i],NULL,master_func,(void *)&args[i]);

Please change this to:

    pthread_create(&thread[i],NULL,master_func,(void *) args[i]);

  4. All accesses to the counter variable (like any other shared memory) must be synchronized between threads.
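
To make the last point concrete, here is a hedged sketch of one way the master/worker handshake could be written so that every shared counter is touched only under the mutex. It is not the asker's code: `run_pipeline`, `worker_arg`, `round_no`, `pending` and `shared_chunk` are names invented for this example. The sizes (2 workers, 5 rounds, chunks of 10) and the scale factors 5 and 10 mirror the question.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define N_WORKERS 2
#define N_ROUNDS  5    /* "times" in the question */
#define CHUNK     10   /* "elNum" in the question */

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cnd = PTHREAD_COND_INITIALIZER;
static int shared_chunk[CHUNK];
static int round_no = 0;   /* chunks published so far (guarded by mtx) */
static int pending  = 0;   /* workers still busy with the current chunk */

typedef struct { int id; int *out; } worker_arg;   /* hypothetical type */

static void *worker(void *arg)
{
    const worker_arg *w = arg;
    int half = CHUNK / N_WORKERS;

    for (int r = 1; r <= N_ROUNDS; r++) {
        pthread_mutex_lock(&mtx);
        while (round_no < r)                 /* has chunk r been published? */
            pthread_cond_wait(&cnd, &mtx);
        pthread_mutex_unlock(&mtx);

        /* Each worker reads and writes only its own half. The master does
         * not overwrite shared_chunk until pending drops back to zero. */
        for (int i = w->id * half; i < (w->id + 1) * half; i++)
            w->out[(r - 1) * CHUNK + i] = shared_chunk[i] * (w->id == 0 ? 5 : 10);

        pthread_mutex_lock(&mtx);
        if (--pending == 0)
            pthread_cond_broadcast(&cnd);    /* last worker wakes the master */
        pthread_mutex_unlock(&mtx);
    }
    return NULL;
}

/* Master side: publishes N_ROUNDS chunks from in[] and waits for the workers. */
int run_pipeline(const int *in, int *out)
{
    pthread_t  th[N_WORKERS];
    worker_arg args[N_WORKERS];

    round_no = 0;
    pending  = 0;
    for (int t = 0; t < N_WORKERS; t++) {
        args[t] = (worker_arg){ t, out };
        if (pthread_create(&th[t], NULL, worker, &args[t]) != 0)
            abort();                         /* sketch: no graceful recovery */
    }

    for (int r = 1; r <= N_ROUNDS; r++) {
        pthread_mutex_lock(&mtx);
        while (pending != 0)                 /* previous chunk still in use */
            pthread_cond_wait(&cnd, &mtx);
        memcpy(shared_chunk, in + (r - 1) * CHUNK, sizeof shared_chunk);
        pending  = N_WORKERS;
        round_no = r;
        pthread_cond_broadcast(&cnd);
        pthread_mutex_unlock(&mtx);
    }

    for (int t = 0; t < N_WORKERS; t++)
        pthread_join(th[t], NULL);
    assert(pending == 0);                    /* every worker reported back */
    return 0;
}
```

Calling `run_pipeline(ipData, opData)` from `main` in place of creating the master thread would fill `opData` chunk by chunk. A round number is used instead of per-worker flags so that a worker arriving late still sees that its chunk was published; this is a design choice of the sketch, not something the original answer prescribes.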
0
votes

I think you should use a semaphore-based producer-consumer model like this:

https://jlmedina123.wordpress.com/2014/04/08/255/
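
For readers who do not follow the link, the general idea can be sketched as below. This is not the code from the linked post; it is a minimal illustration that assumes POSIX unnamed semaphores (`sem_init`, available on Linux but not on every platform), and the names `run_pipeline_sem`, `sem_worker`, `chunk_ready`, `chunk_done` and `sem_chunk` are hypothetical. Sizes and scale factors again mirror the question.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <string.h>

#define N_WORKERS 2
#define N_ROUNDS  5
#define CHUNK     10

static sem_t chunk_ready[N_WORKERS];  /* master -> worker t: chunk available */
static sem_t chunk_done;              /* worker -> master: my half is done */
static int   sem_chunk[CHUNK];

typedef struct { int id; int *out; } sem_worker_arg;   /* hypothetical type */

static void wait_sem(sem_t *s)
{
    while (sem_wait(s) == -1 && errno == EINTR)
        ;                             /* retry if a signal interrupted us */
}

static void *sem_worker(void *arg)
{
    const sem_worker_arg *w = arg;
    int half = CHUNK / N_WORKERS;

    for (int r = 0; r < N_ROUNDS; r++) {
        wait_sem(&chunk_ready[w->id]);       /* a post is never "lost" */
        for (int i = w->id * half; i < (w->id + 1) * half; i++)
            w->out[r * CHUNK + i] = sem_chunk[i] * (w->id == 0 ? 5 : 10);
        sem_post(&chunk_done);
    }
    return NULL;
}

int run_pipeline_sem(const int *in, int *out)
{
    pthread_t      th[N_WORKERS];
    sem_worker_arg args[N_WORKERS];

    if (sem_init(&chunk_done, 0, 0) != 0)
        return -1;
    for (int t = 0; t < N_WORKERS; t++)
        if (sem_init(&chunk_ready[t], 0, 0) != 0)
            return -1;

    for (int t = 0; t < N_WORKERS; t++) {
        args[t] = (sem_worker_arg){ t, out };
        if (pthread_create(&th[t], NULL, sem_worker, &args[t]) != 0)
            abort();                         /* sketch: no graceful recovery */
    }

    for (int r = 0; r < N_ROUNDS; r++) {
        memcpy(sem_chunk, in + r * CHUNK, sizeof sem_chunk);
        for (int t = 0; t < N_WORKERS; t++)
            sem_post(&chunk_ready[t]);       /* hand the chunk to each worker */
        for (int t = 0; t < N_WORKERS; t++)
            wait_sem(&chunk_done);           /* wait until both halves are done */
    }

    for (int t = 0; t < N_WORKERS; t++)
        pthread_join(th[t], NULL);

    int left = -1;
    sem_getvalue(&chunk_done, &left);
    assert(left == 0);                       /* every completion was consumed */

    sem_destroy(&chunk_done);
    for (int t = 0; t < N_WORKERS; t++)
        sem_destroy(&chunk_ready[t]);
    return 0;
}
```

Because a semaphore remembers each post as a count, a worker that reaches `sem_wait` after the master has already posted still proceeds, which sidesteps the lost-wakeup problem without an explicit predicate. In this simple form the master waits for both workers before refilling the buffer, so any per-chunk preparation the master wants to overlap with the workers would have to happen before the `wait_sem(&chunk_done)` loop.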