0
votes

This code sample is used to learn MPI programming. The MPI package I use is MPICH2 1.3.1. The code below is my first step in learning MPI_Isend(), MPI_Irecv() and MPI_Wait(). The code has a master and several workers. The master receives data from the workers, while the workers send data to the master. As usual, the data size is very large, so the workers split the data into chunks (called "trunks" in the code) and send the trunks sequentially. I use a simple trick to overlap computation and communication when sending trunks: each worker keeps two buffers, so that one trunk can be filled while the other one is still being sent.

int test_mpi_wait_2(int argc, char* argv[])
{
    int rank;
    int numprocs; 

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    int trunk_num = 6;// assume there are six trunks
    int trunk_size = 10000;// assume each trunk has 10,000 data points
    if(rank == 0)
    {
        //allocate receiving buffer for all workers
        int** recv_buf = new int* [numprocs];
        for(int i=0;i<numprocs;i++)
            recv_buf[i] = new int [trunk_size];

        //collecting first trunk from all workers
        MPI_Request* requests = new MPI_Request[numprocs];
        for(int i=1;i<numprocs;i++)
            MPI_Irecv(recv_buf[i], trunk_size, MPI_INT, i, 0, MPI_COMM_WORLD, &requests[i]);

        //define send_buf counter used to record how many trunks have been collected
        vector<int> counter(numprocs);

        MPI_Status status;
        //assume therer are N-1 workers, then the total trunks will be collected is (N-1)*trunk_num
        for(int i=0;i<(numprocs-1)*trunk_num;i++)
        {         
            //wait until receive one trunk from any worker
            int active_index;
            MPI_Waitany(numprocs-1, requests+1, &active_index, &status);      

            int request_index = active_index + 1;
            int procs_index = active_index + 1;

            //check wheather all trunks from this worker have been collected
            if(++counter[procs_index] != trunk_num)
            {    
                //receive next trunk from this worker
                MPI_Irecv(recv_buf[procs_index], trunk_size, MPI_INT, procs_index, 0, MPI_COMM_WORLD, &requests[request_index]);
            }   
        }

        for(int i=0;i<numprocs;i++)
            delete [] recv_buf[i];
        delete [] recv_buf;
        delete [] requests;

        cout<<rank<<" done"<<endl;
    }
    else
    {  
        //for each worker, the worker first fill one trunk and send it to master
        //for efficiency, the computation of trunk and communication to master is overlapped.
        //two buffers are allocated to implement the overlapped computation

        int* send_buf[2];  
        send_buf[0] = new int [trunk_size];//Buffer A
        send_buf[1] = new int [trunk_size];//Buffer B

        MPI_Request requests[2];

        //file first trunk
        for(int i=0;i<trunk_size;i++)
            send_buf[0][i] = 0;
        //send this trunk
        MPI_Isend(send_buf[0], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[0]);

        if(trunk_num > 1)
        {
            //file second trunk
            for(int i=0;i<trunk_size;i++)
            send_buf[1][i] = i;
            //send this trunk
            MPI_Isend(send_buf[1], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[1]);
        }

        //for remained trunks, keep cycle until all trunks are sent
        for(int i=2;i<trunk_num;i+=2)
        {      
            //wait till trunk data at buffer A is sent
            MPI_Wait(&requests[0], MPI_STATUS_IGNORE);

            //fill buffer A with next trunk data
            for(int j=0;j<trunk_size;j++)
                send_buf[0][j] = j * i;

            //send buffer A
            MPI_Isend(send_buf[0], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[0]);

            //if more trunks are remained, fill buffer B and sent it
            if(i+ 1 < trunk_num)
            {
                MPI_Wait(&requests[1], MPI_STATUS_IGNORE);
                for(int j=0;j<trunk_size;j++)
                    send_buf[1][j] = j * (i + 1);
                MPI_Isend(send_buf[1], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[1]);
            }
        }

        //wait until last two trunks have been sent
        if(trunk_num == 1)
        {
            MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
        }
        else
        {   
            MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
            MPI_Wait(&requests[1], MPI_STATUS_IGNORE);       
        }

        delete [] send_buf[0];
        delete [] send_buf[1];

        cout<<rank<<" done"<<endl;
    }

    MPI_Finalize();

    return 0;
}
2
Formatting the code (use the 0101 button in the edit box) and adding some context, like what APIs are in use, what the environment is, and what you think the problem might be, will help you get better answers more quickly. - Kevin Vermeer
That looks better - so what is the error? - Throwback1986
The errors are PMPI_Wait error and fatal error in MPI_Finalize - Xiao

2 Answers

0
votes

Not much of an answer but this compiles and runs on my version of MPI, with up to 4 processors. The code does seem a bit involved, but I also cannot see any reason why it should not work.

0
votes

I see several obvious ones: some for loops are not terminated, some cout statements aren't terminated, etc. I believe the code wasn't formatted properly...