1
votes

I am trying to send message to all MPI processes from a process and also receive message from all those processes in a process. It is basically an all to all communication where every process sends message to every other process (except itself) and receives message from every other process.

The following example code snippet shows what I am trying to achieve. Now, the problem with MPI_Send is its behavior where for small message size it acts as non-blocking but for the larger message (in my machine BUFFER_SIZE 16400) it blocks. I am aware of this is how MPI_Send behaves. As a workaround, I replaced the code below with blocking (send+recv) which is MPI_Sendrecv. Example code is like this MPI_Sendrecv(intSendPack, BUFFER_SIZE, MPI_INT, processId, MPI_TAG, intReceivePack, BUFFER_SIZE, MPI_INT, processId, MPI_TAG, MPI_COMM_WORLD, MPI_STATUSES_IGNORE) . I am making the above call for all the processes of MPI_COMM_WORLD inside a loop for every rank and this approach gives me what I am trying to achieve (all to all communication). However, this call takes a lot of time which I want to cut-down with some time-efficient approach. I have tried with mpi scatter and gather to perform all to all communication but here one issue is the buffer size (16400) may differ in actual implementation in different iteration for MPI_all_to_all function calling. Here, I am using MPI_TAG to differentiate the call in different iteration which I cannot use in scatter and gather functions.

#define BUFFER_SIZE 16400

void MPI_all_to_all(int MPI_TAG)
{

    int size;
    int rank;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int* intSendPack = new int[BUFFER_SIZE]();
    int* intReceivePack = new int[BUFFER_SIZE]();

    for (int prId = 0; prId < size; prId++) {
        if (prId != rank) {
            MPI_Send(intSendPack, BUFFER_SIZE, MPI_INT, prId, MPI_TAG,
            MPI_COMM_WORLD);
          }
    }

    for (int sId = 0; sId < size; sId++) {
        if (sId != rank) {
            MPI_Recv(intReceivePack, BUFFER_SIZE, MPI_INT, sId, MPI_TAG,
            MPI_COMM_WORLD, MPI_STATUSES_IGNORE);
        }
    }
}

I want to know if there is a way I can perform all to all communication using any efficient communication model. I am not sticking to MPI_Send, if there is some other way which provides me what I am trying to achieve, I am happy with that. Any help or suggestion is much appreciated.

1
I don't understand why you are coding your own all_to_all where there is already MPI_Alltoall?AlexG
@AlexG, may be the subject line of my question created the confusion, but I am not trying to achieve what MPI_Alltoall function does. I am trying to achieve something similar to MPI_Gather. Please take a look at the following post stackoverflow.com/questions/15049190/…Faysal
MPI_Send is blocking. I suspect what you're referring to is buffering, but not sure. You shouldn't use point-to-point communication instead of global communication because it's most likely going to be less efficient (except if you're planning on something special/dedicated but keep in mind it's tricky). Keeping in mind the referenced post, what is it actually that you want to do? Just send around and overwrite arrays? Or something else?atru
@Faysal if all of them update all others and need upates from others at the same point of the program then it would probably be best to use Allgather. If one needs to update all but others don't update then and that one does not need their updates, then it's a Bcast. If you clarify, I'll post an answer with some benchmarking. Also, you're overwriting the common array - is that intended?atru
You can use the same buffer for send (this is basically a MPI_Bcast()) but your snippet keeps overwriting the receive buffer. Can you please update your code to clarify what you are trying to achieve ? It seems MPI_Allgather() is a fit here.Gilles Gouaillardet

1 Answers

1
votes

This is a benchmark that allows to compare performance of collective vs. point-to-point communication in an all-to-all communication,

#include <iostream>
#include <algorithm>
#include <mpi.h>

#define BUFFER_SIZE 16384

void point2point(int*, int*, int, int);

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int rank_id = 0, com_sz = 0;
    double t0 = 0.0, tf = 0.0;
    MPI_Comm_size(MPI_COMM_WORLD, &com_sz);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);

    int* intSendPack = new int[BUFFER_SIZE]();
    int* result = new int[BUFFER_SIZE*com_sz]();
    std::fill(intSendPack, intSendPack + BUFFER_SIZE, rank_id);
    std::fill(result + BUFFER_SIZE*rank_id, result + BUFFER_SIZE*(rank_id+1), rank_id);

    // Send-Receive
    t0 = MPI_Wtime();
    point2point(intSendPack, result, rank_id, com_sz);
    MPI_Barrier(MPI_COMM_WORLD);
    tf = MPI_Wtime();
    if (!rank_id)
        std::cout << "Send-receive time: " << tf - t0 << std::endl;

    // Collective
    std::fill(result, result + BUFFER_SIZE*com_sz, 0);
    std::fill(result + BUFFER_SIZE*rank_id, result + BUFFER_SIZE*(rank_id+1), rank_id);
    t0 = MPI_Wtime();
    MPI_Allgather(intSendPack, BUFFER_SIZE, MPI_INT, result, BUFFER_SIZE, MPI_INT, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    tf = MPI_Wtime();
    if (!rank_id)
        std::cout << "Allgather time: " << tf - t0 << std::endl;

    MPI_Finalize();
    delete[] intSendPack;
    delete[] result;
    return 0;
}

// Send/receive communication
void point2point(int* send_buf, int* result, int rank_id, int com_sz)
{
    MPI_Status status;
    // Exchange and store the data
    for (int i=0; i<com_sz; i++){
        if (i != rank_id){
            MPI_Sendrecv(send_buf, BUFFER_SIZE, MPI_INT, i, 0, 
                result + i*BUFFER_SIZE, BUFFER_SIZE, MPI_INT, i, 0, MPI_COMM_WORLD, &status);
        }
    }
}

Here every rank contributes its own array intSendPack to the array result on all other ranks that should end up the same on all the ranks. result is flat, each rank takes BUFFER_SIZE entries starting with its rank_id*BUFFER_SIZE. After the point-to-point communication, the array is reset to its original shape.

Time is measured by setting up an MPI_Barrier which will give you the maximum time out of all ranks.

I ran the benchmark on 1 node of Nersc Cori KNL using slurm. I ran it a few times each case just to make sure the values are consistent and I'm not looking at an outlier, but you should run it maybe 10 or so times to collect more proper statistics.

Here are some thoughts:

  • For small number of processes (5) and a large buffer size (16384) collective communication is about twice faster than point-to-point, but it becomes about 4-5 times faster when moving to larger number of ranks (64).
  • In this benchmark there is not much difference between performance with recommended slurm settings on that specific machine and default settings but in real, larger programs with more communication there is a very significant one (job that runs for less than a minute with recommended will run for 20-30 min and more with default). Point of this is check your settings, it may make a difference.
  • What you were seeing with Send/Receive for larger messages was actually a deadlock. I saw it too for the message size shown in this benchmark. In case you missed those, there are two worth it SO posts on it: buffering explanation and a word on deadlocking.

In summary, adjust this benchmark to represent your code more closely and run it on your system, but collective communication in an all-to-all or one-to-all situations should be faster because of dedicated optimizations such as superior algorithms used for communication arrangement. A 2-5 times speedup is considerable, since communication often contributes to the overall time the most.