Running hybrid mpi4py and Charm4py programs (MPI interop)

#1

From initial testing, it seems that hybrid MPI/Charm4py programs already work! :slight_smile:

I have made an example (shown below) that mixes MPI synchronous collectives with a non-trivial Charm workload. Other things still need testing, such as calling external MPI-based libraries or using other MPI operations (e.g. asynchronous communication), but this is very promising.

UPDATE: I have since successfully run code that uses external MPI libraries, as well as code that uses Isend/Irecv (a rough sketch of that pattern is included after the example below). In the future, we can probably do a few things to streamline this scenario.

I have tested this on a Linux machine using OpenMPI. The requirement is to build Charm++ with the MPI communication layer. Here is an example of how to build and run the code:

$ git clone https://github.com/UIUC-PPL/charm4py
$ cd charm4py
$ git clone https://github.com/UIUC-PPL/charm charm_src/charm
$ cd charm_src/charm
$ ./build charm4py mpi-linux-x86_64 -j4 --with-production
$ cd ../..
$ python3 setup.py build_ext --inplace
$ export PYTHONPATH=`pwd`
$ mpirun -np 4 /usr/bin/python3 mpi-charm-hybrid.py

Here is the example code. One caveat: it uses charm.barrier(), which is not yet in the master branch. The barrier is not needed to make hybrid MPI/Charm4py programs, but it is very convenient for this particular example. This feature will be merged into master soon.

Hope this helps.

# mpi-charm-hybrid.py
from mpi4py import MPI
from charm4py import charm, Chare, Group, Array, threaded, Reducer
import numpy
import random


PARTICLES_PER_CELL_INIT = 300
CELLS_PER_PE = 4

class Cell(Chare):

    def __init__(self):
        # print('Cell', self.thisIndex, 'created')
        self.numParticles = PARTICLES_PER_CELL_INIT

    @threaded
    def work(self, numCells, done_future):
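        # @threaded runs this entry method in its own thread, so it can block on wait() below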
        self.numCells = numCells
        self.done_future = done_future
        self.msgsReceived = 0
        for _ in range(100):  # do 100 iterations
            # move random number of particles to my 2 neighbors
            a = random.randint(0, self.numParticles)
            b = random.randint(0, self.numParticles - a)
            self.thisProxy[(self.thisIndex[0] - 1) % numCells].recvParticles(a)
            self.thisProxy[(self.thisIndex[0] + 1) % numCells].recvParticles(b)
            self.numParticles -= (a + b)
            # wait for particles from my 2 neighbors
            self.wait('self.msgsReceived == 2')
            self.msgsReceived = 0
        self.contribute(self.numParticles, Reducer.gather,
                        self.thisProxy[0].calculateResults)

    def recvParticles(self, n_particles):
        self.numParticles += n_particles
        self.msgsReceived += 1

    def calculateResults(self, particles_in_cells):
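        # reduction target: called on element 0 with the gathered list of per-cell counts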
        totalParticles = sum(particles_in_cells)
        minParticlesPerCell = min(particles_in_cells)
        avgParticlesPerCell = totalParticles / self.numCells
        maxParticlesPerCell = max(particles_in_cells)
        # send result to creator of future object
        self.done_future.send((minParticlesPerCell, avgParticlesPerCell,
                              maxParticlesPerCell, totalParticles))


class HybridMPICharmProgram(Chare):

    @threaded
    def start(self):
        # this is the part that resembles an MPI program. but there is MPI and
        # charm code interspersed
        comm = MPI.COMM_WORLD
        myrank = comm.Get_rank()
        numRanks = comm.Get_size()
        print('Hello from MPI rank', myrank, 'numRanks=', numRanks)

        # test barrier
        comm.Barrier()

        # test broadcast
        test_bcast = None
        if myrank == 0:
            test_bcast = [4, 7, 12, 13]
        test_bcast = comm.bcast(test_bcast, root=0)
        assert test_bcast == [4, 7, 12, 13]

        # test reduce
        if myrank == 0:
            result = comm.reduce(myrank, op=MPI.SUM, root=0)
            assert result == numRanks * (numRanks - 1) / 2
        else:
            comm.reduce(myrank, op=MPI.SUM, root=0)

        # test charm workload
        numCells = charm.numPes() * CELLS_PER_PE
        if myrank == 0:
            cells = Array(Cell, numCells)  # create chare array of Cells
            charm.awaitCreation(cells)  # not needed, but good for testing
            f = charm.createFuture()  # create future to wait for result of simulation
            cells.work(numCells, f)
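            # f.get() blocks only this thread (not the whole process) until the result arrives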
            charm_result = f.get()
            charm.barrier(self)
            comm.bcast(charm_result, root=0)
        else:
            # charm has to do the simulation using all PEs, so we can't have MPI
            # block the process. so we do a charm.barrier() instead to wait until
            # rank 0 receives the result of the simulation. a charm.barrier() only
            # blocks this thread (other chares can continue doing work)
            charm.barrier(self)
            # get result of charm simulation using MPI broadcast
            charm_result = comm.bcast(None, root=0)
            assert int(charm_result[1]) == PARTICLES_PER_CELL_INIT
            assert charm_result[3] == PARTICLES_PER_CELL_INIT * numCells
            print('Rank', myrank, 'charm_result=', charm_result)

        comm.Barrier()
        if myrank == 0:
            exit()


def startMPI(args):
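    # charm4py entry point: this function runs on PE 0 after the runtime starts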
    Group(HybridMPICharmProgram).start()


charm.start(startMPI)
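
As a follow-up to the UPDATE above, here is a rough sketch of how non-blocking mpi4py calls (Isend/Irecv) can be mixed into the same pattern. This is only an illustration along the lines of mpi-charm-hybrid.py, not the exact code I ran; the file name and the chare/function names below are made up. It assumes the same build and mpirun invocation as above.

# mpi-isend-sketch.py (illustrative name)
from mpi4py import MPI
from charm4py import charm, Chare, Group, threaded
import numpy


class HybridNonBlocking(Chare):

    @threaded
    def start(self):
        comm = MPI.COMM_WORLD
        myrank = comm.Get_rank()
        numRanks = comm.Get_size()

        # exchange a small numpy buffer with my ring neighbors using
        # non-blocking sends/receives
        sendbuf = numpy.full(8, myrank, dtype='i')
        recvbuf = numpy.empty(8, dtype='i')
        right = (myrank + 1) % numRanks
        left = (myrank - 1) % numRanks
        reqs = [comm.Isend(sendbuf, dest=right, tag=7),
                comm.Irecv(recvbuf, source=left, tag=7)]
        # note: Waitall blocks the whole process (not just this thread), which
        # is fine here because no other chares are doing work on this PE
        MPI.Request.Waitall(reqs)
        assert (recvbuf == left).all()
        print('Rank', myrank, 'got data from rank', left)

        comm.Barrier()
        if myrank == 0:
            exit()


def main(args):
    Group(HybridNonBlocking).start()


charm.start(main)
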
#2

That's looking great Juan, thanks for working on this so swiftly. I will try to test it on an example using charm4py and an MPI-based library by the end of this week or the beginning of the next.