I decided to post this after hours of trying out solutions to similar problems with no success. I'm writing a C++ MPI+OpenMP code in which one MPI node (the server) sends double arrays to the other nodes. The server spawns threads so that it can send to many clients simultaneously. The serial version (MPI alone) works very well, and so does the single-threaded version. The multi-threaded (OpenMP) version keeps throwing a segmentation fault after a random number of iterations. The line printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx, opt_k.k, idx, N) prints the values at each iteration. What is unpredictable is the number of iterations before the crash (in one instance the code ran successfully, only to throw a seg fault when I ran it again immediately afterwards). With num_threads=1, however, it always completes. getData returns a vector of structs, each struct holding (int, int, double *).
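For reference, a minimal sketch of what that struct and getData presumably look like; the field names nRows, nCols and data are taken from the code below, everything else is an assumption:

```cpp
#include <vector>

// Assumed layout of the (int, int, double *) struct used throughout.
struct myDataType {
    int nRows;
    int nCols;
    double *data;   // flat array of nRows * nCols doubles
};

// Assumed signature: the server code below stores the result in a vector<myDataType *>.
std::vector<myDataType *> getData();
```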
Here's the code
```cpp
double *tStatistics=new double[8], tmp_time; // wall clock time
double SY, Sto;
int a_tasks=0, file_p=0;
vector<myDataType *> d = getData();
int idx=0;
opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz; opt_k.proc_files=0;
SY=0; Sto=0;
std::fill(header,header+SZ_HEADER,-1);
omp_set_num_threads(5); // for now

// parallel region
#pragma omp parallel default(none) shared(d,idx,SY,Sto) private(a_tasks)
{
    double *myHeader=new double[SZ_HEADER];
    std::fill(myHeader,myHeader+SZ_HEADER,0);
    int tid = omp_get_thread_num(), cur_idx, cur_k;
    int N;
    //#pragma omp atomic
    N=d.size();

    while (idx<N) { // Assign tasks and fetch results where available
        cur_idx=N;
        #pragma omp critical(update__idx)
        {
            if (idx<N) {
                cur_idx=idx;
                cur_k=opt_k.k;
                idx+=cur_k;
            }
        }
        if (cur_idx<N) {
            printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N);
            MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
            if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks
                while (cur_k && cur_idx<N) {
                    myHeader[1]=d[cur_idx]->nRows;
                    myHeader[2]=d[cur_idx]->nCols;
                    myHeader[3]=cur_idx;
                    myHeader[9]=--cur_k;
                    MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    delete[] d[cur_idx]->data;
                    ++cur_idx;
                }
            }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results
                printf("%d - 4\n", tid);
            }
        } //end if(loopmain)
    } // end while(loopmain)
} // end parallel section

message("terminate slaves");
for(int i=1;i<node_sz;++i){ // terminate
    MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
    MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
}
return 0;
```

The other matching function is:
```cpp
void CMpifun::slave2() {
    double *Data;
    vector<myDataType> dataQ;
    vector<hist_type> resQ;
    char out_opt='b'; // irrelevant
    myDataType *out_im = new myDataType;
    hist_type *out_hist;
    CLdp ldp;
    int file_cnt=0;
    double tmp_t; //local variables

    while (true) { // main while loop
        header[4]=myRank;
        MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        if(this->Stat->MPI_TAG == TAG_TERMINATE) {
            break;
        }

        //receive data
        while(true) {
            Data=new double[(int)(header[1]*header[2])];
            MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            myDataType d;
            d.data=Data;
            d.nRows=(int)header[1];
            d.nCols=(int)header[2];
            //dataQ.push_back(d);
            delete[] Data;
            file_cnt++;
            if ((int)header[9]) {
                MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            } else break;
        }
    } // end main while loop
    message("terminating");
```

I've tried all the recommendations addressing similar problems. Here are my environment settings:
```bash
export OMP_WAIT_POLICY="active"
export OMP_NUM_THREADS=4
export OMP_DYNAMIC=true   # "true","false"
export OMP_STACKSIZE=200M
# export KMP_STACKSIZE=$OMP_STACKSIZE
ulimit -s unlimited
```

Many thanks to all who have chipped in. I'm becoming increasingly convinced that this has something to do with memory allocation, but I also don't understand why. I now have the following code:
```cpp
double CMpifun::sendData2() {
    double *tStatistics=new double[8], tmp_time; // wall clock time
    double SY, Sto;
    int a_tasks=0, file_p=0;
    vector<myDataType *> d = getData();
    int idx=0;
    opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz; opt_k.proc_files=0;
    SY=0; Sto=0;
    std::fill(header,header+SZ_HEADER,-1);
    omp_set_num_threads(224); // for now

    // parallel region
    #pragma omp parallel default(none) shared(idx,SY,Sto,d) private(a_tasks)
    {
        double *myHeader=new double[SZ_HEADER];
        std::fill(myHeader,myHeader+SZ_HEADER,0);
        int tid = omp_get_thread_num(), cur_idx, cur_k;
        int N;
        //#pragma omp critical(update__idx)
        {
            N=d.size();
        }

        while (idx<N) { // Assign tasks and fetch results where available
            cur_idx=N;
            #pragma omp critical(update__idx)
            {
                if (idx<N) {
                    cur_idx=idx;
                    cur_k=opt_k.k;
                    idx+=cur_k;
                }
            }
            if (cur_idx<N) {
                //printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N);
                printf("%d: cur_idx:%d, N:%d \n", tid, cur_idx,N);
                //#pragma omp critical(update__idx)
                {
                    MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
                }
                if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks
                    while (cur_k && cur_idx<N) {
                        //#pragma omp critical(update__idx)
                        {
                            myHeader[1]=d[cur_idx]->nRows;
                            myHeader[2]=d[cur_idx]->nCols;
                            myHeader[3]=cur_idx;
                            myHeader[9]=--cur_k;
                            MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                            MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                            delete[] d[cur_idx]->data;
                        }
                        ++cur_idx;
                    }
                }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results
                    printf("%d - 4\n", tid);
                }
            } //end if(loopmain)
        } // end while(loopmain)
    } // end parallel section

    message("terminate slaves");
    for(int i=1;i<node_sz;++i){ // terminate
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
    }
    return 0;
```

And its pair:
```cpp
void CMpifun::slave2() {
    double *Data;
    vector<myDataType> dataQ;
    vector<hist_type> resQ;
    char out_opt='b'; // irrelevant
    myDataType *out_im = new myDataType;
    hist_type *out_hist;
    CLdp ldp;
    int file_cnt=0;
    double tmp_t; //local variables

    while (true) { // main while loop
        header[4]=myRank;
        MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        if(this->Stat->MPI_TAG == TAG_TERMINATE) {
            break;
        }

        //receive data
        while(true) {
            Data=new double[(int)(header[1]*header[2])];
            MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            myDataType *d=new myDataType;
            d->data=Data;
            d->nRows=(int)header[1];
            d->nCols=(int)header[2];
            dataQ.push_back(*d);
            delete[] Data;
            file_cnt++;
            if ((int)header[9]) {
                MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            } else break;
        }

        // Error section: Uncommenting next line causes seg fault
        /*while (dataQ.size()) { // process data
            out_hist = new hist_type();
            myDataType d = dataQ.back(); dataQ.pop_back();
            // critical section
            ldp.process(d.data, d.nRows,d.nCols,out_opt,out_im, out_hist);
            resQ.push_back(*out_hist); out_hist=0;
            delete[] d.data;
            delete[] out_im->data;
        }*/

        //time_arr[1] /= file_cnt; time_arr[2] /= file_cnt;
        //header[6]=time_arr[0]; header[7]=time_arr[1]; header[8]=time_arr[2];
        //header[4]=myRank; header[9]=resQ.size();
    } // end main while loop
```

The update is that if I uncomment the while loop in slave2(), the run does not complete. What I don't understand is that this function (slave2) has no OpenMP/threading whatsoever, yet it seems to have an effect. Furthermore, it doesn't share any variables with the threaded function. If I comment the troublesome section back out, the code runs regardless of the number of threads I set (4, 18, 300). My OpenMP environment variables remain as before. The output of ulimit -a is as follows:
```
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 30473
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 37355
cpu time               (seconds, -t) unlimited
max user processes              (-u) 30473
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited
```

My constructor also calls MPI_Init_thread. To address @Tim's point, the reason I used dynamic memory (with new) is to avoid bloating the stack, following a recommendation from a solution to a similar problem. Your assistance is appreciated.
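To illustrate what I mean by that, here is a small standalone sketch of the pattern (not the project code; SZ_HEADER is given an arbitrary value here, and the explicit delete[] is an addition for completeness since the code above does not free myHeader):

```cpp
#include <omp.h>
#include <algorithm>
#include <cstdio>

int main()
{
    const int SZ_HEADER = 10;   // assumed size; the real value comes from the project headers

    #pragma omp parallel
    {
        // Each thread allocates its working buffer on the heap instead of
        // declaring a local array, so the per-thread stack stays small.
        double *myHeader = new double[SZ_HEADER];
        std::fill(myHeader, myHeader + SZ_HEADER, 0.0);

        myHeader[0] = omp_get_thread_num();   // stand-in for real work
        std::printf("thread %d initialised its buffer\n", (int)myHeader[0]);

        delete[] myHeader;   // release the per-thread buffer before the region ends
    }
    return 0;
}
```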
I use MPI_Init(argc, argv) for MPI initialization, if that's what you mean. Are there any indications of thread-safety violations in my code that you can identify? I posted the code for scrutiny by others because I've hit the ceiling of my debugging ability.

Call MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided); and make sure that the value returned in provided is equal to MPI_THREAD_MULTIPLE. If not, the MPI library does not support having MPI calls made from multiple threads simultaneously.
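A minimal sketch of that suggested check (the only assumptions here are the variable name provided and where the check sits; the calls and constants are standard MPI):

```cpp
#include <mpi.h>
#include <cstdio>
#include <cstdlib>

int main(int argc, char **argv)
{
    int provided = MPI_THREAD_SINGLE;

    // Request full multi-threading support instead of plain MPI_Init().
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    if (provided < MPI_THREAD_MULTIPLE) {
        // The library does not support MPI calls from several threads at
        // once, so a threaded server loop like sendData2() is not safe.
        std::fprintf(stderr, "MPI_THREAD_MULTIPLE not available (provided=%d)\n", provided);
        MPI_Finalize();
        return EXIT_FAILURE;
    }

    // ... rest of the program ...

    MPI_Finalize();
    return 0;
}
```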