This code sampler is used to learn MPI programming. The MPI package I use is MPICH2 1.3.1. The code below is my first step to learn MPI_Isend(), MPI_Irecv() and MPI_Wait(). The code has a master and several workers. Master receives data from workers while workers send data to master. As usual, the data size is very large, workers split data into trunks and send trunks sequentially. I use some tricks to overlap the computation and communication when sending trunks. The method is very simple, just keeping two buffers to hold two trunks for each sending cycle.
int test_mpi_wait_2(int argc, char* argv[])
{
int rank;
int numprocs;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
int trunk_num = 6;// assume there are six trunks
int trunk_size = 10000;// assume each trunk has 10,000 data points
if(rank == 0)
{
//allocate receiving buffer for all workers
int** recv_buf = new int* [numprocs];
for(int i=0;i<numprocs;i++)
recv_buf[i] = new int [trunk_size];
//collecting first trunk from all workers
MPI_Request* requests = new MPI_Request[numprocs];
for(int i=1;i<numprocs;i++)
MPI_Irecv(recv_buf[i], trunk_size, MPI_INT, i, 0, MPI_COMM_WORLD, &requests[i]);
//define send_buf counter used to record how many trunks have been collected
vector<int> counter(numprocs);
MPI_Status status;
//assume therer are N-1 workers, then the total trunks will be collected is (N-1)*trunk_num
for(int i=0;i<(numprocs-1)*trunk_num;i++)
{
//wait until receive one trunk from any worker
int active_index;
MPI_Waitany(numprocs-1, requests+1, &active_index, &status);
int request_index = active_index + 1;
int procs_index = active_index + 1;
//check wheather all trunks from this worker have been collected
if(++counter[procs_index] != trunk_num)
{
//receive next trunk from this worker
MPI_Irecv(recv_buf[procs_index], trunk_size, MPI_INT, procs_index, 0, MPI_COMM_WORLD, &requests[request_index]);
}
}
for(int i=0;i<numprocs;i++)
delete [] recv_buf[i];
delete [] recv_buf;
delete [] requests;
cout<<rank<<" done"<<endl;
}
else
{
//for each worker, the worker first fill one trunk and send it to master
//for efficiency, the computation of trunk and communication to master is overlapped.
//two buffers are allocated to implement the overlapped computation
int* send_buf[2];
send_buf[0] = new int [trunk_size];//Buffer A
send_buf[1] = new int [trunk_size];//Buffer B
MPI_Request requests[2];
//file first trunk
for(int i=0;i<trunk_size;i++)
send_buf[0][i] = 0;
//send this trunk
MPI_Isend(send_buf[0], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[0]);
if(trunk_num > 1)
{
//file second trunk
for(int i=0;i<trunk_size;i++)
send_buf[1][i] = i;
//send this trunk
MPI_Isend(send_buf[1], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[1]);
}
//for remained trunks, keep cycle until all trunks are sent
for(int i=2;i<trunk_num;i+=2)
{
//wait till trunk data at buffer A is sent
MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
//fill buffer A with next trunk data
for(int j=0;j<trunk_size;j++)
send_buf[0][j] = j * i;
//send buffer A
MPI_Isend(send_buf[0], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[0]);
//if more trunks are remained, fill buffer B and sent it
if(i+ 1 < trunk_num)
{
MPI_Wait(&requests[1], MPI_STATUS_IGNORE);
for(int j=0;j<trunk_size;j++)
send_buf[1][j] = j * (i + 1);
MPI_Isend(send_buf[1], trunk_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[1]);
}
}
//wait until last two trunks have been sent
if(trunk_num == 1)
{
MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
}
else
{
MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
MPI_Wait(&requests[1], MPI_STATUS_IGNORE);
}
delete [] send_buf[0];
delete [] send_buf[1];
cout<<rank<<" done"<<endl;
}
MPI_Finalize();
return 0;
}