在上一篇中我们介绍了 mpi4py 中的发散操作方法,下面我们将介绍收集操作。
对组间通信子上的收集操作,其方法调用必须包含组间通信子内的所有进程,且其中根进程必须属于某个组。根进程的 root
参数需使用 MPI.ROOT,与根进程处在同一组的其它进程的 root
参数需设置成 MPI.PROC_NULL,而另一组的所有进程的 root
参数为根进程在其组内的 rank。数据将从另一组的所有进程收集到根进程中。
mpi4py 中的收集操作的方法(MPI.Comm 类的方法)接口为:
gather(self, sendobj, int root=0)
Gather(self, sendbuf, recvbuf, int root=0)
Gatherv(self, sendbuf, recvbuf, int root=0)
这些方法的参数与发散操作对应的方法的参数类似。Gatherv 用于从各个进程收集不同长度的消息,它的第二个参数 recvbuf
需要设置成类似于 [data, count, displ, MPI.DOUBLE]
, 其中 count
和 displ
对组内通信子对象的 Gather 和 Gatherv,可以将其 sendbuf
参数设置成 MPI.IN_PLACE,此时根进程只会收集与其自身之外的所有其它进程的数据。
# gather.py
Demonstrates the usage of gather, Gather, Gatherv.
Run this with 4 processes like:
$ mpiexec -n 4 python gather.py
import numpy as np
from mpi4py import MPI
rank = comm.Get_rank()
# ------------------------------------------------------------------------------
# gather generic object from each process to root by using gather
if rank == 0:
send_obj = 1.2
elif rank == 1:
send_obj = 'xxx'
elif rank == 2:
send_obj = {'a': 1}
send_obj = (2,)
recv_obj = comm.gather(send_obj, root=1)
print 'gather: rank %d has %s' % (rank, recv_obj)
# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process to root by using Gather
send_buf = np.array([0, 1], dtype='i') + 2 * rank
if rank == 2:
recv_buf = np.empty(8, dtype='i')
recv_buf = None
comm.Gather(send_buf, recv_buf, root=2)
print 'Gather: rank %d has %s' % (rank, recv_buf)
# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process to root by using Gather with MPI.IN_PLACE
send_buf = np.array([0, 1], dtype='i') + 2 * rank
if rank == 2:
# initialize a receive buffer with all -1
recv_buf = np.zeros(8, dtype='i') - 1
recv_buf = None
# each process other than the root sends two numbers of send_buf to root
# but the root does not sends message to itself with MPI.IN_PLACE
# rank 0 | rank 1 | rank 2 | rank 3
# ------------+------------+------------+------------
# [0, 1] | [2, 3] | [-1, -1] | [6. 7]
if rank == 2:
comm.Gather(MPI.IN_PLACE, recv_buf, root=2)
comm.Gather(send_buf, recv_buf, root=2)
print 'Gather: rank %d has %s with MPI.IN_PLACE' % (rank, recv_buf)
# ------------------------------------------------------------------------------
# gather a numpy array from each process to the root by using Gatherv
if rank == 0:
send_buf = np.array([10, 11, 12], dtype='i')
elif rank == 1:
send_buf = np.array([13], dtype='i')
elif rank == 2:
send_buf = np.array([14, 15, 16, 17], dtype='i')
send_buf = np.array([18, 19], dtype='i')
if rank == 2:
recv_buf = np.empty(10, dtype='i')
recv_buf = None
count = [3, 1, 4, 2]
displ = [0, 3, 4, 8]
# gather numpy arrays with different length to the root from each process with allocation:
# rank 0 | rank 1 | rank 2 | rank 3
# -----------+------------+-------------+-------------
# 10 11 12 | 13 | 14 15 16 17 | 18 19
# displ: 0 3 4 8
comm.Gatherv(send_buf, [recv_buf, count, displ, MPI.INT], root=2)
print 'Gatherv: rank %d has %s' % (rank, recv_buf)
$ mpiexec -n 4 python gather.py
gather: rank 0 has None
Gather: rank 0 has None
Gather: rank 0 has None with MPI.IN_PLACE
Gatherv: rank 0 has None
gather: rank 3 has None
Gather: rank 3 has None
Gather: rank 3 has None with MPI.IN_PLACE
Gatherv: rank 3 has None
gather: rank 1 has [1.2, 'xxx', {'a': 1}, (2,)]
Gather: rank 1 has None
Gather: rank 1 has None with MPI.IN_PLACE
Gatherv: rank 1 has None
gather: rank 2 has None
Gather: rank 2 has [0 1 2 3 4 5 6 7]
Gather: rank 2 has [ 0 1 2 3 -1 -1 6 7] with MPI.IN_PLACE
Gatherv: rank 2 has [10 11 12 13 14 15 16 17 18 19]
以上我们介绍了 mpi4py 中的收集操作方法,在下一篇中我们将介绍规约操作。