I decided to post this after hours of trying solutions to similar problems with no success. I'm writing C++ MPI+OpenMP code in which one MPI node (the server) sends double arrays to the other nodes. The server spawns threads so it can send to many clients simultaneously. The serial version (MPI alone) works very well, and so does the single-threaded version. The multi-threaded (OpenMP) version keeps throwing a segmentation fault after a random number of iterations. The line

printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx, opt_k.k, idx, N);

prints the values at each iteration. What is unpredictable is the number of iterations before the crash (in one incident the code ran to completion, only to throw a seg fault when I ran it again immediately afterwards). It does, however, always complete with num_threads=1. getData returns a vector of structs, where the struct is defined as (int, int, double *).
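For reference, a minimal sketch of that struct, using the member names that appear in the code below (this is my reconstruction, not the actual definition):

struct myDataType {
    int nRows;     // rows of the matrix
    int nCols;     // columns of the matrix
    double *data;  // contiguous nRows*nCols buffer allocated with new[]
};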
Here's the code:
double *tStatistics=new double[8], tmp_time; // wall clock time
double SY, Sto;
int a_tasks=0, file_p=0;
vector<myDataType *> d = getData();
int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz;
opt_k.proc_files=0; SY=0; Sto=0;
std::fill(header,header+SZ_HEADER,-1);
omp_set_num_threads(5); // for now

// parallel region
#pragma omp parallel default(none) shared(d,idx,SY,Sto) private(a_tasks)
{
    double *myHeader=new double[SZ_HEADER];
    std::fill(myHeader,myHeader+SZ_HEADER,0);
    int tid = omp_get_thread_num(), cur_idx, cur_k; int N;
    //#pragma omp atomic
    N=d.size();
    while (idx<N) {
        // assign tasks and fetch results where available
        cur_idx=N;
        #pragma omp critical(update__idx)
        {
            if (idx<N) {
                cur_idx=idx; cur_k=opt_k.k; idx+=cur_k;
            }
        }
        if (cur_idx<N) {
            printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx, opt_k.k, idx, N);
            MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
            if (this->Stat->MPI_TAG == TAG_HEADER) { // serve tasks
                while (cur_k && cur_idx<N) {
                    myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols;
                    myHeader[3]=cur_idx; myHeader[9]=--cur_k;
                    MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    delete[] d[cur_idx]->data; ++cur_idx;
                }
            } else if (this->Stat->MPI_TAG == TAG_RESULT) { // collect results
                printf("%d - 4\n", tid);
            }
        } // end if(loopmain)
    } // end while(loopmain)
} // end parallel section

message("terminate slaves");
for (int i=1; i<node_sz; ++i) { // terminate
    MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
    MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
}
return 0;
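(Aside: the critical section above is essentially a fetch-and-add on idx. If the stride were fixed, rather than read from opt_k.k under the same lock, OpenMP's atomic capture would express the same idea more cheaply. A hypothetical sketch, not a claimed fix:

int my_idx;
#pragma omp atomic capture
{ my_idx = idx; idx += 1; } // fetch the old value, then bump by a fixed stride

Since cur_k is also loaded from opt_k.k inside the lock here, the named critical section is doing real work.)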
The matching function on the slave side is:
void CMpifun::slave2()
{
    double *Data; vector<myDataType> dataQ; vector<hist_type> resQ;
    char out_opt='b'; // irrelevant
    myDataType *out_im = new myDataType; hist_type *out_hist; CLdp ldp;
    int file_cnt=0; double tmp_t; // local variables
    while (true) { // main while loop
        header[4]=myRank; MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        if (this->Stat->MPI_TAG == TAG_TERMINATE) {
            break;
        }
        // receive data
        while (true) {
            Data=new double[(int)(header[1]*header[2])];
            MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            myDataType d; d.data=Data; d.nRows=(int)header[1]; d.nCols=(int)header[2];
            //dataQ.push_back(d);
            delete[] Data;
            file_cnt++;
            if ((int)header[9]) {
                MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            } else break;
        }
    } // end main while loop
    message("terminating");
}
I've tried all the recommendations given for similar problems. Here are my environment settings:
export OMP_WAIT_POLICY="active"
export OMP_NUM_THREADS=4
export OMP_DYNAMIC=true # "true","false"
export OMP_STACKSIZE=200M
export KMP_STACKSIZE=$OMP_STACKSIZE
ulimit -s unlimited
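As a side note, one way to confirm the runtime actually picks these settings up is to query them from inside the program. A minimal sketch (a standalone check, not part of the project):

#include <omp.h>
#include <cstdio>

int main() {
    // report what the OpenMP runtime resolved from the environment
    printf("max threads: %d, dynamic adjustment: %d\n",
           omp_get_max_threads(), omp_get_dynamic());
    return 0;
}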
Many thanks to all who have chipped in. I'm becoming increasingly convinced that this has to do with memory allocation somehow, but I don't understand why. I now have the following code:
double CMpifun::sendData2()
{
    double *tStatistics=new double[8], tmp_time; // wall clock time
    double SY, Sto; int a_tasks=0, file_p=0;
    vector<myDataType *> d = getData();
    int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz;
    opt_k.proc_files=0; SY=0; Sto=0;
    std::fill(header,header+SZ_HEADER,-1);
    omp_set_num_threads(224); // for now

    // parallel region
    #pragma omp parallel default(none) shared(idx,SY,Sto,d) private(a_tasks)
    {
        double *myHeader=new double[SZ_HEADER];
        std::fill(myHeader,myHeader+SZ_HEADER,0);
        int tid = omp_get_thread_num(), cur_idx, cur_k; int N;
        //#pragma omp critical(update__idx)
        {
            N=d.size();
        }
        while (idx<N) {
            // assign tasks and fetch results where available
            cur_idx=N;
            #pragma omp critical(update__idx)
            {
                if (idx<N) {
                    cur_idx=idx; cur_k=opt_k.k; idx+=cur_k;
                }
            }
            if (cur_idx<N) {
                //printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N);
                printf("%d: cur_idx:%d, N:%d \n", tid, cur_idx, N);
                //#pragma omp critical(update__idx)
                {
                    MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
                }
                if (this->Stat->MPI_TAG == TAG_HEADER) { // serve tasks
                    while (cur_k && cur_idx<N) {
                        //#pragma omp critical(update__idx)
                        {
                            myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols; myHeader[3]=cur_idx;
                            myHeader[9]=--cur_k;
                            MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                            MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                            delete[] d[cur_idx]->data;
                        }
                        ++cur_idx;
                    }
                } else if (this->Stat->MPI_TAG == TAG_RESULT) { // collect results
                    printf("%d - 4\n", tid);
                }
            } // end if(loopmain)
        } // end while(loopmain)
    } // end parallel section

    message("terminate slaves");
    for (int i=1; i<node_sz; ++i) { // terminate
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
    }
    return 0;
}
And its pair:
void CMpifun::slave2()
{
    double *Data; vector<myDataType> dataQ; vector<hist_type> resQ;
    char out_opt='b'; // irrelevant
    myDataType *out_im = new myDataType; hist_type *out_hist; CLdp ldp;
    int file_cnt=0; double tmp_t; // local variables
    while (true) { // main while loop
        header[4]=myRank; MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        if (this->Stat->MPI_TAG == TAG_TERMINATE) {
            break;
        }
        // receive data
        while (true) {
            Data=new double[(int)(header[1]*header[2])];
            MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            myDataType *d=new myDataType; d->data=Data; d->nRows=(int)header[1]; d->nCols=(int)header[2];
            dataQ.push_back(*d);
            delete[] Data;
            file_cnt++;
            if ((int)header[9]) {
                MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            } else break;
        }
        // Error section: uncommenting the following block causes the seg fault
        /*while (dataQ.size()) { // process data
            out_hist = new hist_type();
            myDataType d = dataQ.back(); dataQ.pop_back(); // critical section
            ldp.process(d.data, d.nRows, d.nCols, out_opt, out_im, out_hist);
            resQ.push_back(*out_hist); out_hist=0;
            delete[] d.data; delete[] out_im->data;
        }*/
        //time_arr[1] /= file_cnt; time_arr[2] /= file_cnt;
        //header[6]=time_arr[0]; header[7]=time_arr[1]; header[8]=time_arr[2];
        //header[4]=myRank; header[9]=resQ.size();
    } // end main while loop
}
The update is that if I uncomment the while loop in slave2() then the run doesn't complete. What I don't understand is that this function (slave2) involves no OpenMP/threading whatsoever, yet it seems to have an effect. Furthermore, it shares no variables with the threaded function. If I comment out the troublesome section, the code runs irrespective of the number of threads I set (4, 18, 300). My OpenMP environment variables remain as before. The output of ulimit -a is as follows:
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 30473
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 1024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 37355
cpu time (seconds, -t) unlimited
max user processes (-u) 30473
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
My constructor also calls MPI_Init_thread. To address @Tim's issue: the reason I used dynamic memory (with new) is so as not to bloat the stack, following a recommendation from a solution to a similar problem. Your assistance is appreciated.

I use MPI_Init(argc, argv) for MPI initialization, if that's what you mean. Are there any indications of thread-safety violations in my code that you can identify? I posted the code for scrutiny by others because I've hit the ceiling of my debugging ability. – darel

Use MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided); and make sure that the value returned in provided is equal to MPI_THREAD_MULTIPLE. If not, the MPI library does not support having MPI calls made from multiple threads simultaneously. – Hristo Iliev
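A minimal standalone sketch of the initialization check Hristo Iliev describes (the abort-on-failure policy here is my assumption, not part of the original code):

#include <mpi.h>
#include <cstdio>

int main(int argc, char **argv) {
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided < MPI_THREAD_MULTIPLE) {
        // the MPI library cannot safely be called from multiple threads at once
        fprintf(stderr, "need MPI_THREAD_MULTIPLE, got level %d\n", provided);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    // ... hybrid MPI+OpenMP work goes here ...
    MPI_Finalize();
    return 0;
}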