mpi4py 进阶之 MPIArray

上一篇中我们介绍了 caput 软件包的 mpiutil 模块中提供的若干方便和易用的函数,下面我们将介绍 caput 中另一个模块 mpiarray 提供的建立在 numpy array 基础上的并行分布式数组 MPIArray。

MPIArray 类继承自 numpy.ndarray,因此 numpy 数组的众多方法都适用于 MPIArray,不过 MPIArray 却能够以一种整体的方式管理和操作分布在不同进程间的 numpy 数组,并提供若干特有的但方便的方法和属性以供使用,下面就将介绍它们。

注意:因为 MPIArray 类所在的 mpiarray 模块在内部导入使用我们前面介绍过的 mpiutil,因此 MPIArray 也能兼容非 MPI 环境,在此情况下只有单个进程执行,MPIArray 的 comm 属性将为 None,MPIArray 实际上就是一个单一的完整的 numpy.ndarray,附加上一些额外的属性和方法。

方法接口

__new__(cls, global_shape, axis=0, comm=None, *args, **kwargs)

MPIArray 构造方法。创建一个 shape 为 global_shape, 分布在通信子对象 comm 上的,分布轴为 axis 的 MPIArray,其它可选参数同 numpy.ndarray。comm 的默认值为 None,在 mpi4py 可用的情况下会使用 MPI.COMM_WORLD,不可用时就为 None (此时创建的 MPIArray 就存在于单个进程上)。当 comm 是一个有效的通信子对象时,所创建的 MPIArray 会按照 axis 轴分布在 comm 所包含的所有进程上。MPIArray 只能分布在一个单独的轴上,并且在该轴上的分布必须满足一定的限制,即每个进程所持有的子数组在该轴上的 shape 都相等(如果能够均分的话)或者 rank 较小的一些进程会多 1 (如果不能均分)。这一限制必须在 MPIArray 存在的整个生命时期内都得到满足。MPIArray 在除了分布轴之外的所有其它轴上的 shape 在各个进程中都相同。

from_numpy_array(cls, array, axis=0, root=None, comm=None)

由一个 numpy 数组 array 构造一个分布在通信子 comm 上,分布轴为 axis 的 MPIArray 并返回,所创建的 MPIArray 的 global_shape 同 array 的shape。参数 root 可为一个整数或 None,当其为一个整数时,数据将由 rank 等于该整数的进程的 array 散发给所有其它进程,因此其它进程的 array 参数可以为 None;当 root 为 None 时,每个进程从各自的 array 中获取对应的数据,因此一般来说每个进程的 array 应该都相同,但也可以不同。

to_numpy_array(self, root=0)

将当前的 MPIArray 转化为 numpy array。如果 root 为一个整数,则只有 rank 为该整数的进程会返回转化后的 numpy array,其它进程返回 None;如果 root 为 None,则所有进程都返回转化后的 numpy array。

wrap(cls, array, axis, comm=None)

将通信子 comm 上的各个进程所持有的 numpy 数组 array 沿分布轴 axis 包装成一个 MPIArray 并返回。各个进程的 arrayaxis 轴上的 shape 必须满足 MPIArray 的限制要求,即都相等或者 rank 较小的进程多 1,在其它轴上的 shape 必须都相同。注意该方法与 from_numpy_array 的区别。

redistribute(self, axis)

将当前的 MPIArray 重新分布到轴 axis 上,返回一个不与原 MPIArray 共享数据的新的 MPIArray。注意:这是一个通信量比较大的操作,数据会在所有进程间重新分布。

enumerate(self, axis)

一个方便的方法来返回当前的 MPIArray 在轴 axis 上的 local_index 和 global_index 迭代器。global_index 是指在将整个 MPIArray 当作一个大的 numpy array 时的 index,而 local_index 则在每个进程所持有的子数组中的 index。当 axis 不是分布轴时,每个进程返回的 local_index 和 global_index 都是一样的,但当 axis 为分布轴时,除了 rank = 0 的的进程返回的 local_index 和 global_index 一样之外,其它进程的都不一样,因为 local_index 会从 0 开始计数。

from_hdf5(cls, f, dataset, axis=0, comm=None)

由一个 HDF5 文件 f 中的数据 dataset 构造一个分布在通信子 comm 上,分布轴为 axis 的 MPIArray 并返回。f 可以为一个 HDF5 文件的文件名字符串或者一个打开的 HDF5 文件句柄。HDF5 文件操作以及并行分布式的 HDF5 在后面会有相应的介绍。注意:该方法同 from_numpy_array 非常类似,只不过数据的来源不同。

to_hdf5(self, filename, dataset, create=False)

将当前的 MPIArray 中的数据存储到 HDF5 文件 filename 中的数据集 dataset 中,当 create 为 True 时会创建一个新文件,为 False时会写入到已经存在的文件中,此时该存在的文件中如果已经有数据集 dataset 则会出错。HDF5 文件操作以及并行分布式的 HDF5 在后面会有相应的介绍。

transpose(self, axes)

将当前的 MPIArray 按照轴 axes 进行转置,即调整轴的次序。返回转置后的新 MPIArray,但是共享原 MPIArray 的数据。

reshape(self, *shape)

改变当前 MPIArray 的 global_shape。返回 reshpae 后的新 MPIArray,但是共享原 MPIArray 的数据。注意不能改变分布轴且分布轴对应的元素必须设置成 None,除此之外同 numpy array 的 reshape。

copy(self)

返回一个当前 MPIArray 的复制对象,其所包含的数据也会进行复制,即不再共享原 MPIArray 的数据。

属性

global_shape

MPIArray 的整体 shape,每个进程都会返回相同的结果。

local_shape

每个进程所持有的子数组的 shape。

local_offset

每个进程所持有的子数组在整个 MPIArray 中各轴的偏移位置,对非分布轴偏移都为 0。

axis

MPIArray 的分布轴。

local_array

每个进程所持有的子数组,是一个 numpy array。

comm

MPIArray 的通信子对象,如果 mpi4py 不可用则为 None。

例程

下面给出以上介绍的方法和属性的使用例程。

# mpiarray_demo.py

"""
Demonstrates the usage of MPIArray, from_numpy_array, to_numpy_array,
to_hdf5, enumerate, redistribute, reshape, transpose, wrap.

Run this with 4 processes like:
$ mpiexec -n 4 python mpiarray_demo.py
"""

import os
import numpy as np
from caput import mpiutil
from caput.mpiarray import MPIArray


rank = mpiutil.rank
size = mpiutil.size

# construct a MPIArray with global_shape (5, 4, 3) and distribute axis 0
shape = (5, 4, 3)
dist_axis = 0
darr = MPIArray(global_shape=shape, axis=dist_axis, dtype=np.float32)
print 'rank %d has global_shape = %s, local_shape = %s, local_offset = %s' % (rank, darr.global_shape, darr.local_shape, darr.local_offset)

# from_numpy_array
nparr = np.arange(6*5*4).reshape(6, 5, 4)
darr1 = MPIArray.from_numpy_array(nparr, axis=0, root=None)
# to_numpy_array
nparr1 = darr1.to_numpy_array(root=0)
if rank == 0:
    print 'rank 0: nparr1 == nparr: %s' % np.allclose(nparr, nparr1)
else:
    print 'rank %d: nparr1 = %s' % (rank, nparr1)
# to_hdf5
h5_file = 'test.hdf5'
darr1.to_hdf5(h5_file, 'test', create=True)
# remove the file
if rank == 0:
    os.remove(h5_file)

# enumerate
for (li, gi) in darr1.enumerate(axis=0):
    print 'rank %d has (local_index, global_index) = (%d, %d) for axis 0' % (rank, li, gi)
# redistribute to axis 1
darr2 = darr1.redistribute(axis=1)
print 'rank %d has global_shape = %s, local_shape = %s after redistribute to axis 1' % (rank, darr2.global_shape, darr2.local_shape)
# reshape darr1 to have global_shape = (6, 20)
darr3 = darr1.reshape(None, 20)
# transpose darr1 to have global_shape = (5, 6, 4)
darr4 = darr2.transpose((1, 0, 2))

# wrap
if rank == 0:
    a = np.zeros((2, 3))
elif rank == 1:
    a = np.zeros((2, 3))
elif rank == 2:
    a = np.zeros((2, 2))
elif rank == 3:
    a = np.zeros((2, 2))
da = MPIArray.wrap(a, axis=1)
print 'rank %d has global_shape of da = %s' % (rank, da.global_shape)

运行结果如下:

$ mpiexec -n 4 python mpiarray_demo.py
Starting MPI rank=0 [size=4]
Starting MPI rank=1 [size=4]
Starting MPI rank=2 [size=4]
Starting MPI rank=3 [size=4]
rank 3 has global_shape = (5, 4, 3), local_shape = (1, 4, 3), local_offset = (4, 0, 0)
rank 1 has global_shape = (5, 4, 3), local_shape = (1, 4, 3), local_offset = (2, 0, 0)
rank 2 has global_shape = (5, 4, 3), local_shape = (1, 4, 3), local_offset = (3, 0, 0)
rank 0 has global_shape = (5, 4, 3), local_shape = (2, 4, 3), local_offset = (0, 0, 0)
rank 2: nparr1 = None
rank 3: nparr1 = None
rank 1: nparr1 = None
rank 0: nparr1 == nparr: True
rank 1 has (local_index, global_index) = (0, 2) for axis 0
rank 1 has (local_index, global_index) = (1, 3) for axis 0
rank 2 has (local_index, global_index) = (0, 4) for axis 0
rank 3 has (local_index, global_index) = (0, 5) for axis 0
rank 0 has (local_index, global_index) = (0, 0) for axis 0
rank 0 has (local_index, global_index) = (1, 1) for axis 0
rank 0 has global_shape = (6, 5, 4), local_shape = (6, 2, 4) after redistribute to axis 1
rank 1 has global_shape = (6, 5, 4), local_shape = (6, 1, 4) after redistribute to axis 1
rank 2 has global_shape = (6, 5, 4), local_shape = (6, 1, 4) after redistribute to axis 1
rank 3 has global_shape = (6, 5, 4), local_shape = (6, 1, 4) after redistribute to axis 1
rank 0 has global_shape of da = (2, 10)
rank 1 has global_shape of da = (2, 10)
rank 2 has global_shape of da = (2, 10)
rank 3 has global_shape of da = (2, 10)

以上我们介绍了 caput 中另一个模块 miarray 提供的建立在 numpy array 基础上的并行分布式数组 MPIArray,其中也提到了 HDF5 文件及其操作,我们将在后面介绍并行分布式的 HDF5 相关操作,在此之前我们先介绍 HDF5 文件的基本内容以及 Python 中操作 HDF5 文件的方法,以为后面的介绍作铺垫,在下一篇中我们将介绍 HDF5 文件以及操作 HDF5 文件的 Python 工具 h5py。

    原文作者:自可乐
    原文地址: https://www.jianshu.com/p/d89b32ecbc32
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞