1
votes

I am learning MPI-2 and trying to implement the first simple functionality using MPI-2 one-sided-communication:

Have process 0 host one array of fixed size data_size.

Each process (including 0) will generate an array and compare with the host array:

If the first element of the generated array is smaller than that of the host array, replace the host array with the generated one.

In code:

// Excerpt of the per-rank logic: build a candidate, read the host array,
// and overwrite the host array when the candidate's first element is smaller.
vector<int> v1 = {rank,rank+1,rank+2};
v = get_vec(vec);
if (v1[0] < v[0])
    put_vec(vec,v1);

The complete code is at the bottom. I expect, of course, that when the program finishes the host array holds the generated array with the smallest first element ([0,1,2] in this example), because that array replaces any other and is never replaced itself.

However, in some (rare) occasions, I got output like this:

$ mpiexec.exe -n 4 a.exe
#0 assigns v1 {0 ...} to host v {2 ...}
#1 assigns v1 {1 ...} to host v {2 ...}
 1  2  3

This seems to show that two assignments were made to the host data at the same time. I guess I must have misunderstood the lock/unlock synchronization calls in get_vec/put_vec, or made some obvious mistake elsewhere.

Can someone please explain how I should fix my code to get the expected output?

Thanks in advance.


Complete Code compiled using g++ -std=c++11 test.cpp -lmpi:

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <thread>
#include <chrono>

#include <iostream>
#include <vector>

using namespace std;


// Handle for an int array that is stored on one rank and exposed to all
// ranks of MPI_COMM_WORLD through an MPI window.
struct mpi_vector_t {
  MPI_Win win;        // window through which the array is accessed
  int     hostrank;   // rank of the process that hosts the exposed values
  int     rank;       // rank of the calling process
  int     size;       // number of processes in the communicator
  int*    data;       // array storage; create_vec() sets it to NULL on non-host ranks
  int     data_size;  // number of ints in the array
};

// Collectively creates a shared vector handle over MPI_COMM_WORLD.
//
// Every rank must call this function, because MPI_Win_create is collective.
// The rank equal to `hostrank` allocates the storage, fills it with the
// contents of `v`, and exposes it through the window; every other rank
// attaches an empty window.
//
// hostrank  rank that hosts the data
// v         initial contents; its length fixes data_size on every rank
//
// Returns a malloc'ed handle that must be released with delete_vec().
struct mpi_vector_t *create_vec(int hostrank, std::vector<int> v) {
    struct mpi_vector_t *vec =
        (struct mpi_vector_t *)malloc(sizeof(struct mpi_vector_t));

    vec->hostrank = hostrank;
    vec->data_size = static_cast<int>(v.size());
    MPI_Comm_rank(MPI_COMM_WORLD, &(vec->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(vec->size));

    if (vec->rank == hostrank) {
        MPI_Alloc_mem(vec->data_size * sizeof(int), MPI_INFO_NULL, &(vec->data));
        // Copy exactly data_size elements. The bound must be the vector
        // length, not the number of processes (vec->size): using the latter
        // overruns both buffers when there are more ranks than elements and
        // leaves elements uninitialised when there are fewer.
        for (int i = 0; i < vec->data_size; i++) vec->data[i] = v[i];
        // Displacement unit is sizeof(int), so remote offsets count ints.
        MPI_Win_create(vec->data, vec->data_size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(vec->win));
    }
    else {
        // Non-host ranks expose no memory but must still take part in the
        // collective window creation.
        vec->data = NULL;
        MPI_Win_create(vec->data, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(vec->win));
    }
    return vec;
}

// Collectively destroys a handle created by create_vec() and sets the
// caller's pointer to NULL.
//
// The window is freed before the host's buffer is released: memory attached
// to a window must stay valid until MPI_Win_free has returned.
//
// count  address of the handle pointer; a NULL address or NULL handle is a
//        no-op (note that MPI_Win_free is collective, so all ranks are
//        expected to pass a valid handle together).
void delete_vec(struct mpi_vector_t **count) {
    if (count == NULL || *count == NULL) {
        return;
    }
    struct mpi_vector_t *vec = *count;

    MPI_Win_free(&(vec->win));
    if (vec->rank == vec->hostrank) {
        MPI_Free_mem(vec->data);
    }
    free(vec);
    *count = NULL;
}

// Reads the current contents of the host vector into a local std::vector.
//
// The transfer runs inside its own shared-lock access epoch on the host
// rank, so the returned snapshot may already be stale by the time the
// caller inspects it.
//
// vec  handle created by create_vec()
//
// Returns a vector of vec->data_size ints copied from the host.
std::vector<int> get_vec(struct mpi_vector_t *vec) {
    vector<int> ret(vec->data_size);
    MPI_Win_lock(MPI_LOCK_SHARED, vec->hostrank, 0, vec->win);
    // data() instead of &front(): front() is undefined on an empty vector.
    MPI_Get(ret.data(), vec->data_size, MPI_INT, vec->hostrank, 0, vec->data_size, MPI_INT, vec->win);
    // Unlock the same rank that was locked; a hard-coded 0 is only correct
    // when the host happens to be rank 0.
    MPI_Win_unlock(vec->hostrank, vec->win);
    return ret;
}

// Overwrites the host vector with the contents of `v`.
//
// The transfer runs inside its own exclusive-lock access epoch on the host
// rank.
//
// vec  handle created by create_vec()
// v    new contents; must hold at least vec->data_size elements, because
//      exactly that many ints are sent
void put_vec(struct mpi_vector_t *vec, std::vector<int> v) {
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, vec->hostrank, 0, vec->win);
    // data() instead of &front(): front() is undefined on an empty vector.
    MPI_Put(v.data(), vec->data_size, MPI_INT, vec->hostrank, 0, vec->data_size, MPI_INT, vec->win);
    // Unlock the same rank that was locked; a hard-coded 0 is only correct
    // when the host happens to be rank 0.
    MPI_Win_unlock(vec->hostrank, vec->win);
}

// Prints the host vector on one line, each element formatted as "%2d ",
// followed by a newline. Ranks other than the host print nothing.
void print_vec(struct mpi_vector_t *vec) {
    if (vec->rank != vec->hostrank) {
        return;
    }

    const int *cur = vec->data;
    const int *const last = cur + vec->data_size;
    while (cur != last) {
        printf("%2d ", *cur);
        ++cur;
    }
    puts("");
}


// Test driver: rank 0 hosts a shared vector; every rank twice generates a
// candidate vector and writes it to the host when the candidate's first
// element is smaller than the first element it read from the host.
int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    struct mpi_vector_t *vec;
    int rank;

    // Initial contents of the host vector; rank 0 is passed as the host.
    vector<int> v = {2,3,1};
    vec = create_vec(0, v);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    for (int itest = 0; itest < 2; itest++) {
        vector<int> v1 = { rank, rank + 1, rank + 2 }; // candidate data generated by this rank
        // NOTE: get_vec() and put_vec() each lock and unlock the window on
        // their own, so the read, the comparison and the write below do not
        // form one atomic step. Another rank can call put_vec() between this
        // rank's get_vec() and put_vec(), leaving the comparison stale.
        v = get_vec(vec);
        if (v1[0] < v[0]) {
            cout << "#" << rank << " assigns v1 {" << v1[0] <<
                    " ...} to host v {" << v[0] << " ...}" << endl;
            put_vec(vec, v1);
        }
    }

    // Let every rank finish its loop before the host prints the result.
    MPI_Barrier(MPI_COMM_WORLD);
    print_vec(vec);
    delete_vec(&vec);

    MPI_Finalize();
    return 0;
}
1

1 Answers

2
votes

This is a classic data race scenario. Both get_vec and put_vec lock the window individually, but what you actually need is a lock that spans the whole code block, i.e.:

// Pseudocode: lock_window()/unlock_window() stand for a single lock that is
// held across the whole read-compare-write sequence.
lock_window();
v = get_vec(vec);
if (v1[0] < v[0])
   put_vec(vec,v1);
unlock_window();

As it currently stands, the content of the shared vector can change right after the call to get_vec(), because another process has executed put_vec() in the meantime, and that invalidates the result of the comparison. A fix could look something like this:

// Replaces the shared vector with `v` if v[0] is smaller than the shared
// vector's first element, and returns the shared vector's previous contents.
//
// The read, the comparison and the conditional write all happen while an
// exclusive lock is held on vec->win_ext, so callers of this function are
// serialised against each other. vec->win_ext is a second window on the host
// rank used only for this outer lock; get_vec() and put_vec() keep locking
// vec->win themselves.
//
// vec  shared vector handle (must provide the win_ext window)
// v    candidate contents; must be non-empty and hold at least
//      vec->data_size elements
std::vector<int> compare_swap_vec(struct mpi_vector_t *vec, std::vector<int> v) {
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, vec->hostrank, 0, vec->win_ext);
    std::vector<int> ret = get_vec(vec);
    if (v[0] < ret[0])
        put_vec(vec, v);
    // Unlock the rank that was locked rather than a hard-coded rank 0.
    MPI_Win_unlock(vec->hostrank, vec->win_ext);
    return ret;
}

The function compare_swap_vec() takes a vector and uses it to replace the old content of the shared vector if the less-than relation holds. It also returns the previous content of the vector. vec->win_ext is another window hosted by the same process that hosts the vector content. It is used for the outer lock because the MPI standard requires that distinct access epochs for the same window at the same process be disjoint, which I interpret to mean that nested locks on the same window are not allowed.