mpi4py 中的规约发散操作

上一篇中我们介绍了 mpi4py 中的全规约操作方法,下面我们将介绍规约发散操作。

对组内通信子上的规约发散操作,首先对各个进程所保有的输入向量实施规约操作,再将结果向量散发到各个进程。相当于以某个进程为根,执行一次规约操作后跟一次散发操作。

对组间通信子上的规约发散操作,对与之相关联的组 group A 和 group B,将 A 中所有进程提供的数据的规约结果散发到 B 的进程中,反之亦然。

方法接口

mpi4py 中的规约发散操作的方法(MPI.Comm 类的方法)接口为:

Reduce_scatter_block(self, sendbuf, recvbuf, Op op=SUM)
Reduce_scatter(self, sendbuf, recvbuf, recvcounts=None, Op op=SUM)

注意:没有对应的以小写字母开头的方法。Reduce_scatter 相对于 Reduce_scatter_block 多了一个 recvcounts 参数,用以指定每个进程接收到的数据个数,每个进程接收到的数据量可以不同,因此 Reduce_scatter 的散发步骤实际上执行的是 Scatterv 操作,而 Reduce_scatter_block 在散发步骤则执行的是 Scatter 操作,即散发到各个进程的数据量相同。

对组内通信子对象的 Reduce_scatter_block 和 Reduce_scatter,可以将其 sendbuf 参数设置成 MPI.IN_PLACE,此时 recvbuf 将既作为发送缓冲区又作为接收缓冲区,每个进程将从 recvbuf 中提取数据,并将规约后的结果填充到 recvbuf 中。当结果短于 recvbuf 的容量时,只会填充其起始部分。

例程

下面给出全规约操作的使用例程。

# reduce_scatter.py

"""
Demonstrates the usage of Reduce_scatter_block, Reduce_scatter.

Run this with 4 processes like:
$ mpiexec -n 4 python reduce_scatter.py
"""

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()


# ------------------------------------------------------------------------------
# reduce scatter a numpy array by using Reduce_scatter_block
send_buf = np.arange(8, dtype='i')
recv_buf = np.empty(2, dtype='i')

# first step: reduce
# rank 0  |  0  1  2  3  4  5  6  7
# rank 1  |  0  1  2  3  4  5  6  7
# rank 2  |  0  1  2  3  4  5  6  7
# rank 3  |  0  1  2  3  4  5  6  7
# --------+-------------------------
# SUM     |  0  4  8  12 16 20 24 28
# second step: scatter
#  rank 0  |  rank 1  | rank 2  |  rank 3
# ---------+----------+---------+---------
#   0 4    |   8 12   |  16 20  |  24 28
comm.Reduce_scatter_block(send_buf, recv_buf, op=MPI.SUM)
print 'Reduce_scatter_block by SUM: rank %d has %s' % (rank, recv_buf)

# ------------------------------------------------------------------------------
# reduce scatter a numpy array by using Reduce_scatter_block with MPI.IN_PLACE
recv_buf = np.arange(8, dtype='i')

# with MPI.IN_PLACE, recv_buf is used as both send buffer and receive buffer
# the first two elements of recv_buf will be filled with the scattered results
comm.Reduce_scatter_block(MPI.IN_PLACE, recv_buf, op=MPI.SUM)
print 'Reduce_scatter_block by SUM with MPI.IN_PLACE: rank %d has %s' % (rank, recv_buf)

# ------------------------------------------------------------------------------
# reduce scatter a numpy array by using Reduce_scatter
send_buf = np.arange(8, dtype='i')
recvcounts = [2, 3, 1, 2]
recv_buf = np.empty(recvcounts[rank], dtype='i')

# first step: reduce
# rank 0  |  0  1  2  3  4  5  6  7
# rank 1  |  0  1  2  3  4  5  6  7
# rank 2  |  0  1  2  3  4  5  6  7
# rank 3  |  0  1  2  3  4  5  6  7
# --------+-------------------------
# SUM     |  0  4  8  12 16 20 24 28
# second step: scatterv with [2, 3, 1, 2]
#  rank 0  |  rank 1  | rank 2  |  rank 3
# ---------+----------+---------+---------
#   0 4    |  8 12 16 |   20    |  24 28
comm.Reduce_scatter(send_buf, recv_buf, recvcounts=[2, 3, 1, 2], op=MPI.SUM)
print 'Reduce_scatter by SUM: rank %d has %s' % (rank, recv_buf)

运行结果如下:

$ mpiexec -n 4 python reduce_scatter.py
Reduce_scatter_block by SUM: rank 0 has [0 4]
Reduce_scatter_block by SUM: rank 1 has [ 8 12]
Reduce_scatter_block by SUM: rank 3 has [24 28]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 0 has [0 4 2 3 4 5 6 7]
Reduce_scatter by SUM: rank 0 has [0 4]
Reduce_scatter_block by SUM: rank 2 has [16 20]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 2 has [16 20  2  3  4  5  6  7]
Reduce_scatter by SUM: rank 2 has [20]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 1 has [ 8 12  2  3  4  5  6  7]
Reduce_scatter by SUM: rank 1 has [ 8 12 16]
Reduce_scatter_block by SUM with MPI.IN_PLACE: rank 3 has [24 28  2  3  4  5  6  7]
Reduce_scatter by SUM: rank 3 has [24 28]

以上我们介绍了 mpi4py 中的规约发散操作方法,在下一篇中我们将介绍全发散操作。

    原文作者:自可乐
    原文地址: https://www.jianshu.com/p/41164304db11
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞