In the previous article we introduced the broadcast operation in mpi4py; below we introduce the scatter operation.
For an intracommunicator, the scatter operation distributes distinct pieces of a message from the root process to every process in the group.
For an intercommunicator, the scatter call must involve all processes in both groups of the intercommunicator, and exactly one of the two groups must define the root process; data is scattered from the root to all processes of the group opposite the root's. The root process passes MPI.ROOT as its root argument, the other processes in the root's group pass MPI.PROC_NULL, and all processes in the other group pass the same root argument: the rank of the root within its own group.
Method interfaces
The scatter methods in mpi4py (methods of the MPI.Comm class) have the following interfaces:
scatter(self, sendobj, int root=0)
Scatter(self, sendbuf, recvbuf, int root=0)
Scatterv(self, sendbuf, recvbuf, int root=0)
The lowercase scatter can scatter a sequence of arbitrary picklable Python objects, sendobj; the number of objects in the sequence must equal the number of receiving processes. The uppercase Scatter scatters an array-like object exposing a single-segment buffer interface, such as a numpy array. Its sendbuf/recvbuf arguments can each be a list or tuple of length 2 or 3, like [data, MPI.DOUBLE] or [data, count, MPI.DOUBLE], specifying the send/receive data buffer, the count, and the datatype. When count is omitted, it is computed from the byte length of data and the datatype. For a numpy array, the count and datatype can be inferred automatically, so data can be passed directly as sendbuf/recvbuf. Scatterv scatters messages of different lengths to the individual processes; its second argument, recvbuf, can be set the same way as for Scatter, but its first argument must be set like [data, count, displ, MPI.DOUBLE], where count and displ are both integer sequences: count gives the number of data items sent to each process, and displ gives the starting offset, within the send buffer, of the data segment destined for each process.
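The meaning of count and displ can be illustrated with plain numpy slicing, with no MPI involved (the buffer contents below are chosen to match the Scatterv call in the example section): the segment delivered to process r is the count[r] items starting at offset displ[r] of the send buffer.

```python
import numpy as np

send_buf = np.arange(10, 20, dtype='i')  # the 10 numbers 10, 11, ..., 19
count = [1, 2, 3, 4]                     # items for processes 0, 1, 2, 3
displ = [0, 1, 3, 6]                     # starting offset of each segment

# process r would receive send_buf[displ[r]:displ[r] + count[r]]
segments = [send_buf[d:d + c] for c, d in zip(count, displ)]
for r, seg in enumerate(segments):
    print('process %d would receive %s' % (r, seg))
```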
For Scatter and Scatterv on an intracommunicator, the root process may set its recvbuf argument to MPI.IN_PLACE, in which case the root no longer transfers data to itself: the send buffer still consists of n segments (where n is the group size), but the segment whose index corresponds to the root's rank is not delivered to the root.
Example
The following example demonstrates the use of the scatter operations.
# scatter.py
"""
Demonstrates the usage of scatter, Scatter, Scatterv.

Run this with 4 processes like:
$ mpiexec -n 4 python scatter.py
"""

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# ------------------------------------------------------------------------------
# scatter a list of generic objects by using scatter
if rank == 1:
    send_obj = [1.2, 'xxx', {'a': 1}, (2,)]
else:
    send_obj = None
# each process receives one of the elements of send_obj from rank 1
#    rank 0   |   rank 1   |   rank 2   |   rank 3
# ------------+------------+------------+------------
#     1.2     |   'xxx'    |  {'a': 1}  |    (2,)
recv_obj = comm.scatter(send_obj, root=1)
print('scatter: rank %d has %s' % (rank, recv_obj))

# ------------------------------------------------------------------------------
# scatter a numpy array by using Scatter
if rank == 2:
    send_buf = np.arange(8, dtype='i')
else:
    send_buf = None
recv_buf = np.empty(2, dtype='i')
# each process receives two numbers of send_buf from rank 2
#    rank 0   |   rank 1   |   rank 2   |   rank 3
# ------------+------------+------------+------------
#   [0, 1]    |   [2, 3]   |   [4, 5]   |   [6, 7]
comm.Scatter(send_buf, recv_buf, root=2)
print('Scatter: rank %d has %s' % (rank, recv_buf))

# ------------------------------------------------------------------------------
# scatter a numpy array by using Scatter with MPI.IN_PLACE
if rank == 2:
    send_buf = np.arange(8, dtype='i')
else:
    send_buf = None
# initialize the receive buffer to [-1, -1]
recv_buf = np.zeros(2, dtype='i') - 1
# each process other than the root receives two numbers of send_buf from rank 2,
# but the root does not receive a message from itself with MPI.IN_PLACE
#    rank 0   |   rank 1   |   rank 2   |   rank 3
# ------------+------------+------------+------------
#   [0, 1]    |   [2, 3]   |  [-1, -1]  |   [6, 7]
if rank == 2:
    comm.Scatter(send_buf, MPI.IN_PLACE, root=2)
else:
    comm.Scatter(send_buf, recv_buf, root=2)
print('Scatter: rank %d has %s with MPI.IN_PLACE' % (rank, recv_buf))

# ------------------------------------------------------------------------------
# scatter a numpy array by using Scatterv
if rank == 2:
    send_buf = np.arange(10, 20, dtype='i')
else:
    send_buf = None
recv_buf = np.empty(rank + 1, dtype='i')
count = [1, 2, 3, 4]
displ = [0, 1, 3, 6]
# scatter 10 numbers from rank 2 to 4 processes with allocation:
#   rank 0   |   rank 1   |   rank 2   |   rank 3
# -----------+------------+------------+-------------
#     10     |   11 12    |  13 14 15  | 16 17 18 19
# displ: 0        1           3          6
comm.Scatterv([send_buf, count, displ, MPI.INT], recv_buf, root=2)
print('Scatterv: rank %d has %s' % (rank, recv_buf))
The running results are as follows:
$ mpiexec -n 4 python scatter.py
scatter: rank 2 has {'a': 1}
Scatter: rank 2 has [4 5]
Scatter: rank 2 has [-1 -1] with MPI.IN_PLACE
Scatterv: rank 2 has [13 14 15]
scatter: rank 0 has 1.2
Scatter: rank 0 has [0 1]
Scatter: rank 0 has [0 1] with MPI.IN_PLACE
Scatterv: rank 0 has [10]
scatter: rank 1 has xxx
Scatter: rank 1 has [2 3]
Scatter: rank 1 has [2 3] with MPI.IN_PLACE
Scatterv: rank 1 has [11 12]
scatter: rank 3 has (2,)
Scatter: rank 3 has [6 7]
Scatter: rank 3 has [6 7] with MPI.IN_PLACE
Scatterv: rank 3 has [16 17 18 19]
Above we introduced the scatter operations in mpi4py; in the next article we will introduce the gather operations.