0
votes

I decided to post this after hours of trying solutions to similar problems with no success. I'm writing a C++ MPI+OpenMP code in which one MPI node (the server) sends double arrays to the other nodes. The server spawns threads in order to send to many clients simultaneously. The serial version (MPI alone) works very well, and so does the single-threaded version. The multi-threaded (OpenMP) version keeps throwing a segmentation fault after a random number of iterations. The line printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N) prints out the values at each iteration. What is unpredictable is the number of iterations before the crash (in one incident the code ran successfully, only to seg fault when I ran it again immediately afterwards). It does, however, always complete with num_threads=1. getData returns a vector of pointers to structs, with the struct defined as (int,int,double *).

Here's the code

double *tStatistics=new double[8], tmp_time; // wall clock time
double SY, Sto;
int a_tasks=0, file_p=0;
vector<myDataType *> d = getData();

int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz;
opt_k.proc_files=0; SY=0; Sto=0;
std::fill(header,header+SZ_HEADER,-1);

omp_set_num_threads(5);// for now
// parallel region

#pragma omp parallel default(none) shared(d,idx,SY,Sto) private(a_tasks)
{
    double *myHeader=new double[SZ_HEADER];
    std::fill(myHeader,myHeader+SZ_HEADER,0);
    int tid = omp_get_thread_num(), cur_idx, cur_k; int N;
    //#pragma omp atomic
        N=d.size();
    while (idx<N) {
        // Assign tasks and fetch results where available
        cur_idx=N;
        #pragma omp critical(update__idx)
        {
            if (idx<N) {
                cur_idx=idx; cur_k=opt_k.k; idx+=cur_k;
            }
        }
        if (cur_idx<N) {
            printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N);
            MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
            if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks
                while (cur_k && cur_idx<N) {
                    myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols; myHeader[3]=cur_idx; myHeader[9]=--cur_k;
                    MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    delete[] d[cur_idx]->data;  ++cur_idx;
                }
            }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results
                printf("%d - 4\n", tid);
            }

        } //end if(loopmain)
    } // end while(loopmain)

} // end parallel section

message("terminate slaves");
for(int i=1;i<node_sz;++i){ // terminate
  MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
  MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
}
return 0;

The other matching function is

void CMpifun::slave2()
{
    double *Data; vector<myDataType> dataQ; vector<hist_type> resQ;
    char out_opt='b'; // irrelevant
    myDataType *out_im = new myDataType;    hist_type *out_hist;    CLdp ldp;
    int file_cnt=0; double tmp_t; //local variables

    while (true) { // main while loop
        header[4]=myRank;   MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        if(this->Stat->MPI_TAG == TAG_TERMINATE) {
            break;
        }
        //receive data
        while(true) {
            Data=new double[(int)(header[1]*header[2])];
            MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            myDataType d; d.data=Data; d.nRows=(int)header[1]; d.nCols=(int)header[2];
            //dataQ.push_back(d);
            delete[] Data;
            file_cnt++;
            if ((int)header[9]) {
                MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            } else break;
        }
    } // end main while loop
    message("terminating");

I've tried all the recommendations addressing similar problems. Here are my environment settings

export OMP_WAIT_POLICY="active" 
export OMP_NUM_THREADS=4  
export OMP_DYNAMIC=true # "true","false"  
export OMP_STACKSIZE=200M # 
export KMP_STACKSIZE=$OMP_STACKSIZE  
ulimit -s unlimited

Many thanks to all that have chipped in. I'm becoming increasingly convinced that this has to do with memory allocation somehow, but also don't understand why. I now have the following code:

double CMpifun::sendData2()
{
double *tStatistics=new double[8], tmp_time; // wall clock time
double SY, Sto; int a_tasks=0, file_p=0;
vector<myDataType *> d = getData();

int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz;
opt_k.proc_files=0; SY=0; Sto=0;
std::fill(header,header+SZ_HEADER,-1);

omp_set_num_threads(224);// for now
// parallel region

#pragma omp parallel default(none) shared(idx,SY,Sto,d) private(a_tasks)
{
    double *myHeader=new double[SZ_HEADER];
    std::fill(myHeader,myHeader+SZ_HEADER,0);
    int tid = omp_get_thread_num(), cur_idx, cur_k; int N;

    //#pragma omp critical(update__idx)
    {
        N=d.size();
    }
    while (idx<N) {
        // Assign tasks and fetch results where available
        cur_idx=N;
        #pragma omp critical(update__idx)
        {
            if (idx<N) {
                cur_idx=idx; cur_k=opt_k.k; idx+=cur_k;
            }
        }
        if (cur_idx<N) {
            //printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N);
            printf("%d: cur_idx:%d, N:%d \n", tid, cur_idx,N);
            //#pragma omp critical(update__idx)
            {
                MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
            }
            if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks
                while (cur_k && cur_idx<N) {
                    //#pragma omp critical(update__idx)
                    {
                        myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols;   myHeader[3]=cur_idx;
                        myHeader[9]=--cur_k;
                        MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                        MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                        delete[] d[cur_idx]->data;
                    }
                    ++cur_idx;
                }
            }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results
                printf("%d - 4\n", tid);
            }

        } //end if(loopmain)
    } // end while(loopmain)

} // end parallel section

message("terminate slaves");
for(int i=1;i<node_sz;++i){ // terminate
  MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
  MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
}
return 0;

And its pair

void CMpifun::slave2()
{
double *Data; vector<myDataType> dataQ; vector<hist_type> resQ;
char out_opt='b'; // irrelevant
myDataType *out_im = new myDataType;    hist_type *out_hist;    CLdp ldp;
int file_cnt=0; double tmp_t; //local variables

while (true) { // main while loop
header[4]=myRank;   MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
if(this->Stat->MPI_TAG == TAG_TERMINATE) {
    break;
}
//receive data
while(true) {
    Data=new double[(int)(header[1]*header[2])];
    MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
    myDataType *d=new myDataType; d->data=Data; d->nRows=(int)header[1]; d->nCols=(int)header[2];
    dataQ.push_back(*d);
    delete[] Data;
    file_cnt++;
    if ((int)header[9]) {
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
    } else break;
}

// Error section: Uncommenting next line causes seg fault
/*while (dataQ.size()) { // process data
    out_hist = new hist_type();
    myDataType d = dataQ.back(); dataQ.pop_back(); // critical section
    ldp.process(d.data, d.nRows,d.nCols,out_opt,out_im, out_hist);
    resQ.push_back(*out_hist); out_hist=0;
    delete[] d.data; delete[] out_im->data;
}*/

//time_arr[1] /= file_cnt; time_arr[2] /= file_cnt;
//header[6]=time_arr[0]; header[7]=time_arr[1]; header[8]=time_arr[2];
//header[4]=myRank; header[9]=resQ.size();

} // end main while loop

The update is that if I uncomment the while loop in slave2(), the run doesn't complete. What I don't understand is that this function (slave2) has no OpenMP/threading whatsoever, yet it seems to have an effect. Furthermore, it doesn't share any variables with the threaded function. If I comment out the troublesome section, the code runs irrespective of the number of threads I set (4, 18, 300). My OpenMP environment variables remain as before. The output of ulimit -a is as follows:

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 30473
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 37355
cpu time               (seconds, -t) unlimited
max user processes              (-u) 30473
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

My constructor also calls MPI_Init_thread. To address @tim18's point, the reason I used dynamic memory (with new) is to avoid bloating the stack, following a recommendation from a solution to a similar problem. Your assistance is appreciated.

1
As you call MPI inside a parallel region, you must set your MPI threading mode with MPI_Init_thread and follow the thread-safe linking instructions of your MPI. - tim18
@tim18 Thanks. There already is a call to MPI_Init(argc,argv) for MPI initialization, if that's what you mean. Are there any indications of thread safety violations in my code that you can identify? I posted the code for scrutiny by others because I've hit the ceiling of my debugging ability. - darel
Start by making your MPI environment thread-safe. Initialise the MPI library using MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided); and make sure that the value returned in provided is equal to MPI_THREAD_MULTIPLE. If not, the MPI library does not support having MPI calls made from multiple threads simultaneously. - Hristo Iliev
@Hristo Iliev Thanks. I did so. The provided level of support is 3, and the constant MPI_THREAD_MULTIPLE also evaluates to 3, so I guess the support is in place. Not much success, though. - darel

1 Answer

2
votes

The biggest problem I see is the many race conditions your code exhibits. The erratic behavior you are seeing is most likely caused by them. Remember that any time you access a shared variable in OpenMP (either declared via the shared clause or shared by virtue of global scope), you are accessing memory that can be read or written by any other thread in the team, with no guarantees about order. For example,

N = d.size();

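is only safe as long as no thread modifies the vector at the same time: std::vector allows simultaneous reads, but gives no protection once any thread writes. The unprotected read of idx in while (idx<N), made while other threads update idx inside the critical section, is a genuine data race. Because you are using OpenMP inside a class, member variables such as this->Stat and opt_k are also effectively "global": every thread shares them, so they are not thread-safe by default. In particular, all threads pass the same this->Stat to MPI_Recv and then read its MPI_TAG.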
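
As a minimal sketch of one way to remove the race on idx (this is not your code; claim_chunk and process_all are hypothetical names, and chunk is assumed to be at least 1), each thread claims a range of indices inside a named critical section and afterwards works only on its private copy. The shared counter is never read outside the critical section, so the loop condition cannot observe a half-updated value.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper: claim up to `chunk` indices from the shared counter
// `next`. Returns the first claimed index and stores the number of claimed
// indices in `claimed` (0 when no work is left).
inline int claim_chunk(int &next, int n, int chunk, int &claimed) {
    int first = n;
    claimed = 0;
    #pragma omp critical(update_idx)
    {
        if (next < n) {
            first = next;
            claimed = std::min(chunk, n - next);
            next += claimed;
        }
    }
    return first;
}

// Counts how often each index is handed out; every entry should end up as 1,
// whatever the number of threads.
inline std::vector<int> process_all(int n, int chunk) {
    std::vector<int> visits(n, 0);
    int next = 0;  // shared, but only touched inside the critical section
    #pragma omp parallel default(none) shared(visits, next, n, chunk)
    {
        for (;;) {
            int claimed = 0;  // private: declared inside the parallel region
            const int first = claim_chunk(next, n, chunk, claimed);
            if (claimed == 0) break;  // nothing left for this thread
            for (int i = first; i < first + claimed; ++i)
                ++visits[i];  // [first, first + claimed) belongs to this thread only
        }
    }
    return visits;
}
```

The same idea would apply to the receive status: a status object declared inside the parallel region is private to each thread, whereas a class member is shared by all of them.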

As @tim18 noted, because you are calling MPI routines from within OpenMP parallel regions, you should initialize the MPI runtime to be thread-safe using the MPI_Init_thread function.


As an aside, your C++ needs some work. You should never use new or delete in user-level code. Use RAII to manage object lifetimes and wrap large data structures in thin objects that manage the lifetime for you. For example, this line

delete[] d[cur_idx]->data;

tells me that there are demons lurking in your code, waiting to be unleashed upon the unsuspecting user (which could be you!). Incidentally, it also becomes a race condition the moment two threads hold the same cur_idx. Many demons!
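
A minimal sketch of what RAII ownership could look like here, assuming a hypothetical Matrix type in place of your myDataType (whose real definition is not shown): the buffer is a std::vector<double>, so copies are deep, nothing needs delete[], and buffer() yields the raw pointer an MPI call would need.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for myDataType: the struct owns its buffer.
struct Matrix {
    int nRows = 0;
    int nCols = 0;
    std::vector<double> data;  // released automatically with the Matrix

    Matrix(int rows, int cols)
        : nRows(rows), nCols(cols),
          data(static_cast<std::size_t>(rows) * static_cast<std::size_t>(cols), 0.0) {}

    double *buffer() { return data.data(); }     // pointer for a send/receive call
    int count() const { return nRows * nCols; }  // element count for the same call
};

// Push a matrix into a queue and pop it back, as slave2 does with dataQ.
// The buffer travels with the object, so there is no pointer left to dangle.
inline Matrix round_trip(Matrix m) {
    std::vector<Matrix> queue;
    queue.push_back(std::move(m));
    Matrix out = std::move(queue.back());
    queue.pop_back();
    return out;
}
```

In the updated slave2, the buffer is released with delete[] Data right after its pointer has been stored in dataQ, and the commented-out loop then reads that pointer and deletes it a second time. With an owning type like the one sketched above, that sequence cannot be written in the first place.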