The Scatter Operation in mpi4py

In the previous article we introduced the broadcast operation in mpi4py; here we introduce the scatter operation.

On an intra-communicator, a scatter operation distributes distinct messages from the root process to every process in the group.

On an inter-communicator, the scatter call must be made by all processes of the inter-communicator, and one of the two groups must define the root process. Data is scattered from the root to all processes of the group that does not contain the root. The root process passes MPI.ROOT as its root argument; the other processes in the root's group pass MPI.PROC_NULL; and all processes in the other group pass the same root argument, namely the rank of the root process within its own group.
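The root-argument convention above can be sketched as a tiny helper. This is a hypothetical illustration, not part of mpi4py; the string sentinels merely stand in for the constants MPI.ROOT and MPI.PROC_NULL, whose actual values are implementation-defined:

```python
# Sentinels standing in for the mpi4py constants MPI.ROOT and MPI.PROC_NULL.
ROOT = "MPI.ROOT"
PROC_NULL = "MPI.PROC_NULL"

def scatter_root_arg(in_root_group, my_rank, root_rank):
    """Return the root argument a process passes to an inter-communicator
    scatter, given whether it belongs to the root's group, its own rank
    within its group, and the root's rank within the root's group."""
    if in_root_group:
        # The root itself passes MPI.ROOT; its group mates pass MPI.PROC_NULL.
        return ROOT if my_rank == root_rank else PROC_NULL
    # Every process in the other group passes the root's rank in its group.
    return root_rank
```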

Method interfaces

The scatter methods in mpi4py (methods of the MPI.Comm class) have the following interfaces:

scatter(self, sendobj, int root=0)

Scatter(self, sendbuf, recvbuf, int root=0)
Scatterv(self, sendbuf, recvbuf, int root=0)

The lowercase scatter distributes a sequence of arbitrary picklable Python objects sendobj; the number of objects in the sequence must equal the number of receiving processes. The uppercase Scatter distributes an array-like object exposing a single-segment buffer interface, such as a numpy array. Its sendbuf/recvbuf arguments can each be a list or tuple of length 2 or 3, such as [data, MPI.DOUBLE] or [data, count, MPI.DOUBLE], specifying the send/receive buffer, the element count, and the datatype; when count is omitted, it is computed from the byte length of data and the datatype. For a numpy array, the count and datatype can be inferred automatically, so data itself can be passed directly as sendbuf/recvbuf. Scatterv scatters messages of different lengths to the individual processes. Its second argument recvbuf can be set in the same way as for Scatter, but its first argument must take a form like [data, count, displ, MPI.DOUBLE], where count and displ are both sequences of integers: count gives the number of elements to send to each process, and displ gives the offset, within the send buffer, at which the segment destined for each process starts.
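How count and displ carve up the Scatterv send buffer can be illustrated without running MPI at all; the plain-numpy slicing below mirrors the segment each rank would receive (a sketch of the bookkeeping, not an MPI call):

```python
import numpy as np

send_buf = np.arange(10, 20, dtype='i')  # 10 elements to distribute
count = [1, 2, 3, 4]   # number of elements destined for each rank
displ = [0, 1, 3, 6]   # offset in send_buf where each rank's segment starts

# Segment that rank i would receive from Scatterv: send_buf[displ[i] : displ[i] + count[i]]
segments = [send_buf[d:d + c] for c, d in zip(count, displ)]
for rank, seg in enumerate(segments):
    print('rank %d gets %s' % (rank, seg))
# rank 0 gets [10]
# rank 1 gets [11 12]
# rank 2 gets [13 14 15]
# rank 3 gets [16 17 18 19]
```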

For Scatter and Scatterv on an intra-communicator, the recvbuf argument may be set to MPI.IN_PLACE, in which case the root no longer transfers data to itself: the scattered send buffer still consists of n segments (where n is the size of the group), but with MPI.IN_PLACE the segment whose index corresponds to the root is not delivered to the root.

Example

The following example demonstrates the use of the scatter operations.

# scatter.py

"""
Demonstrates the usage of scatter, Scatter, Scatterv.

Run this with 4 processes like:
$ mpiexec -n 4 python scatter.py
"""

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# ------------------------------------------------------------------------------
# scatter a list of generic object by using scatter
if rank == 1:
    send_obj = [1.2, 'xxx', {'a': 1}, (2,)]
else:
    send_obj = None

# each process receives one element of send_obj from rank 1
#     rank 0   |   rank 1   |   rank 2   |   rank 3
#  ------------+------------+------------+------------
#      1.2     |   'xxx'    |  {'a': 1}  |   (2,)
recv_obj = comm.scatter(send_obj, root=1)
print('scatter: rank %d has %s' % (rank, recv_obj))


# ------------------------------------------------------------------------------
# scatter a numpy array by using Scatter
if rank == 2:
    send_buf = np.arange(8, dtype='i')
else:
    send_buf = None
recv_buf = np.empty(2, dtype='i')

# each process receives two elements of send_buf from rank 2
#     rank 0   |   rank 1   |   rank 2   |   rank 3
#  ------------+------------+------------+------------
#     [0, 1]   |   [2, 3]   |   [4, 5]   |   [6, 7]
comm.Scatter(send_buf, recv_buf, root=2)
print('Scatter: rank %d has %s' % (rank, recv_buf))


# ------------------------------------------------------------------------------
# scatter a numpy array by using Scatter with MPI.IN_PLACE
if rank == 2:
    send_buf = np.arange(8, dtype='i')
else:
    send_buf = None
# initialize the receive buffer to [-1, -1]
recv_buf = np.zeros(2, dtype='i') - 1

# each process other than the root receives two elements of send_buf from rank 2,
# but the root does not receive any message from itself due to MPI.IN_PLACE
#     rank 0   |   rank 1   |   rank 2   |   rank 3
#  ------------+------------+------------+------------
#     [0, 1]   |   [2, 3]   |  [-1, -1]  |   [6, 7]
if rank == 2:
    comm.Scatter(send_buf, MPI.IN_PLACE, root=2)
else:
    comm.Scatter(send_buf, recv_buf, root=2)
print('Scatter: rank %d has %s with MPI.IN_PLACE' % (rank, recv_buf))


# ------------------------------------------------------------------------------
# scatter a numpy array by using Scatterv
if rank == 2:
    send_buf = np.arange(10, 20, dtype='i')
else:
    send_buf = None
recv_buf = np.empty(rank+1, dtype='i')
count = [1, 2, 3, 4]
displ = [0, 1, 3, 6]
# scatter 10 numbers from rank 2 to 4 processes with allocation:
#       rank 0   |   rank 1   |   rank 2   |   rank 3
#     -----------+------------+------------+-------------
#         10     |   11 12    |  13 14 15  | 16 17 18 19
# displ:  0          1           3           6

comm.Scatterv([send_buf, count, displ, MPI.INT], recv_buf, root=2)
print('Scatterv: rank %d has %s' % (rank, recv_buf))

The output is as follows:

$ mpiexec -n 4 python scatter.py
scatter: rank 2 has {'a': 1}
Scatter: rank 2 has [4 5]
Scatter: rank 2 has [-1 -1] with MPI.IN_PLACE
Scatterv: rank 2 has [13 14 15]
scatter: rank 0 has 1.2
Scatter: rank 0 has [0 1]
Scatter: rank 0 has [0 1] with MPI.IN_PLACE
Scatterv: rank 0 has [10]
scatter: rank 1 has xxx
Scatter: rank 1 has [2 3]
Scatter: rank 1 has [2 3] with MPI.IN_PLACE
Scatterv: rank 1 has [11 12]
scatter: rank 3 has (2,)
Scatter: rank 3 has [6 7]
Scatter: rank 3 has [6 7] with MPI.IN_PLACE
Scatterv: rank 3 has [16 17 18 19]

Above we introduced the scatter operations in mpi4py; in the next article we will introduce the gather operations.

    Original author: 自可乐
    Original source: https://www.jianshu.com/p/445281e025e8