Parallel programming in Python¶

Python offers both built-in features and external libraries for parallel programming.

Today we will look at three high-level approaches to writing parallel code:

  1. Asynchronous programming
  2. Lightweight pipelining with joblib
  3. The Message Passing Interface (MPI)

Asynchronous programming¶

In this example we compute the sum $\sum_{i=1}^n v_i$ over a vector $V=\{v_i \mid i=1,\ldots,n\}$ using multiple cores of our computer. We use a divide-and-conquer approach and split the computation of the sum into smaller problems:

  • $ \text{sum}_1 = \sum_{i=1}^{n/2} v_i $
  • $ \text{sum}_2 = \sum_{i=n/2+1}^{n} v_i $
  • $ \text{sum} = \text{sum}_1 + \text{sum}_2 $
In [31]:
values = [1,2,3,4,5,6,7,8,9]

Define a function to compute the partial sum¶

In [34]:
def part_sum(values, begin, end):
    # Sum the slice values[begin:end].
    total = 0
    for i in range(begin, end):
        total += values[i]
    return total

Launch concurrent tasks to compute the partial sums¶

In [33]:
import concurrent.futures as f

# A pool of worker threads; submitted tasks are scheduled onto the workers.
executor = f.ThreadPoolExecutor(max_workers=4)

# Each submit call returns a future for one half of the vector.
mid = len(values) // 2
f1 = executor.submit(part_sum, values, 0, mid)
f2 = executor.submit(part_sum, values, mid, len(values))

# result() blocks until the corresponding task has finished.
result = f1.result() + f2.result()
print(result)
45
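Note that CPython's global interpreter lock (GIL) prevents threads from executing Python bytecode in parallel, so a ThreadPoolExecutor mainly helps with I/O-bound tasks. For CPU-bound work, concurrent.futures also offers ProcessPoolExecutor with the same interface. A minimal sketch, assuming part_sum and values as defined above (when run from a notebook on platforms that spawn processes, part_sum may need to live in an importable module or in a script with an if __name__ == "__main__": guard):

import concurrent.futures as f

# Same submit/result interface, but tasks run in separate worker
# processes, so they are not serialized by the GIL.
with f.ProcessPoolExecutor(max_workers=4) as executor:
    mid = len(values) // 2
    f1 = executor.submit(part_sum, values, 0, mid)
    f2 = executor.submit(part_sum, values, mid, len(values))
    print(f1.result() + f2.result())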

Check the result using a serial sum¶

In [36]:
total = 0
for value in values:
    total += value
print(total)
45

Lightweight pipelining with Python functions (joblib)¶

joblib is an external library for parallel computations in Python.

The library can be installed via pip:

pip3 install joblib
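Before the stiffness matrix example, here is a minimal sketch of the Parallel/delayed pattern: delayed(sqrt)(i) records the call sqrt(i) without executing it, and Parallel dispatches the recorded calls to the workers.

from math import sqrt
from joblib import Parallel, delayed

# Evaluate sqrt on ten inputs using two workers.
results = Parallel(n_jobs=2)(delayed(sqrt)(i) for i in range(10))
print(results)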

Here, we look into the assembly of the stiffness matrix for the finite difference example from the earlier Python lecture.

In [45]:
from joblib import Parallel, delayed
import numpy as np

Assemble the stiffness matrix using finite differences¶

  1. Dirichlet boundary condition at $x=0$: $$ u_1 = 0. $$
  2. Finite difference scheme in the interior of $\Omega$, $\forall i=2,\ldots,n-1$:
$$ - E \frac{u_{i-1}-2u_i+u_{i+1}}{h^2} = f_b(x_i). $$
  3. Neumann boundary condition at $x=1$:
$$ E \frac{u_{n-2}-4u_{n-1}+3u_n}{2h} = g. $$

For simplicity we assume $E=1$.

In [41]:
def assign(i):
    # Interior row i: central difference stencil; the common
    # factor 1/(2*h*h) is applied after the assembly.
    matrix[i,i-1] = -2
    matrix[i,i] = 4
    matrix[i,i+1] = -2

In [51]:
n = 10
matrix = np.zeros([n, n])
h = 0.1

# Dirichlet condition: first row.
matrix[0,0] = 1

# Assemble the interior rows in parallel; the thread backend shares
# memory, so all workers write into the same matrix.
Parallel(n_jobs=2,prefer="threads")(delayed(assign)(i) for i in range(1,len(matrix)-1))

# Neumann condition: one-sided second-order difference in the last row.
matrix[n-1,n-1] = 3*h
matrix[n-1,n-2] = -4*h
matrix[n-1,n-3] = h

matrix *= 1./(2*h*h)
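To check the parallel assembly, we can compare it against a serial assembly of the same matrix (a sketch; reference is just a local name for the comparison copy):

# Serial assembly with the same stencil, no joblib.
reference = np.zeros([n, n])
reference[0,0] = 1
for i in range(1, n-1):
    reference[i,i-1], reference[i,i], reference[i,i+1] = -2, 4, -2
reference[n-1,n-1] = 3*h
reference[n-1,n-2] = -4*h
reference[n-1,n-3] = h
reference *= 1./(2*h*h)
print(np.allclose(matrix, reference))  # True if both assemblies agree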

Distributed programming in Python using MPI¶

For distributed programming a very common tool is the Message Passing Interface (MPI).

We install the Python package on the cluster as follows:

module load openmpi
pip3 install mpi4py

MPI itself is a C library; the mpi4py package provides Python bindings for it.

Distributed hello world¶

In [2]:
from mpi4py import MPI
import sys
In [3]:
def print_hello(rank, size, name):
    msg = "Hello World! I am process {0} of {1} on {2}.\n"
    sys.stdout.write(msg.format(rank, size, name))
In [4]:
if __name__ == "__main__":
    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    name = MPI.Get_processor_name()

    print_hello(rank, size, name)
Hello World! I am process 0 of 1 on Patricks-Air.attlocal.net.
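The output lists only one process because the notebook was not started through an MPI launcher. To actually run on multiple processes, save the code to a script (the file name hello_mpi.py is just an example) and launch it with mpirun:

mpirun -np 4 python3 hello_mpi.py

Each of the four processes then prints its own line.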

Compute the sum on multiple compute nodes¶

In [6]:
values = [1,2,3,4,5,6,7,8,9]
In [7]:
comm = MPI.COMM_WORLD
size = comm.Get_size()  # total number of processes
rank = comm.Get_rank()  # id of this process

Partitioning of the data¶

In [9]:
# Each rank works on a contiguous chunk of the values.
partition = round(len(values)/size)

begin = rank * partition
if rank == size-1:
    # The last rank also takes any remainder.
    end = len(values)
else:
    end = (rank+1) * partition
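For example, with our nine values and size = 3, partition = 3 and the ranks work on the index ranges [0, 3), [3, 6), and [6, 9).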

Computation on each node¶

In [10]:
# Partial sum over this rank's chunk of the data.
local_sum = 0

for i in range(begin, end):
    local_sum += values[i]

Sending the partial sum to rank 0¶

In [11]:
if rank != 0:
    # Non-blocking send of the partial sum to rank 0.
    req = comm.isend(local_sum, dest=0, tag=rank)
    req.wait()
else:
    # Rank 0 posts one non-blocking receive per remote rank ...
    total_sum = 0
    data = []
    for i in range(1, size):
        data.append(comm.irecv(source=i, tag=i))
    # ... then waits for each request and accumulates the results.
    for d in data:
        total_sum += d.wait()
    total_sum += local_sum
    print(total_sum)
45
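The explicit isend/irecv exchange is instructive, but mpi4py also exposes MPI's collective operations. A minimal sketch of the same reduction using comm.reduce, which combines the per-rank values on the root process:

# Every rank contributes local_sum; only rank 0 receives the result.
total_sum = comm.reduce(local_sum, op=MPI.SUM, root=0)
if rank == 0:
    print(total_sum)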