mpi4py
http://mpi4py.scipy.org
Lisandro Dalcin
dalcinl@gmail.com
Centro Internacional de Métodos Computacionales en Ingeniería
Consejo Nacional de Investigaciones Científicas y Técnicas
Santa Fe, Argentina
January, 2011
Python for parallel scientific computing
PASI, Valparaíso, Chile
Outline
Overview
Communicators
Point to Point Communication
Collective Operations
Compute Pi
Mandelbrot Set
Dynamic Process Management
Overview
What is mpi4py?
mpi4py provides Python bindings for the Message Passing Interface (MPI) standard.
Implementation
Implemented in Cython; the API closely follows the MPI-2 C++ bindings.
Features MPI-1
Collective operations:
Synchronization (barrier)
Communication (broadcast, scatter/gather)
Global reductions (reduce, scan)
Features MPI-2
Features Python
Communication of generic Python objects (pickle-based) and of buffer-like objects such as NumPy arrays (fast, near-C speed).
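These two layers show up throughout the examples that follow; a minimal sketch contrasting them (run with at least two processes; the tags are arbitrary):

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
if comm.rank == 0:
    # lowercase methods pickle arbitrary Python objects
    comm.send({"any": "object"}, dest=1, tag=1)
    # uppercase methods move buffer-like objects at near-C speed
    a = numpy.arange(10, dtype='i')
    comm.Send([a, MPI.INT], dest=1, tag=2)
elif comm.rank == 1:
    obj = comm.recv(source=0, tag=1)
    a = numpy.empty(10, dtype='i')
    comm.Recv([a, MPI.INT], source=0, tag=2)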
[Figure: PingPong throughput (MiB/s) versus message size, comparing pickle-based (Pickle), buffer-based (Buffer), and C implementations; two panels.]
Features IPython
Integration with IPython enables MPI to be used interactively.
Start engines with MPI enabled
$ ipcluster mpiexec -n 16 --mpi=mpi4py
Connect to the engines
$ ipython
In [1]: from IPython.kernel import client
In [2]: mec = client.MultiEngineClient()
In [3]: mec.activate()
Execute commands using %px
In [4]: %px from mpi4py import MPI
In [5]: %px print(MPI.Get_processor_name())
Features Interoperability
Hello World!
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
name = MPI.Get_processor_name()

print("Hello, World! I am process %d of %d on %s." % (rank, size, name))
/* file: helloworld.c */
#include <stdio.h>
#include <mpi.h>
void sayhello(MPI_Comm comm)
{
  int size, rank;
  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);
  printf("Hello, World! I am process %d of %d.\n",
         rank, size);
}
// file: helloworld.i
%module helloworld
%{
#include <mpi.h>
#include "helloworld.c"
%}

%include mpi4py/mpi4py.i
%mpi4py_typemap(Comm, MPI_Comm);

void sayhello(MPI_Comm comm);
// file: helloworld.cxx
#include <boost/python.hpp>
#include <mpi4py/mpi4py.h>
using namespace boost::python;

#include "helloworld.c"

static void wrap_sayhello(object py_comm) {
  PyObject* py_obj = py_comm.ptr();
  MPI_Comm *comm_p = PyMPIComm_Get(py_obj);
  if (comm_p == NULL) throw_error_already_set();
  sayhello(*comm_p);
}

BOOST_PYTHON_MODULE(helloworld) {
  if (import_mpi4py() < 0) return;
  def("sayhello", wrap_sayhello);
}
! file: helloworld.f90
subroutine sayhello(comm)
  use mpi
  implicit none
  integer :: comm, rank, size, ierr
  call MPI_Comm_size(comm, size, ierr)
  call MPI_Comm_rank(comm, rank, ierr)
  print *, 'Hello, World! I am process ', rank, ' of ', size, '.'
end subroutine sayhello
Communicators
communicator = process group + communication context
Predefined instances:
COMM_WORLD
COMM_SELF
COMM_NULL
Accessors:
rank = comm.Get_rank()  # or comm.rank
size = comm.Get_size()  # or comm.size
group = comm.Get_group()
Constructors:
newcomm = comm.Dup()
newcomm = comm.Create(group)
newcomm = comm.Split(color, key)
Communicators: Create()
from mpi4py import MPI

comm = MPI.COMM_WORLD
group = comm.Get_group()

newgroup = group.Excl([0])
newcomm = comm.Create(newgroup)

if comm.rank == 0:
    assert newcomm == MPI.COMM_NULL
else:
    assert newcomm.size == comm.size - 1
    assert newcomm.rank == comm.rank - 1

group.Free(); newgroup.Free()
if newcomm: newcomm.Free()
Communicators: Split()
from mpi4py import MPI

world_rank = MPI.COMM_WORLD.Get_rank()
world_size = MPI.COMM_WORLD.Get_size()
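A minimal sketch of a typical Split() use, assuming we want to divide COMM_WORLD into two halves (the color/key values here are illustrative, not the original exercise):

if world_rank < world_size // 2:
    color = 0
else:
    color = 1
key = world_rank
newcomm = MPI.COMM_WORLD.Split(color, key)
# ranks with the same color now share a new communicator,
# ordered within it by key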
Exercise #1
Point to Point Communication
Blocking communication
Python objects:
comm.send(obj, dest=0, tag=0)
obj = comm.recv(None, source=0, tag=0)
Array data:
comm.Send([array, count, datatype], dest=0, tag=0)
comm.Recv([array, count, datatype], source=0, tag=0)
Nonblocking communication
Python objects:
request = comm.isend(obj, dest=0, tag=0)
request.Wait()
Array data:
req1 = comm.Isend([array, count, datatype], dest=0, tag=0)
req2 = comm.Irecv([array, count, datatype], source=0, tag=0)
MPI.Request.Waitall([req1, req2])
PingPong
from mpi4py import MPI
comm = MPI.COMM_WORLD

if comm.rank == 0:
    sendmsg = 777
    comm.send(sendmsg, dest=1, tag=55)
    recvmsg = comm.recv(source=1, tag=77)
else:
    recvmsg = comm.recv(source=0, tag=55)
    sendmsg = "abc"
    comm.send(sendmsg, dest=0, tag=77)
PingPing
from mpi4py import MPI
comm = MPI.COMM_WORLD

if comm.rank == 0:
    sendmsg = 777
    target = 1
else:
    sendmsg = "abc"
    target = 0
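Both ranks then send to and receive from each other at the same time; a minimal sketch of that exchange (assuming a nonblocking send paired with a blocking receive; tag 77 is arbitrary):

request = comm.isend(sendmsg, dest=target, tag=77)
recvmsg = comm.recv(source=target, tag=77)
request.Wait()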
Exchange
from mpi4py import MPI
comm = MPI.COMM_WORLD

sendmsg = [comm.rank]*3
right = (comm.rank + 1) % comm.size
left = (comm.rank - 1) % comm.size

req1 = comm.isend(sendmsg, dest=right)
req2 = comm.isend(sendmsg, dest=left)
lmsg = comm.recv(source=left)
rmsg = comm.recv(source=right)

MPI.Request.Waitall([req1, req2])
assert lmsg == [left] * 3
assert rmsg == [right] * 3
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD

if comm.rank == 0:
    array1 = numpy.arange(10000, dtype='f')
    array2 = numpy.empty(10000, dtype='f')
    target = 1
else:
    array1 = numpy.ones(10000, dtype='f')
    array2 = numpy.empty(10000, dtype='f')
    target = 0
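The exchange itself can be written with the buffer-based nonblocking API; a sketch (MPI.FLOAT matches dtype='f'):

request = comm.Isend([array1, MPI.FLOAT], dest=target)
comm.Recv([array2, MPI.FLOAT], source=target)
request.Wait()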
Exercise #2
Collective Operations
[Diagram: broadcast — the root's item A0 is copied to every process.]
[Diagram: scatter — items A0..A5 on the root are distributed one per process; gather is the inverse.]
[Diagram: allgather — each process contributes one item (A0..F0) and every process receives the full collection.]
[Diagram: alltoall — process i starts with items A_i..F_i; afterwards process 0 holds A_0..A_5, process 1 holds B_0..B_5, and so on (a transpose across processes).]
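The picture above maps to a single call; a minimal sketch of the object-based alltoall with one item per process pair:

from mpi4py import MPI
comm = MPI.COMM_WORLD

# rank i sends item (i, j) to rank j; afterwards rank j
# holds [(0, j), (1, j), ..., (size-1, j)]
sendmsg = [(comm.rank, j) for j in range(comm.size)]
recvmsg = comm.alltoall(sendmsg)
assert recvmsg == [(i, comm.rank) for i in range(comm.size)]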
Broadcast
from mpi4py import MPI
comm = MPI.COMM_WORLD

if comm.rank == 0:
    sendmsg = (7, "abc", [1.0, 2+3j], {3: 4})
else:
    sendmsg = None

recvmsg = comm.bcast(sendmsg, root=0)
Scatter
from mpi4py import MPI
comm = MPI.COMM_WORLD

if comm.rank == 0:
    sendmsg = [i**2 for i in range(comm.size)]
else:
    sendmsg = None

recvmsg = comm.scatter(sendmsg, root=0)
Gather & Allgather
sendmsg = comm.rank**2
recvmsg1 = comm.gather(sendmsg, root=0)
recvmsg2 = comm.allgather(sendmsg)
Reduce & Allreduce
sendmsg = comm.rank
recvmsg1 = comm.reduce(sendmsg, op=MPI.SUM, root=0)
recvmsg2 = comm.allreduce(sendmsg)
Exercise #3
Compute Pi
$$\pi = \int_0^1 \frac{4}{1+x^2}\,dx \approx \frac{1}{n}\sum_{i=0}^{n-1}\frac{4}{1+\left(\frac{i+0.5}{n}\right)^2}$$
Compute Pi: sequential
import math

def compute_pi(n):
    h = 1.0 / n
    s = 0.0
    for i in range(n):
        x = h * (i + 0.5)
        s += 4.0 / (1.0 + x**2)
    return s * h

n = 10
pi = compute_pi(n)
error = abs(pi - math.pi)

print("pi is approximately %.16f, "
      "error is %.16f" % (pi, error))
Compute Pi: parallel
from mpi4py import MPI
import math

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()

if myrank == 0:
    n = 10
else:
    n = None

n = comm.bcast(n, root=0)
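Each rank now computes a strided partial sum, and the partial results are reduced at the root; a sketch, mirroring the child code in the Dynamic Process Management section:

# rank r handles terms r, r + nprocs, r + 2*nprocs, ...
h = 1.0 / n
s = 0.0
for i in range(myrank, n, nprocs):
    x = h * (i + 0.5)
    s += 4.0 / (1.0 + x**2)
mypi = s * h

# combine partial sums on rank 0
pi = comm.reduce(mypi, op=MPI.SUM, root=0)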
if myrank == 0:
    error = abs(pi - math.pi)
    print("pi is approximately %.16f, "
          "error is %.16f" % (pi, error))
Exercise #4
Mandelbrot Set
def mandelbrot(x, y, maxit):
    # standard escape-time iteration count
    c = x + y*1j
    z = 0 + 0j
    it = 0
    while abs(z) < 2 and it < maxit:
        z = z**2 + c
        it += 1
    return it

x1, x2 = -2.0, 1.0
y1, y2 = -1.0, 1.0
w, h = 150, 100
maxit = 127

import numpy
C = numpy.zeros([h, w], dtype='i')
dx = (x2 - x1) / w
dy = (y2 - y1) / h
for i in range(h):
    y = y1 + i * dy
    for j in range(w):
        x = x1 + j * dx
        C[i, j] = mandelbrot(x, y, maxit)
[Diagram: distributing the image rows among processes 0, 1, 2 — block distribution (contiguous chunks) versus cyclic distribution (round-robin).]
from mpi4py import MPI
import numpy

x1, x2 = -2.0, 1.0
y1, y2 = -1.0, 1.0
w, h = 150, 100
maxit = 127

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
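Each process gets a contiguous block of rows. A sketch of the partition step (the names N, start, and Cl are those used by the loop below; the exact partition formula is an assumption):

# local row count: h // size rows each, one extra for the first h % size ranks
N = h // size + (h % size > rank)
# first local row: exclusive prefix sum of the block sizes
start = comm.scan(N) - N
# local block of the result
Cl = numpy.zeros([N, w], dtype='i')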
dx = (x2 - x1) / w
dy = (y2 - y1) / h
for i in range(N):
    y = y1 + (i + start) * dy
    for j in range(w):
        x = x1 + j * dx
        Cl[i, j] = mandelbrot(x, y, maxit)
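Before the Gatherv below, the root needs the per-rank row counts and a full-size destination array; a sketch providing the counts and C that the call uses:

# every rank reports its local row count to the root
counts = comm.gather(N, root=0)
C = None
if rank == 0:
    C = numpy.zeros([h, w], dtype='i')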
rowtype = MPI.INT.Create_contiguous(w)
rowtype.Commit()
comm.Gatherv(sendbuf=[Cl, MPI.INT],
             recvbuf=[C, (counts, None), rowtype],
             root=0)
rowtype.Free()
if comm.rank == 0:
    from matplotlib import pyplot
    pyplot.imshow(C, aspect='equal')
    pyplot.spectral()
    pyplot.show()
Exercise #5
Dynamic Process Management
# file: compute_pi-master.py
from mpi4py import MPI
import numpy
import sys

comm = MPI.COMM_SELF.Spawn(sys.executable,
                           args=['compute_pi-child.py'],
                           maxprocs=5)
N = numpy.array(10, 'i')
comm.Bcast([N, MPI.INT], root=MPI.ROOT)
PI = numpy.array(0.0, 'd')
comm.Reduce(None, [PI, MPI.DOUBLE],
            op=MPI.SUM, root=MPI.ROOT)
comm.Disconnect()
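Only the master is launched; the five children are spawned at run time. Assuming the two scripts are saved under the file names used above:

$ mpiexec -n 1 python compute_pi-master.py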
# file: compute_pi-child.py
from mpi4py import MPI
import numpy

comm = MPI.Comm.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()
N = numpy.array(0, dtype='i')
comm.Bcast([N, MPI.INT], root=0)

h = 1.0 / N; s = 0.0
for i in range(rank, N, size):
    x = h * (i + 0.5)
    s += 4.0 / (1.0 + x**2)
PI = numpy.array(s * h, dtype='d')

comm.Reduce([PI, MPI.DOUBLE], None,
            op=MPI.SUM, root=0)
comm.Disconnect()
Exercise #5
Thanks!