mpi4py 中的全收集操作

上一篇中我们介绍了 mpi4py 中的规约操作方法,下面我们将介绍全收集操作。

对组内通信子上的全收集操作,将组内所有进程的发送缓冲区的数据连接在一起,并发送到所有进程的接收缓冲区内。

对组间通信子上的全收集操作,假定关联的组为 group A 和 group B,则 A 中的每一个进程贡献一个数据项,连接在一起保存到 B 的各个进程中,同时 B 的各个进程也将自己的数据项连接在一起发送给 A 的各个进程。因此 A 的发送缓冲区参数必须与 B 的接收缓冲区参数匹配,反之亦然。

组间通信子上的全收集操作可以是非对称的, group A 中的各进程所发送的数据量可能与 group B 中各进程发送的数据量不同,甚至在某个方向上某些进程的发送数据量可以为 0。

方法接口

mpi4py 中的全收集操作的方法(MPI.Comm 类的方法)接口为:

allgather(self, sendobj)

Allgather(self, sendbuf, recvbuf)
Allgatherv(self, sendbuf, recvbuf)

这些方法的参数与收集操作对应方法的参数类似,不同的是对全收集操作没有了 root 参数。

对组内通信子对象的 Allgather 和 Allgatherv,可以将其 sendbuf 参数设置成 MPI.IN_PLACE,此时 recvbuf 将被既用做发送缓冲区,又用作接收缓冲区,并且默认各进程需要从自己处接收到 recvbuf 的数据已经分别在正确的位置上。

例程

下面给出全收集操作的使用例程。

# allgather.py

"""
Demonstrates the usage of allgather, Allgather, Allgatherv.

Run this with 4 processes like:
$ mpiexec -n 4 python allgather.py
"""

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# ------------------------------------------------------------------------------
# gather generic object from each process by using allgather
if rank == 0:
    send_obj = 1.2
elif rank == 1:
    send_obj = 'xxx'
elif rank == 2:
    send_obj = {'a': 1}
else:
    send_obj = (2,)

recv_obj = comm.allgather(send_obj)
print 'allgather: rank %d has %s' % (rank, recv_obj)


# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process by using Allgather
send_buf = np.array([0, 1], dtype='i') + 2 * rank
recv_buf = np.empty(8, dtype='i')

comm.Allgather(send_buf, recv_buf)
print 'Allgather: rank %d has %s' % (rank, recv_buf)


# ------------------------------------------------------------------------------
# gather same length numpy arrays from each process by using Allgather with MPI.IN_PLACE
# initialize a receive buffer for each process
recv_buf = np.zeros(8, dtype='i') - 1
if rank == 0:
    recv_buf[:2] = np.array([0, 1]) # [0, 1, -1, -1, -1, -1, -1, -1]
elif rank == 1:
    recv_buf[2:4] = np.array([2, 3]) # [-1, -1, 2, 3, -1, -1, -1, -1]
elif rank == 2:
    recv_buf[4:6] = np.array([4, 5]) # [-1, -1, -1, -1, 4, 5, -1, -1]
elif rank == 3:
    recv_buf[6:] = np.array([6, 7]) # [ -1, -1, -1, -1, -1, -1, 6, 7]

# with MPI.IN_PLACE, recv_buf is used as both a send and a receive buffer
comm.Allgather(MPI.IN_PLACE, recv_buf)
print 'Allgather: rank %d has %s with MPI.IN_PLACE' % (rank, recv_buf)


# ------------------------------------------------------------------------------
# gather numpy array with different length from each process by using Gatherv
if rank == 0:
    send_buf = np.array([10, 11, 12], dtype='i')
elif rank == 1:
    send_buf = np.array([13], dtype='i')
elif rank == 2:
    send_buf = np.array([14, 15, 16, 17], dtype='i')
else:
    send_buf = np.array([18, 19], dtype='i')
recv_buf = np.empty(10, dtype='i')
count = [3, 1, 4, 2]
displ = [0, 3, 4, 8]
# gather numpy arrays with different length to the root from each process with allocation:
#          rank 0   |   rank 1   |   rank 2    |   rank 3
#        -----------+------------+-------------+-------------
#         10 11 12  |     13     | 14 15 16 17 |   18 19
# displ:  0               3        4               8

comm.Allgatherv(send_buf, [recv_buf, count, displ, MPI.INT])
print 'Allgatherv: rank %d has %s' % (rank, recv_buf)

运行结果如下:

$ mpiexec -n 4 python allgather.py
allgather: rank 1 has [1.2, 'xxx', {'a': 1}, (2,)]
allgather: rank 2 has [1.2, 'xxx', {'a': 1}, (2,)]
allgather: rank 3 has [1.2, 'xxx', {'a': 1}, (2,)]
allgather: rank 0 has [1.2, 'xxx', {'a': 1}, (2,)]
Allgather: rank 3 has [0 1 2 3 4 5 6 7]
Allgather: rank 3 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 3 has [10 11 12 13 14 15 16 17 18 19]
Allgather: rank 1 has [0 1 2 3 4 5 6 7]
Allgather: rank 1 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 1 has [10 11 12 13 14 15 16 17 18 19]
Allgather: rank 2 has [0 1 2 3 4 5 6 7]
Allgather: rank 2 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 2 has [10 11 12 13 14 15 16 17 18 19]
Allgather: rank 0 has [0 1 2 3 4 5 6 7]
Allgather: rank 0 has [0 1 2 3 4 5 6 7] with MPI.IN_PLACE
Allgatherv: rank 0 has [10 11 12 13 14 15 16 17 18 19]

以上我们介绍了 mpi4py 中的全收集操作方法,在下一篇中我们将介绍全规约操作。

    原文作者:自可乐
    原文地址: https://www.jianshu.com/p/135640fb53f8
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞