Tap into the power of MPI to run distributed Python code on your laptop at scale.

mpi4py – High-Performance Distributed Python

Python, I think it’s safe to say, is the most prevalent language for machine learning and deep learning, and it has become a popular language for technical computing as well. Part of the reason for its increasing popularity is the availability of common high-performance computing (HPC) tools and libraries. One of these libraries is mpi4py. If you are familiar with the prevalent naming schemes in Python, you can likely figure out that this is a Message Passing Interface (MPI) library for Python.

MPI is the key library and protocol for writing parallel applications that expand beyond a single system. It began primarily as an HPC tool in the early 1990s, with version 1.0 of the standard released in June 1994. In the early days, MPI was used almost exclusively with C/C++ and Fortran. Over time, MPI implementations or bindings were released for other languages, such as Java, C#, Matlab, OCaml, R, and, of course, Python. These bindings were created because MPI became the standard for exchanging data between processes on shared or distributed architectures in the HPC world.

With the rise of machine learning, and of Python in general, developing standard computational tools for Python seemed logical. MPI for Python (mpi4py) provides an object-oriented interface for Python modeled on the C++ bindings of the MPI-2 standard. The 1.0 release was on March 20, 2008, and the current release as of this writing is 3.0.3 (July 27, 2020). Mpi4py is designed to be fairly Pythonic, so if you know Python and understand the basic concepts of MPI, then mpi4py shouldn’t be a problem.

Python data structures can be used to create more complex data structures, and because Python is an object-oriented language, you can create classes and objects around them. Libraries such as NumPy add new data types (e.g., N-dimensional arrays). To pass these complex data structures and objects between MPI ranks, mpi4py serializes them with the pickle standard library module, which supports both ASCII and binary protocols. The marshal module also can be used to serialize built-in Python objects into a binary format that is specific to Python.
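As a rough sketch of what this looks like (this is just the pickle module at work, not the actual mpi4py internals), a dictionary containing a NumPy array can be turned into bytes and reconstructed:

import pickle
from numpy import arange

# a typical Python object you might pass between ranks
data = {'coords': arange(5), 'label': 'run-42'}

payload = pickle.dumps(data)      # serialize to a binary pickle
restored = pickle.loads(payload)  # reconstruct the original object

print(restored['label'], restored['coords'])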

Mpi4py takes advantage of Python features to maximize the performance of serializing and de-serializing objects as part of data transmission. These efforts have been taken to the point that the mpi4py documentation claims it “… enables the implementation of many algorithms involving multidimensional numeric arrays (e.g., image processing, fast Fourier transforms, finite difference schemes on structured Cartesian grids) directly in Python, with negligible overhead, and almost as fast as compiled Fortran, C, or C++ code.”
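That performance claim applies mainly to mpi4py’s uppercase, buffer-based methods (e.g., Send, Recv, Bcast), which move NumPy array memory directly without pickling. A minimal two-rank sketch of the buffer-based calls (the array size and tag value are arbitrary):

import numpy
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# uppercase Send/Recv transmit the raw array buffer (no pickling)
if rank == 0:
    a = numpy.arange(1000, dtype='d')
    comm.Send([a, MPI.DOUBLE], dest=1, tag=77)
elif rank == 1:
    a = numpy.empty(1000, dtype='d')
    comm.Recv([a, MPI.DOUBLE], source=0, tag=77)
    print("Rank 1 received", a[:5], "...")

The lowercase send(), recv(), and bcast() used later in this article are the general-purpose, pickle-based versions.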

Simple mpi4py Examples

For the example programs, I used the mpi4py installation for Anaconda, built against MPICH2. Running mpi4py code follows the usual MPICH2 conventions. For example, a command might be:

$ mpirun -np 4 -f ./hosts python3 ./script.py

where script.py is the Python code that uses mpi4py.
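The hosts file passed with -f is just a plain text list of machine names, one per line, in the usual MPICH format (the node names here are placeholders for your own machines):

node001
node002
node003
node004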

The first example is just a simple “hello world” (Listing 1) from Jörg Bornschein’s GitHub page. The MPI.COMM_WORLD variable (line 3) is how mpi4py exposes the world communicator, the default communicator that contains all of the processes. It is created when the mpi4py module is imported; the import also causes MPI.Init() (or MPI.Init_thread()) to be called.

Listing 1: Hello World

from mpi4py import MPI
 
comm = MPI.COMM_WORLD
 
print("Hello! I'm rank %d from %d running in total..." % (comm.rank, comm.size))
 
comm.Barrier()   # wait for everybody to synchronize _here_

Contrast this with C or Fortran, where MPI_Init() is called explicitly and the world communicator, MPI_COMM_WORLD, is predefined. Note the difference between MPI.COMM_WORLD for mpi4py and MPI_COMM_WORLD for C and Fortran.

Line 3 in the sample code assigns the MPI.COMM_WORLD communicator to the variable comm. The code then uses the rank of the specific process, comm.rank, and the total number of processes, comm.size. Getting the rank of a process is very similar to MPI_Comm_rank in C and MPI_COMM_RANK in Fortran. Although not exactly the same, if you understand the basics of MPI, it won’t be difficult to use mpi4py.

The code ends by calling the Barrier() function. Although not strictly necessary, it’s a nice way of ending the code to make sure all of the processes reach that point.

To run the code, I used:

$ mpirun -np 4 -f ./hosts python3 ./hello-world.py

The output is shown in Listing 2.

Listing 2: Hello World Output

Hello! I'm rank 0 from 4 running in total...
Hello! I'm rank 1 from 4 running in total...
Hello! I'm rank 2 from 4 running in total...
Hello! I'm rank 3 from 4 running in total...

A second example is just a simple broadcast from Ahmed Azridev’s GitHub page (Listing 3). (Note: I cosmetically modified the code to appeal to my sensibilities and to update it to Python 3.) The code first defines a dictionary, data, but only on the rank 0 process. All other processes set data to None.

Listing 3: Broadcast Example

from numpy import array
from mpi4py import MPI
 
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
 
if rank == 0:
    data = { 'key1' : [10,10.1,10+11j],
             'key2' : ('mpi4py' , 'python'),
             'key3' : array([1, 2, 3]) }
else:
    data = None
# end if
 
data = comm.bcast(data, root=0)
 
if rank == 0:
    print("bcast finished")
# end if
 
print("data on rank %d is: "%comm.rank, data)

The data is broadcast with the bcast method of the communicator comm, which happens to be MPI.COMM_WORLD. In Fortran and C, the function is usually named MPI_Bcast or MPI_BCAST. Here, the content of the variable data is broadcast to all ranks from the rank 0 process (root=0). Again, all of this is very similar to C and Fortran, but not exactly the same. Listing 4 shows the output from the code.

Listing 4: Broadcast Output

bcast finished
data on rank 0 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 1 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 2 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}
data on rank 3 is:  {'key1': [10, 10.1, (10+11j)], 'key2': ('mpi4py', 'python'), 'key3': array([1, 2, 3])}

Notice that the data is defined on the rank 0 process, but each rank printed the same data. Perhaps more importantly, the data is a dictionary, not something that is defined by default in Fortran or C.
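If the payload is a large NumPy array rather than a general Python object, the uppercase, buffer-based Bcast avoids the pickling step entirely. A minimal sketch (the array values are arbitrary):

import numpy
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# every rank allocates the buffer; rank 0 fills it before broadcasting
data = numpy.zeros(5, dtype='i')
if rank == 0:
    data[:] = numpy.arange(5, dtype='i')

comm.Bcast([data, MPI.INT], root=0)   # buffer-based broadcast, no pickling
print("Rank %d data is: " % rank, data)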

The third example is point-to-point code (Listing 5). Again, I modified the code to fit my sensibilities and to port it to Python 3. I also changed the pprint() functions to print(). The code is pretty simple. The rank 0 process creates some data in the NumPy data array and sends the first part of that array to the rank 1 process and the second part to the rank 2 process. To make sure the data goes to the correct process, it uses tags on the send() and recv() functions (i.e., tag=13 and tag=14) to distinguish the destinations (dest) and ensure the sender and receiver processes are matched correctly. It’s not the same as MPI_Send and MPI_Recv, but it’s fairly easy to figure out if you know some MPI. The output when running the code on four processes is shown in Listing 6. Notice that the rank 3 process didn’t contribute or do anything.

Listing 5: Point-to-Point

import numpy
from mpi4py import MPI
 
 
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
 
if (rank == 0):
    data = numpy.arange(10)
    comm.send(data[:5], dest=1, tag=13)
    comm.send(data[5:], dest=2, tag=14)
    print("Rank %d data is: "%rank, data)
elif (rank == 1):
    data = comm.recv(source=0, tag=13)
    print("Rank %d Message Received, data is: "%rank, data)
elif (rank == 2):
    data = comm.recv(source=0, tag=14)
    print("Rank %d Message Received, data is: "%rank, data)
# end if

Listing 6: Point-to-Point Output

Rank 0 data is:  [0 1 2 3 4 5 6 7 8 9]
Rank 1 Message Received, data is:  [0 1 2 3 4]
Rank 2 Message Received, data is:  [5 6 7 8 9]

More Complex Examples

From these three simple examples, you’ve learned that mpi4py works much like MPI code written in Fortran or C, but with a Python twist. Mpi4py can work with Python data types by serializing and de-serializing them. The next step is to try some more complicated examples.

The first example (Listing 7) is simple parallel trapezoid integration code derived from a presentation by Pawel Pomorski. The code is fairly simple. If you remember how to do numerical integration by the trapezoidal approach, this code should make sense.

Listing 7: Parallel Trapezoid Method

from mpi4py import MPI
import math
 
 
def f(x):
  return x*x
# end def
 
 
def trapezoidal(a, b, n, h):
 
  s = 0.0
  s += h * f(a)
  for i in range(1, n):
    s += 2.0 * h * f(a + i*h)
  # end for
  s += h * f(b)
  return (s/2.)
# end def
 
 
# Main section
comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()
 
p = comm.Get_size()
 
if (my_rank == 0):
  print("Number of ranks = ",p)
# end if
print('my_rank = ',my_rank)
 
 
# Integral parameters
a=0.0        # Left endpoint
b=2.0        # Right endpoint
n=1024       # Number of trapezoids
dest=0       # Destination for messages (rank 0 process)
total=-1.0   # Initialize to negative number
 
h = (b-a)/n         #  h = Trapezoid Base Length - the same for all processes
local_n = int(n/p)  #  Number of trapezoids for each process
 
#   Length of each process' interval of
#   integration = local_n*h.
local_a = a + my_rank*local_n*h   # local a (specific to each process)
local_b = local_a + local_n*h     # local b (specific to each process)
 
# Each rank performs its own integration
integral = trapezoidal(local_a, local_b, local_n, h)
 
# Add up the integrals calculated by each process
if (my_rank == 0):
  total=integral
  for source in range(1,p):
    integral=comm.recv(source=source)
    print("PE ",my_rank,"<-",source,",",integral,"\n")
    total=total+integral
  # end for
else:
  print("PE ",my_rank,"->",dest,",",integral,"\n")
  comm.send(integral, dest=0)
# end if
 
 
#  Print the result
if (my_rank==0):
  print("**With n=",n,", trapezoids,")
  print("** Final integral from",a,"to",b,"=",total,"\n")
# end if
 
MPI.Finalize()

To begin, the code breaks the integration interval between the two endpoints into n trapezoids (n=1024 here). These trapezoids are then divided evenly across the p processes, and each process performs its own integration over its portion of the interval. When finished, each process sends its result to the rank 0 process, which adds the contribution of each process to get the final answer.
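As an aside, the receive loop on the rank 0 process in Listing 7 is exactly the pattern that MPI’s collective reduce operation packages up for you. A minimal, self-contained sketch of the idea (each rank contributes its rank number here, standing in for a local integral):

from mpi4py import MPI

comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()

# each rank's partial result (the local integral in Listing 7)
partial = float(my_rank)

# reduce() sums the partial values and delivers the total to rank 0
total = comm.reduce(partial, op=MPI.SUM, root=0)

if my_rank == 0:
  print("Sum of partial results =", total)
# end if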

The command to run the code is straightforward:

$ mpirun -n 4 -f ./hosts python3 trap-mpi4py.py

The sample problem integrates x^2 over the interval from 0.0 to 2.0; Listing 8 shows the output. The output contains information about what each process is doing, showing when a process sends data to the rank 0 process (e.g., PE 1 -> 0). The rank 0 process shows which process sent the data (e.g., PE 0 <- 1).

Listing 8: Integration Output

Number of ranks =  4
my_rank =  0
PE  0 <- 1 , 0.29166698455810547
 
PE  0 <- 2 , 0.7916669845581055
 
PE  0 <- 3 , 1.5416669845581055
 
**With n= 1024 , trapezoids,
** Final integral from 0.0 to 2.0 = 2.666667938232422
 
my_rank =  1
PE  1 -> 0 , 0.29166698455810547
 
my_rank =  3
PE  3 -> 0 , 1.5416669845581055
 
my_rank =  2
PE  2 -> 0 , 0.7916669845581055

The next example is a simple matrix-matrix multiply that breaks the matrices into a grid of blocks. It then handles communication between the blocks in a classic east, west, north, south layout. I won’t put the code here because it is a little long; I only changed the pprint() calls to print() to get it to work. The program uses four processes, and the output is just the timings of the code run (Listing 9); a short sketch of the grid-setup idea follows the listing.

Listing 9: Matrix-Matrix Product Output

Creating a 2 x 2 processor grid...
==============================================================================
Computed (serial) 3000 x 3000 x 3000 in    1.68 seconds
 ... expecting parallel computation to take   3.36 seconds
Computed (parallel) 6000 x 6000 x 6000 in          3.69 seconds
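The matrix-multiply code itself is too long to reproduce, but the grid setup at its heart can be sketched with mpi4py’s Cartesian communicator. The snippet below only builds a 2x2 process grid and finds each rank’s neighbors; it is not the actual benchmark code:

from mpi4py import MPI

comm = MPI.COMM_WORLD

# arrange four ranks into a periodic 2 x 2 grid (run with mpirun -np 4)
cart = comm.Create_cart(dims=[2, 2], periods=[True, True], reorder=True)
row, col = cart.Get_coords(cart.Get_rank())

# Shift() returns the (source, destination) ranks for a displacement
# along one dimension: dimension 0 = north/south, dimension 1 = east/west
north, south = cart.Shift(0, 1)
west, east = cart.Shift(1, 1)

print("Rank %d at grid (%d,%d): N=%d S=%d E=%d W=%d"
      % (cart.Get_rank(), row, col, north, south, east, west))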

The final example I tried, just for fun, is a parallel implementation of Conway’s Game of Life that I got from Kevin Moore’s GitHub site. Again, I won’t show the lengthy code here. Because it’s written in old Python 2.x style, I had to fix the print functions.

If you’ve never played Game of Life, give it a try. There is really nothing to do: just watch the screen. The grid is initialized randomly. Because the output is animated, I can’t show anything here, but give it a try and be sure to take a look at the code.

Summary

Mpi4py is a very powerful Python tool that uses wrappers around standard MPI implementations such as MPICH2 or OpenMPI. If you have used MPI before, either in Fortran or C, then using it in Python is not difficult. It holds true to Python’s object-oriented nature but still uses the familiar MPI terminology.

In this article, I showed a few simple programs, from Hello World, to broadcast, to point-to-point examples, which all illustrate that coding MPI is not difficult in Python.

The final few “real” examples were more than just instructional. The simple trapezoidal integration example shows how easy it is to parallelize code. The matrix-matrix multiplication example, which breaks the matrices into a grid of blocks (sometimes called tiles), is a bit more complicated, because you need to know how to do parallel matrix multiplication. The parallel version of Conway’s Game of Life is a diverting visual example with somewhat more difficult code; you probably need to know something about the algorithm to understand the parallel version, but it’s still fun to run, and you can always change the initialization from random to something more deliberate.

The Author

Jeff Layton has been in the HPC business for almost 25 years (starting when he was four years old). He can be found lounging around at a nearby Fry’s enjoying the coffee and waiting for sales.