
OK, so I want to do a parallel depth-first search in a tree-like structure. I'm using processes on multiple computers in a cluster for this (a quad-core localhost and a Raspberry Pi 2 for this example). The master process should start the search and, at the first split in the tree, spawn a new worker process for each node that it splits into. These workers should then be able to report their findings back to the master.

I'm trying to do this dynamically instead of giving mpiexec a fixed number of processes, because I don't know what the tree is going to look like beforehand (e.g. there could be 2 or 9 splits).

I made a sample from the project I'm working on for this question, and it currently works as follows: it takes a string of digits and, for each digit, spawns a worker process and sends the digit to that worker.

For the master:

#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time

################ Set up MPI variables ################

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()

################ Master code ################

script = 'cpi.py'
for d in '34':
   try:
       print 'Trying to spawn child process...'
       icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
       spawnrank = icomm.Get_rank()
       icomm.send(d, dest=spawnrank, tag=11)
       print 'Spawned rank %d.' % spawnrank    
   except: ValueError('Spawn failed to start.')

solved = False
while solved == False:
    #while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
    #    print 'spawns doing some work...'
    #    time.sleep(1)
    solved = comm.recv(source=MPI.ANY_SOURCE, tag=22)
print 'received solution: %d' % solved

It correctly spawns the workers and they receive the digit, but they don't send it back to the master.

For the workers:

#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy

################ Set up MPI variables ################

icomm = MPI.Comm.Get_parent()
comm = MPI.COMM_WORLD
irank = comm.Get_rank()
rank = comm.Get_rank()

running = True
while running:
    data = None
    data = icomm.recv(source=0, tag=11)
    if data:
        print 'Trying to send %s from worker rank %d to %d' % (data, rank, irank)
        icomm.send(data, dest=0, tag=22)
        break
print 'Worker on rank %d done.' % rank
icomm.Disconnect()

It never reaches the last line of the master code. I also added a probe (commented out above) in the master code to check whether a message with tag 22 was hanging around somewhere, to rule out an error in the recv call, but the probe never finds the message, so I assume it is never sent.

By printing the ranks of both processes I found that they are both using rank 0, which makes sense since they are spawned on the same computer. But when I add a hostfile and a rankfile, trying to force it to use a different computer for the workers, it gives me the following error:

[hch-K55A:06917] *** Process received signal ***
[hch-K55A:06917] Signal: Segmentation fault (11)
[hch-K55A:06917] Signal code: Address not mapped (1)
[hch-K55A:06917] Failing at address: 0x3c
[hch-K55A:06917] [ 0] /lib/x86_64-linux-gnu/libpthread.so.0(+0x10340) [0x7f2c0d864340]
[hch-K55A:06917] [ 1] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(orte_rmaps_rank_file_lex+0x4a0) [0x7f2c0abdcb70]
[hch-K55A:06917] [ 2] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(+0x23ac) [0x7f2c0abda3ac]
[hch-K55A:06917] [ 3] /usr/lib/libopen-rte.so.4(orte_rmaps_base_map_job+0x2e) [0x7f2c0dacd05e]
[hch-K55A:06917] [ 4] /usr/lib/libopen-rte.so.4(orte_plm_base_setup_job+0x5a) [0x7f2c0dac580a]
[hch-K55A:06917] [ 5] /usr/lib/openmpi/lib/openmpi/mca_plm_rsh.so(orte_plm_rsh_launch+0x338) [0x7f2c0b80a8c8]
[hch-K55A:06917] [ 6] /usr/lib/libopen-rte.so.4(+0x51ff4) [0x7f2c0dac3ff4]
[hch-K55A:06917] [ 7] /usr/lib/libopen-rte.so.4(opal_event_base_loop+0x31e) [0x7f2c0dae9cfe]
[hch-K55A:06917] [ 8] mpiexec() [0x4047d3]
[hch-K55A:06917] [ 9] mpiexec() [0x40347d]
[hch-K55A:06917] [10] /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5) [0x7f2c0d4b0ec5]
[hch-K55A:06917] [11] mpiexec() [0x403399]
[hch-K55A:06917] *** End of error message ***
Segmentation fault (core dumped)

The command used:

mpiexec -np 1 --hostfile hostfile --rankfile rankfile python spawntest.py

Hostfile:

localhost
localhost slots=1 max-slots=4
pi2@raspi2 slots=4

Rankfile:

rank 0=localhost slot=1
rank 1=pi2@raspi2 slot=1-4

So my question is: how can I spawn these worker processes on a computer other than the one the master runs on, while still being able to send data back and forth?


1 Answer


Your master's code is very wrong and I get the feeling that you are lacking some conceptual understanding of what's going on there.

MPI processes in jobs spawned by MPI_COMM_SPAWN (or its mpi4py counterpart comm.Spawn()) do not become part of the parent's MPI_COMM_WORLD. The spawned processes form an entirely separate world communicator and are interlinked with the parent job via an intercommunicator, which is exactly what the call to Spawn() returns. In your case, icomm = MPI.COMM_SELF.Spawn(...) is the intercommunicator handle in the master process. The processes in the child job obtain their intercommunicator handle using MPI_COMM_GET_PARENT (MPI.Comm.Get_parent() in mpi4py). Since you are spawning single-process jobs:

MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
                                                   ^^^^^^^^^^

there is only one process in the newly formed world communicator of the child job and therefore MPI.COMM_WORLD.Get_rank() returns zero in each worker.
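
To make the separate-worlds picture concrete, here is a minimal, untested sketch of the pattern (the file names parent.py and child.py are made up for this example):

# parent.py -- spawns one child and talks to it over the returned intercommunicator
from mpi4py import MPI
import sys

icomm = MPI.COMM_SELF.Spawn(sys.executable, args=['child.py'], maxprocs=1, root=0)
icomm.send('ping', dest=0, tag=1)                 # dest is a rank in the CHILD's world
print 'parent got back: %s' % icomm.recv(source=0, tag=2)
icomm.Disconnect()

# child.py -- lives in its own MPI_COMM_WORLD, so its world rank is always 0 here
from mpi4py import MPI

icomm = MPI.Comm.Get_parent()                     # intercommunicator back to the parent job
print 'child world rank: %d' % MPI.COMM_WORLD.Get_rank()   # prints 0
data = icomm.recv(source=0, tag=1)
icomm.send(data + '/pong', dest=0, tag=2)
icomm.Disconnect()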

This part of your master's code is wrong but it still functions due to how intercommunicators actually work:

spawnrank = icomm.Get_rank() # <--- not what you expect
icomm.send(d, dest=spawnrank, tag=11)

Intercommunicators link two separate groups of processes. One of them is called the local group and the other one is called the remote group. When using MPI_COMM_RANK (comm.Get_rank()) on an intercommunicator, you obtain the rank of the calling process in the local group. When sending or receiving though, the rank specified relates to the remote group. In your case, spawning a new worker results in the following intercommunicator:

    master's MPI_COMM_SELF           child's MPI_COMM_WORLD
              |                                |
+=============|================================|=============+
|  +----------V----------+       +-------------V----------+  |
|  | group of the master |       | group of the child job |  |
|  |        [ 0 ]        |       |          [ 0 ]         |  |
|  +---------------------+       +------------------------+  |
|                    intercommunicator                       |
+============================================================+

(the communicators above show where each group comes from; the communicators themselves are not part of the intercommunicator)

Which group is local and which is remote depends on which group the calling process belongs to: the local group of the master process is the remote group for the ranks in the child job, and vice versa. What is important here is that each group has a rank 0, since every group contains at least one process. You are just lucky that the master's group contains a single process, so icomm.Get_rank() returns 0 (and it will always return zero, since the master's local group is derived from MPI_COMM_SELF, which contains exactly one process), and 0 also happens to (always) be a valid rank in the remote (child) group. The correct thing to do is to send the message to a fixed rank that you know exists in the remote group, for example rank 0:

   icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
   icomm.send(d, dest=0, tag=11)

(this code explicitly sends to rank 0 of the remote group, while before that the value 0 was just a lucky coincidence)
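
To see the local/remote split from the master's side, here is a short, untested sketch along the lines of your master code; Get_remote_size() is the intercommunicator query for the size of the remote (child) group:

from mpi4py import MPI
import sys

script = 'cpi.py'   # the same worker script as in the question
icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)

# Get_rank()/Get_size() refer to the LOCAL group of the intercommunicator,
# Get_remote_size() to the REMOTE (child) group:
print 'local rank  : %d' % icomm.Get_rank()         # 0 -- the local group comes from COMM_SELF
print 'local size  : %d' % icomm.Get_size()         # 1
print 'remote size : %d' % icomm.Get_remote_size()  # 1 -- because maxprocs=1

icomm.send('3', dest=0, tag=11)                     # dest/source are ranks in the REMOTE group
print 'worker replied: %s' % icomm.recv(source=0, tag=22)
icomm.Disconnect()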

That said, the sending part - though incorrect - still works. The receiving part doesn't and there are several reasons. First, you are using the wrong communicator - receiving from MPI_COMM_WORLD does not work since the child processes are not members of it. As a matter of fact, communicators in MPI are immutable - you cannot add or remove ranks without creating a new communicator. You should use icomm to receive from the workers, the same way you are using it to send to them. Now, there comes the second problem - icomm in the master gets overwritten by each new Spawn and therefore you are effectively losing the ability to communicate with any child job but the last one. You need to keep a list of handles and append the handle to it.

The receive part is a bit more complex. There is no MPI_ANY_COMM, so you cannot have a single receive operation that covers all child jobs, as they all live in their own intercommunicators. You should either loop with MPI_IPROBE over the list of intercommunicators or (better) post a non-blocking receive on each one and then use MPI_WAITSOME (whatever the mpi4py equivalent is); a sketch of the non-blocking variant follows the loop version below.

With a loop, the master code should look something like this (note: untested code, as I don't use mpi4py myself):

#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time

################ Set up MPI variables ################

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()

################ Master code ################

icomms = []
script = 'cpi.py'
for d in '34':
   try:
       print 'Trying to spawn child process...'
       icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
       icomm.send(d, dest=0, tag=11)
       icomms.append(icomm)
       print 'Spawned a child.'
   except:
       raise ValueError('Spawn failed to start.')

solved = False
while not solved and icomms:
    for icomm in icomms[:]:   # iterate over a copy, since finished ones are removed below
        if icomm.Iprobe(source=0, tag=MPI.ANY_TAG):
            print 'A child responded...'
            solved = icomm.recv(source=0, tag=MPI.ANY_TAG)
            icomm.Disconnect()
            icomms.remove(icomm)
            if solved: break
    if not solved:
        print 'spawns doing some work...'
        time.sleep(1)
# make sure all pending sends get matched
for icomm in icomms:
    icomm.recv(source=0, tag=MPI.ANY_TAG)
    icomm.Disconnect()
print 'received solution: %d' % solved

I hope you get the idea.
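
As for the non-blocking alternative mentioned earlier, here is an equally untested, rough sketch: post one irecv() per child and poll the requests. It assumes mpi4py's pickle-based request API, where req.test() returns a (flag, message) tuple, and it reuses the imports and the icomms list from the spawning loop above; for large replies irecv() may need an explicit buffer.

# continues from the spawning loop above: icomms holds one intercommunicator per child
reqs = [ic.irecv(source=0, tag=MPI.ANY_TAG) for ic in icomms]   # one pending receive per child

solved = False
while not solved and reqs:
    for i, req in enumerate(reqs):
        flag, msg = req.test()             # (completed?, received object or None)
        if flag:
            icomms[i].Disconnect()
            del reqs[i]
            del icomms[i]
            if msg:
                solved = msg
            break                          # indices shifted, restart the scan
    else:
        print 'spawns doing some work...'
        time.sleep(1)
# children that never reported would still have to be drained and
# disconnected here, as in the loop version above
print 'received solution: %s' % solved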

Addition: if you spawn a job from within a spawned job, the new child cannot easily establish a connection to the top-level master. For that you have to turn to an obscure part of MPI-2, the client/server model: have the master open a port with MPI_OPEN_PORT, register it with the MPI naming service using MPI_PUBLISH_NAME, and then call MPI_COMM_ACCEPT to receive connections from any other MPI job. The workers should use MPI_LOOKUP_NAME to obtain a reference to the port and MPI_COMM_CONNECT to establish an intercommunicator with the master job. I have no idea whether wrappers for those functions exist in mpi4py and, if so, how they are named.
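
For what it's worth, mpi4py does appear to expose wrappers for these calls (MPI.Open_port(), MPI.Publish_name(), MPI.Lookup_name(), and the Accept()/Connect() communicator methods). Below is a rough, untested sketch of the port/accept/connect part; to keep it simple it passes the port name to the client by hand (e.g. on the command line) instead of going through the naming service, and the tags and file split are made up:

# master side -- open a port and accept a connection from any other MPI job
from mpi4py import MPI

port = MPI.Open_port()                        # MPI_OPEN_PORT, returns a port name string
# optionally register it with MPI.Publish_name() so clients can MPI.Lookup_name() it
print 'master listening on: %s' % port
icomm = MPI.COMM_WORLD.Accept(port, root=0)   # MPI_COMM_ACCEPT, blocks until a client connects
print 'master received: %s' % icomm.recv(source=0, tag=33)
icomm.Disconnect()
MPI.Close_port(port)

# client side (e.g. a grandchild job) -- gets the port name via its command line here
from mpi4py import MPI
import sys

port = sys.argv[1]                            # or obtained via MPI_LOOKUP_NAME
icomm = MPI.COMM_WORLD.Connect(port, root=0)  # MPI_COMM_CONNECT
icomm.send('hello from a nested job', dest=0, tag=33)
icomm.Disconnect()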