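The previous post covered the scatter operations in mpi4py; this one covers the gather operations.
A gather is the inverse of a scatter: the root process collects a distinct message from every process in the communicator (itself included) and places the messages in its receive buffer in rank order.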
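As a rough mental model of these semantics, the following pure-Python sketch imitates what each rank holds after an object gather. It does not use MPI at all: `simulate_gather` is a hypothetical helper introduced only for illustration, and the list index stands in for the rank.

```python
def simulate_gather(send_objs, root=0):
    """Model a gather: send_objs[i] is what rank i contributes.

    Returns a list whose i-th entry is what rank i would hold afterwards:
    the root holds all contributions in rank order, every other rank None.
    """
    if not 0 <= root < len(send_objs):
        raise ValueError("root must be a valid rank")
    gathered = list(send_objs)  # rank order is preserved
    return [gathered if rank == root else None
            for rank in range(len(send_objs))]


# Four "ranks" contributing objects of different types, gathered to rank 1.
result = simulate_gather([1.2, 'xxx', {'a': 1}, (2,)], root=1)
for rank, held in enumerate(result):
    print('rank %d has %s' % (rank, held))
```

Only the entry for the root is a list; every other entry is None, which mirrors the return value of a lowercase gather call on non-root processes.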
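For a gather on an inter-communicator, the call must be made by all processes in the inter-communicator, and the root process must belong to one of the two groups. The root passes MPI.ROOT as its root argument, the other processes in the root's group pass MPI.PROC_NULL, and every process in the other group passes the rank of the root within the root's group. Data is gathered from all processes of the other group to the root.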
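The rule for the root argument on an inter-communicator can be summarized as a small decision function. This is only a sketch: `intercomm_root_arg` is a hypothetical helper, and it returns the strings 'MPI.ROOT' and 'MPI.PROC_NULL' as labels so that it runs without MPI; a real program would pass the mpi4py constants themselves.

```python
def intercomm_root_arg(in_root_group, is_root, root_rank_in_group):
    """Return a label for the `root` argument a process should pass.

    in_root_group      -- True if the process is in the same group as the root
    is_root            -- True if the process is the root itself
    root_rank_in_group -- rank of the root within its own group
    """
    if in_root_group:
        # The root identifies itself; its group-mates take no part.
        return 'MPI.ROOT' if is_root else 'MPI.PROC_NULL'
    # Processes in the other (sending) group name the root by its rank
    # within the root's group.
    return root_rank_in_group


# Assumed layout: the root is rank 1 of its group.
print(intercomm_root_arg(True, True, 1))    # the root itself
print(intercomm_root_arg(True, False, 1))   # another process in the root's group
print(intercomm_root_arg(False, False, 1))  # any process in the other group
```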
Method interface
The gather methods in mpi4py (methods of the MPI.Comm class) have the following interfaces:
gather(self, sendobj, int root=0)
Gather(self, sendbuf, recvbuf, int root=0)
Gatherv(self, sendbuf, recvbuf, int root=0)
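The arguments of these methods are similar to those of the corresponding scatter methods. Gatherv gathers messages of different lengths from the processes. Its second argument, recvbuf, should be given in a form like [data, count, displ, MPI.DOUBLE], where count and displ are both sequences of integers: count gives the number of items to gather from each process, and displ gives the offset in the receive buffer (counted in items, not bytes) at which the segment from each process should start.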
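When the segments are to be packed back to back, displ is simply the running total of count. The sketch below shows that relationship in plain Python, without MPI; `displs_from_counts` and `pack_segments` are hypothetical helpers used only to illustrate how count and displ describe the layout of the root's receive buffer.

```python
def displs_from_counts(counts):
    """Exclusive prefix sum: segment i starts where segment i-1 ends."""
    displs = []
    offset = 0
    for c in counts:
        displs.append(offset)
        offset += c
    return displs


def pack_segments(segments, counts, displs, fill=None):
    """Place each rank's segment at its displacement in one flat buffer."""
    size = max((d + c for c, d in zip(counts, displs)), default=0)
    buf = [fill] * size
    for seg, c, d in zip(segments, counts, displs):
        if len(seg) != c:
            raise ValueError("segment length does not match its count")
        buf[d:d + c] = seg
    return buf


counts = [3, 1, 4, 2]
displs = displs_from_counts(counts)
segments = [[10, 11, 12], [13], [14, 15, 16, 17], [18, 19]]
print(displs)
print(pack_segments(segments, counts, displs))
```

Displacements do not have to be contiguous; leaving gaps between segments is also allowed, in which case the slots in the gaps keep whatever the buffer held before (`fill` in this sketch).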
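For Gather and Gatherv on an intra-communicator, the root may pass MPI.IN_PLACE as its sendbuf argument. In that case the root receives data only from the other processes; its own segment of the receive buffer is left untouched, because the root's contribution is assumed to be there already.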
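The effect of MPI.IN_PLACE on the root's buffer can be imitated with ordinary lists. This is a sketch rather than MPI code: `simulate_gather_in_place` is a hypothetical helper, the list index stands in for the rank, and every rank is assumed to contribute `chunk` items.

```python
def simulate_gather_in_place(recv_buf, send_bufs, root, chunk):
    """Model Gather with MPI.IN_PLACE at the root.

    recv_buf  -- the root's receive buffer (a list), modified in place
    send_bufs -- send_bufs[i] is rank i's contribution of `chunk` items;
                 the entry for the root is ignored
    """
    for rank, buf in enumerate(send_bufs):
        if rank == root:
            continue  # the root's own segment is left untouched
        if len(buf) != chunk:
            raise ValueError("every contribution must have `chunk` items")
        recv_buf[rank * chunk:(rank + 1) * chunk] = buf
    return recv_buf


recv_buf = [-1] * 8                                  # root's buffer, all -1
send_bufs = [[2 * r, 2 * r + 1] for r in range(4)]   # rank r sends [2r, 2r+1]
simulate_gather_in_place(recv_buf, send_bufs, root=2, chunk=2)
print(recv_buf)
```

In practice the root would normally write its own contribution into its segment before the call; the -1 filler is used here only to make the untouched slots visible.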
Example
The following program demonstrates the gather operations.
# gather.py
"""
Demonstrates the usage of gather, Gather, Gatherv.
Run this with 4 processes like:
$ mpiexec -n 4 python gather.py
"""
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# ------------------------------------------------------------------------------
# gather a generic object from each process to the root by using gather
if rank == 0:
    send_obj = 1.2
elif rank == 1:
    send_obj = 'xxx'
elif rank == 2:
    send_obj = {'a': 1}
else:
    send_obj = (2,)
# the root (rank 1) gets a list of all objects in rank order, other ranks get None
recv_obj = comm.gather(send_obj, root=1)
print('gather: rank %d has %s' % (rank, recv_obj))
# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process to the root by using Gather
send_buf = np.array([0, 1], dtype='i') + 2 * rank
if rank == 2:
    # only the root needs a receive buffer: 4 processes x 2 items each
    recv_buf = np.empty(8, dtype='i')
else:
    recv_buf = None
comm.Gather(send_buf, recv_buf, root=2)
print('Gather: rank %d has %s' % (rank, recv_buf))
# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process to the root by using Gather with MPI.IN_PLACE
send_buf = np.array([0, 1], dtype='i') + 2 * rank
if rank == 2:
    # initialize a receive buffer with all -1
    recv_buf = np.zeros(8, dtype='i') - 1
else:
    recv_buf = None
# each process other than the root sends the two numbers in send_buf to the root,
# but with MPI.IN_PLACE the root does not send a message to itself
#     rank 0   |   rank 1   |   rank 2   |   rank 3
#  ------------+------------+------------+------------
#     [0, 1]   |   [2, 3]   |  [-1, -1]  |   [6, 7]
if rank == 2:
    comm.Gather(MPI.IN_PLACE, recv_buf, root=2)
else:
    comm.Gather(send_buf, recv_buf, root=2)
print('Gather: rank %d has %s with MPI.IN_PLACE' % (rank, recv_buf))
# ------------------------------------------------------------------------------
# gather a numpy array from each process to the root by using Gatherv
if rank == 0:
    send_buf = np.array([10, 11, 12], dtype='i')
elif rank == 1:
    send_buf = np.array([13], dtype='i')
elif rank == 2:
    send_buf = np.array([14, 15, 16, 17], dtype='i')
else:
    send_buf = np.array([18, 19], dtype='i')
if rank == 2:
    recv_buf = np.empty(10, dtype='i')
else:
    recv_buf = None
count = [3, 1, 4, 2]
displ = [0, 3, 4, 8]
# gather numpy arrays of different lengths from each process to the root, laid out as:
#          rank 0   |   rank 1   |   rank 2    |   rank 3
#        -----------+------------+-------------+-----------
#         10 11 12  |     13     | 14 15 16 17 |   18 19
# displ:  0               3        4               8
comm.Gatherv(send_buf, [recv_buf, count, displ, MPI.INT], root=2)
print('Gatherv: rank %d has %s' % (rank, recv_buf))
The output is as follows (the order of lines from different ranks may vary between runs):
$ mpiexec -n 4 python gather.py
gather: rank 0 has None
Gather: rank 0 has None
Gather: rank 0 has None with MPI.IN_PLACE
Gatherv: rank 0 has None
gather: rank 3 has None
Gather: rank 3 has None
Gather: rank 3 has None with MPI.IN_PLACE
Gatherv: rank 3 has None
gather: rank 1 has [1.2, 'xxx', {'a': 1}, (2,)]
Gather: rank 1 has None
Gather: rank 1 has None with MPI.IN_PLACE
Gatherv: rank 1 has None
gather: rank 2 has None
Gather: rank 2 has [0 1 2 3 4 5 6 7]
Gather: rank 2 has [ 0  1  2  3 -1 -1  6  7] with MPI.IN_PLACE
Gatherv: rank 2 has [10 11 12 13 14 15 16 17 18 19]
This post covered the gather operations in mpi4py; the next one will cover the reduce operations.