
We want to run some background processes in our Django app. Celery seems to be the most common solution, but our team is more familiar with MPI, so I'm experimenting with it. I'd like to create a Django admin command that launches an MPI pool of workers, so I read up on Django admin commands and mpi4py's dynamic process management.

I wrote an admin command to run the fleet manager and an admin command to run a worker. The fleet manager successfully uses MPI.COMM_SELF.Spawn() to launch the workers, but they can't communicate with each other. The manager and the first worker both have rank 0, so it looks like they are using separate communicators.

How can I get the manager and workers to use the same communicator?

1 Answer


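The trick is to merge the two communicators, as described in this answer to a C question. Spawn() returns an intercommunicator that links the manager's group to the workers' group, and each group has its own ranks starting at 0. Calling Merge() on both sides turns it into a single intracommunicator with one shared set of ranks. With some help from the mpi4py documentation, I converted it to Python: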

# myproject/myapp/management/commands/runfleet.py
import sys

from django.core.management.base import BaseCommand
from mpi4py import MPI


class Command(BaseCommand):
    help = 'Launches the example manager and workers.'

    def add_arguments(self, parser):
        # Django 1.8+ uses argparse here; the older optparse-based
        # option_list / make_option style was removed in Django 1.10.
        parser.add_argument('--workers', '-w', type=int, default=1,
                            help='Number of worker processes to spawn.')

    def handle(self, *args, **options):
        self.stdout.write("Manager launched.")

        worker_count = options['workers']
        manage_script = sys.argv[0]
        comm = MPI.COMM_SELF.Spawn(sys.executable,
                                   args=[manage_script, 'fleetworker'],
                                   maxprocs=worker_count).Merge()
        self.stdout.write('Manager rank {}.'.format(comm.Get_rank()))

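        # One slot per rank in the merged communicator. Rank 0 is the
        # manager, so its item is a placeholder that gets ignored.
        start_data = [None]
        for i in range(worker_count):
            start_data.append("Item {}".format(i))
        comm.scatter(start_data, root=0)

        # Pass None explicitly: recent mpi4py releases require the
        # sendobj argument even when this rank contributes nothing.
        end_data = comm.gather(None, root=0)
        self.stdout.write('Manager received data {!r}.'.format(end_data))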

        comm.Disconnect()

The worker command looks like this:

# myproject/myapp/management/commands/fleetworker.py
from mpi4py import MPI

from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = 'Example worker process.'

    def handle(self, *args, **options):
        self.stdout.write("Worker launched.")

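        # high=True orders the workers after the manager in the merged
        # communicator, so the manager is reliably rank 0. If both sides
        # pass the same value, MPI leaves the order unspecified.
        comm = MPI.Comm.Get_parent().Merge(high=True)
        rank = comm.Get_rank()
        self.stdout.write('Worker rank {}.'.format(rank))

        # Non-root ranks pass None; only the root supplies the data.
        data = comm.scatter(None, root=0)
        result = "{!r}, {!r}".format(rank, data)

        comm.gather(result, root=0)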
        self.stdout.write("Finished worker.")

        comm.Disconnect()
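
To see why the scatter list needs a placeholder in its first slot, it may help to model the rank layout without MPI at all. The following is only a pure-Python sketch, not mpi4py code: `merged_ranks` and `scatter_payload` are hypothetical helper names, and the model assumes one side passes high=False and the other high=True, which is the case where MPI guarantees that the low group is ordered first.

```python
def merged_ranks(parent_size, child_size, parent_high=False, child_high=True):
    """Model the rank layout produced by merging an intercommunicator.

    Returns a dict mapping (role, local_rank) to the rank in the merged
    communicator. This is an illustration only; it makes no MPI calls.
    """
    if parent_high == child_high:
        # MPI leaves the ordering unspecified in this case.
        raise ValueError("order is unspecified when both groups pass the same 'high' value")
    parent = [("manager", i) for i in range(parent_size)]
    child = [("worker", i) for i in range(child_size)]
    # The group that passed high=False comes first in the merged ordering.
    ordered = child + parent if parent_high else parent + child
    return {member: rank for rank, member in enumerate(ordered)}


def scatter_payload(worker_count):
    """Build one item per merged rank, with a placeholder for the manager."""
    return [None] + ["Item {}".format(i) for i in range(worker_count)]


layout = merged_ranks(parent_size=1, child_size=3)
payload = scatter_payload(3)
for (role, local_rank), rank in sorted(layout.items(), key=lambda kv: kv[1]):
    print(rank, role, local_rank, payload[rank])
```

Before the merge, the manager and the first worker each have local rank 0 in their own group, which matches the symptom in the question. After the merge, the manager keeps rank 0 and the workers take ranks 1 and up, so the item at index i of the scatter list goes to merged rank i.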