An Introduction to Parallel Computing with MPI for Python

Computer Science Department - University of Puerto Rico

Prepared by: José R. Ortiz-Ubarri

MPI (Message Passing Interface)

MPI is a standard for expressing distributed parallelism via message passing.

It consists of a library of routines that provides the environment for parallelism.

MPI can be used in Fortran, C, C++, and of course in Python.

mpi4py: MPI for Python

MPI for python provides bindings of the Message Passing Interface (MPI) standard for the Python programming language.

Find it here: https://code.google.com/p/mpi4py/

and the API documentation here: http://mpi4py.scipy.org/docs/apiref/index.html

mpi4py: Important Objects

The main object is MPI.

MPI has two important members used throughout these examples: COMM_WORLD, the default communicator, and Status, which describes a received message (its source and tag).

mpi4py: Important Routines

Get_size(): returns the number of processes in a communicator.

Get_rank(): returns the rank of the calling process.

send() / recv(): point-to-point communication between two processes.

mpi4py: Important Routines continued

bcast(): broadcasts an object from one process to all the others.

reduce() / allreduce(): combine a value from every process with an operation such as MPI.SUM or MPI.MAX.

MPI C Simple Program template


#include "mpi.h"

int main (int argc, char* argv[]) {
  int my_rank, num_procs, mpi_error_code;

  // other declarations

  mpi_error_code = MPI_Init(&argc, &argv);  /* Start up MPI */
  mpi_error_code = MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
  mpi_error_code = MPI_Comm_size(MPI_COMM_WORLD, &num_procs);

  // ***************************************************************
  // actual (serial) work goes here
  // ***************************************************************

  mpi_error_code = MPI_Finalize(); /* Shut down MPI */ 
}

mpi4py Simple Program template


from mpi4py import MPI

comm = MPI.COMM_WORLD   # Defines the default communicator
num_procs = comm.Get_size()  # Stores the number of processes in num_procs.
rank = comm.Get_rank()  # Stores the rank (pid) of the current process

# ***************************************************************
# actual (serial) work goes here
# ***************************************************************




helloworld.py: Hello world from all processes


from mpi4py import MPI

comm = MPI.COMM_WORLD   # Defines the default communicator
num_procs = comm.Get_size()  # Stores the number of processes in num_procs.
rank = comm.Get_rank()  # Stores the rank (pid) of the current process

print "Hello world, say process %s !" % rank




helloworldMW.py: Simple Master/worker example


from mpi4py import MPI

comm = MPI.COMM_WORLD   # Defines the default communicator
num_procs = comm.Get_size()  # Stores the number of processes in num_procs.
rank = comm.Get_rank()  # Stores the rank (pid) of the current process
stat = MPI.Status()

msg =  "Hello world, say process %s !" % rank

if rank == 0:
    # Master work
    print msg
    for i in range(num_procs - 1):
        msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=stat)
        print msg
    
else:
    # Worker work
    comm.send(msg, dest = 0)


Simple Master/worker example in C


#include "mpi.h"
#include <stdio.h>
#include <string.h>

int main (int argc, char* argv[]){

  const int max_msg_length = 100;
  char message[max_msg_length+1];
  MPI_Status status;
  int rank, num_procs;
  int tag = 0;   /* only one kind of message, so tag 0 is used throughout */
  int mpi_error;

  mpi_error = MPI_Init(&argc, &argv);
  mpi_error = MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  mpi_error = MPI_Comm_size(MPI_COMM_WORLD, &num_procs);

  sprintf(message, "Hello world, say process #%d!", rank);

  // Master worker code in next slide




Simple Master/worker example C continued


  if(rank == 0){
    printf("%s\n", message);
    for(int i = 0; i < num_procs-1; i++){
        mpi_error = MPI_Recv(message, max_msg_length + 1,
            MPI_CHAR, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD,
            &status);
        printf("%s\n", message);
    }
  }
  else{
    mpi_error = MPI_Send(message, strlen(message) + 1, MPI_CHAR,
        0, tag, MPI_COMM_WORLD);
  }

  mpi_error = MPI_Finalize();
}


Why mpi4py?

Compare the mpi4py master/worker example with its C counterpart above: the Python version is much shorter, with no buffer sizes, explicit types, or manual string handling to manage.

Running parallel MPI Python programs

Example

Example of running the MPI Python script helloworld.py with 4 processes:
        $mpirun -np 4 python helloworld.py
        Hello world, say process 0 !
        Hello world, say process 1 !
        Hello world, say process 3 !
        Hello world, say process 2 !
    

Note that the order of the output is nondeterministic: whichever worker's message arrives first is printed first. To print the messages in rank order, the master can instead receive from each worker in turn, like this:


from mpi4py import MPI

comm = MPI.COMM_WORLD   # Defines the default communicator
num_procs = comm.Get_size()  # Stores the number of processes in num_procs.
rank = comm.Get_rank()  # Stores the rank (pid) of the current process
stat = MPI.Status()

msg = "Hello world, say process %s !" % rank

if rank == 0:
    # Master work
    print msg
    for i in range(num_procs - 1):
        msg = comm.recv(source=i+1, tag=MPI.ANY_TAG, status=stat)
        print msg
    
else:
    # Worker work
    comm.send(msg, dest = 0)


How does the magic work?

Every time mpirun is executed, N_p processes are created, all with the same copy of the code: Single Program, Multiple Data (SPMD).

They all start executing.

The conditional statement that uses the rank of the process will determine the part of the code that each process runs. Everything outside the conditional statements is executed by all the processes.

Each process is independent from the others, except when communicating.
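For example, in this minimal sketch every process prints the first line, but only process 0 prints the second:


from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Outside any conditional: executed by every process
print "All processes run this line (I am process %s)" % rank

if rank == 0:
    # Inside the conditional: executed only by process 0
    print "Only process 0 runs this line"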

Tags: What is the message tag?

The message tag is used to indicate to the receiver the type of message it has received.

For example:


    msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=stat)
    if stat.Get_tag() == 1:
        function1(msg)
    elif stat.Get_tag() == 2:
        function2(msg)
    else:
        function3(msg)
        

Note that you can also receive only messages of a given type by passing a specific tag to recv instead of MPI.ANY_TAG.
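On the sending side, the tag is attached as an argument to send. A minimal sketch (the tag value 1 and the "some result" payload are just placeholders for illustration):


from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
stat = MPI.Status()

if rank == 0:
    # Master: receive one message per worker and inspect its tag
    for i in range(comm.Get_size() - 1):
        msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=stat)
        print "Received %s with tag %s" % (msg, stat.Get_tag())
else:
    # Worker: tag 1 marks this message as a "type 1" result
    comm.send("some result", dest=0, tag=1)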

Broadcast

Useful when one process has data that all the other processes need to know.

For example, suppose we have a program that works with a DNA string stored in a file.

One way to share the DNA string among the processes is to read it in process 0 and then broadcast it to the other processes:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    fd = open("dna_file.txt", "r")
    dna = fd.read()
else:
    dna = None

dna = comm.bcast(dna, root=0) 
print rank, dna

Reduce

Reduce takes all the values sent by each process and performs a reduction operation on them.


from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

val = 3
sum = comm.reduce(val, op=MPI.SUM, root=0)

if rank == 0:
    print "The reduction is %s" % sum

If the previous code is run with 3 processes, the output is:

        The reduction is 9

Reduce continued

On all the other processes, the value of sum will be None.

Using the function allreduce instead of reduce returns the reduced value to all the processes, as in the sketch below.
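A minimal sketch of allreduce, following the sum example above:


from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

val = 3
total = comm.allreduce(val, op=MPI.SUM)

# Unlike reduce, every process now holds the reduced value
print "Process %s sees the sum %s" % (rank, total)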

Reduce continued, max example


from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

max = comm.reduce(rank, op=MPI.MAX, root=0)

if rank == 0:
    print "The reduction is %s" % max

If the previous code is run with 3 processes, the output is:

        The reduction is 2

Because the max of the ranks (0,1,2) is 2.

References

MPI4py, http://mpi4py.scipy.org/

Python, www.python.org

P.S. Pacheco, Parallel Programming with MPI, Morgan Kaufmann Publishers, 1997.