在上一篇中我们介绍了 mpi4py 中的 memory 对象及内存操作,下面我们将介绍 mpi4py 中的简单并行 I/O 操作。
在前面我们已经简要地介绍了 mpi4py 中的并行 I/O 及文件视图等相关概念和操作,但是 MPI 中的 I/O 操作应该是 MPI 标准中最复杂和最难理解的部分,因为 MPI 中 I/O 操作的方法非常多,而且极易发生混淆(可参见这里的表格),很难知道在什么情况下该选用什么合适的并行 I/O 操作方法以达到高的 I/O 性能。在很多时候,并行计算程序的性能瓶颈都在 I/O 操作上,因此理解并选用合适的 I/O 操作对提高程序的整体性能是非常重要的。在下面的若干篇中,我们将逐步由浅入深地介绍 mpi4py 中的并行 I/O 操作方法,讲解这些方法的区别和使用场合,以及获得高性能 I/O 操作的途径、注意事项和建议等。
使用独立文件指针
对最简单的并行文件 I/O 操作,其实也遵循通常的(非并行)I/O 操作方式,即:打开文件,移动文件指针到指定位置,读/写文件,然后关闭文件。对这种并行 I/O 操作方式,每个进程都拥有自己的文件指针,每个进程都独立地移动自己的文件指针,并且从自己的文件指针所指的位置读写数据,因此被称作使用独立文件指针的并行 I/O。不过需要注意的是,使用独立文件指针的 I/O 操作不是线程安全的,因为独立文件指针为每个进程所拥有,如果在进程中使用多线程,则每个线程对独立文件指针的操作会相互冲突。下面是使用独立文件指针相关方法的使用接口:
MPI.File.Open(type cls, Intracomm comm, filename, int amode=MODE_RDONLY, Info info=INFO_NULL)
并行打开文件,返回打开文件的句柄。此为一个集合操作,comm
必须为一个组内通信子对象,该通信子内的所有进程以访问模式 amode
同时打开名为 filename
的文件,可通过 info
参数向 MPI 环境传递一些 hints,这些 hints 通常用来指出文件访问以及文件系统相关的一些特殊信息,要求所有进程打开的文件 filename
都在物理上指向同一个文件(即文件路径可能不同,但一定要是磁盘上同一个物理位置的文件),所有进程打开文件使用的 amode
也必须相同,但每个进程可以分别使用自己的 info
对象。如果某个进程需要独自打开一个文件访问,则可设置其参数 comm
为 MPI.COMM_SELF。访问模式 amode
有如下几种:
- MPI.MODE_RDONLY:只读;
- MPI.MODE_RDWR;读写;
- MPI.MODE_WRONLY:只写;
- MPI.MODE_CREATE:如果不存在,则创建文件;
- MPI.MODE_EXCL:如果要创建的文件已经存在则报错;
- MPI.MODE_DELETE_ON_CLOSE:关闭时删除文件;
- MPI.MODE_UNIQUE_OPEN:不允许同时打开文件,包括从 MPI 环境内和环境外两种情况;
- MPI.MODE_SEQUENCIAL:顺序方式访问文件,利于针对串行设备进行优化;
- MPI.MODE_APPEND:所有文件指针指向文件末尾。
在 MPI 中,要区别两种形式的文件——传统的随机访问文件和串行流式访问文件(如管道、磁带等)。对串行文件(只允许以 amode = MPI.MODE_SEQUENCIAL 打开),调用 MPI.File.Seek_shared 和 MPI.File.Get_position_shared 都可能导致错误,也不允许在 filetype 和 etype 中出现空洞,仅允许执行共享文件指针的读写操作,但这些操作所定义的文件指针更新策略对串行文件不再适用。
MPI.File.Seek(self, Offset offset, int whence=SEEK_SET)
根据 whence
参数指定的方式更新进程各自的文件指针到指定偏移位置 offset
,不是一个集合称作。whence
可能的取值如下:
- MPI.SEEK_SET:将文件指针设置为
offset
参数给出的值; - MPI.SEEK_CUR:将文件指针设置为相对于当前位置的
offset
偏移处,即当前位置加上offset
参数; - MPI.SEEK_END:将文件指针设置为指向文件末尾再加上
offset
参数的值。
offset
的值可为负,但要注意在文件视图中指定相对于视图起始位置负数的偏移会导致错误。
MPI.File.Read(self, buf, Status status=None)
从进程当前独立文件指针的位置读取数据到数据缓冲区 buf
中,buf
是一个形如 [data, count, datatype] 或 [data, datatype] 的三元或二元序列,其中 data 是实际的数据缓冲区,count 指明最大读取的数据计数(以 datatype 为单位),当 count 省略时会利用 data 的字节长度和 datatype 计算出 count 值。对 numpy 数组,其计数和数据类型可以自动推断出来,因此可以直接以数组本身作为参数传给 buf
。实际读取的数据量可以从传递给 statuts
参数的 MPI.Status 对象中通过方法 Get_count 和 Get_elements 获取。
该方法是一个阻塞的非集合操作。如果在打开文件时使用了 MPI.MODE_SEQUENCIAL,则使用该方法时会出错。
MPI.File.Write(self, buf, Status status=None)
将数据缓冲区 buf
中的数据写入进程当前独立文件指针的位置,buf
是一个形如 [data, count, datatype] 或 [data, datatype] 的三元或二元序列,其中 data 是实际的数据缓冲区,count 指明最大写入的数据计数(以 datatype 为单位),当 count 省略时会利用 data 的字节长度和 datatype 计算出 count 值。对 numpy 数组,其计数和数据类型可以自动推断出来,因此可以直接以数组本身作为参数传给 buf
。实际写入的数据量可以从传递给 statuts
参数的 MPI.Status 对象中通过方法 Get_count 和 Get_elements 获取。
该方法是一个阻塞的非集合操作。如果在打开文件时使用了 MPI.MODE_SEQUENCIAL,则使用该方法时会出错。
MPI.File.Close(self)
关闭当前并行文件。所有进程通过这个方法执行一个集合操作关闭打开的并行文件。该操作会首先执行 MPI.File.Sync 然后再关闭文件句柄。如果打开文件时使用的 amode
为 MPI.MODE_DELETE_ON_CLOSE,则关闭后还会自动调用 MPI.File.Delete。最后该函数把文件句柄设置成 MPI.FILE_NULL。与串行程序一样,应用程序应设法保证关闭文件时的数据安全。
以上介绍的五个并行 I/O 操作方法实际上已经足够完成任何 I/O 操作任务,并且这些方法和 Unix/Linux 系统提供的 I/O 操作和使用方法非常类似。MPI 还提供了数量众多的其它 I/O 操作方法,这些方法要么是提供更高的性能,要么具有更好的可移植性,要么更加方便易用,等等。因此要很好地利用 MPI 并行 I/O 操作的这些优势,仅知道和使用这五个基本方法还是不够的,我们将在后面逐步介绍 mpi4py 中的其它并行 I/O 操作方法。
使用显式偏移地址
MPI 也提供了另外一组并行 I/O 方法,称作显式偏移地址方法,这组方法不使用独立文件指针,而是直接将文件中的偏移地址作为参数传递给文件读/写函数。使用显式偏移地址并行 I/O 方法与使用独立文件指针的方法几乎一样,只不过不用再单独调用 MPI.File.Seek 来设置文件的偏移位置。使用显式偏移地址的并行 I/O 方法是线程安全的。下面是使用显式偏移地址相关方法的使用接口,这两个方法除了额外的 offset
参数外,其余参数都与使用独立文件指针的对应方法同,使用条件也一样,也是阻塞非集合操作。offset
参数以当前文件视图的 etype 为单位指定对文件进行操作的起始偏移位置。
MPI.File.Read_at(self, Offset offset, buf, Status status=None)
MPI.File.Write_at(self, Offset offset, buf, Status status=None)
获取文件大小及删除文件
再 I/O 操作中可能需要知道文件的大小以确定各个进程的数据分配,在 I/O 操作完后,可能需要删除文件,下面是相应的方法接口:
MPI.File.Get_size(self)
获取当前并行文件的大小,以字节为单位计算。也可以通过属性 size 获取。
MPI.File.Delete(type cls, filename, Info info=INFO_NULL)
删除一个没有被任何进程打开的文件 filename
,否则会出错。如果文件不存在,则通过 MPI.ERR_NO_SUCH_FILE 错误给出提示信息。可用 info
对象传递与特定文件系统相关的信息。注意:该方法不是一个集合操作,一般使用一个单独的进程执行该方法以删除文件。
例程
下面给出使用例程。
# simple_io.py
"""
Demonstrates the usage of individual file pointer and explicit offsets I/O methods.
Run this with 4 processes like:
$ mpiexec -n 4 python simple_io.py
"""
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank < 3:
num_ints = 10 # number of int types
else:
num_ints = 8 # number of int types
buf1 = np.arange(num_ints, dtype='i')
buf2 = np.zeros(10, dtype='i') # initialize to all zeros
offset = 10 * MPI.INT.Get_size() # in unit of bytes
filename = 'temp.txt'
# use individual file pointer
# ------------------------------------------------------------------------------
# open the file for write only, create it if it does not exist
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_WRONLY)
# set individual file pointer of each process
# here we use the default file view, so offset is in bytes
fh.Seek(rank*offset, whence=MPI.SEEK_SET)
# each process writes buf1 to file
fh.Write(buf1)
# close the file
fh.Close()
# open the existed file for read only
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_RDONLY)
print 'size of file: %d bytes' % fh.Get_size()
# set individual file pointer of each process, prepare for reading
fh.Seek(rank*offset, whence=MPI.SEEK_SET)
# each process reads data to buf2 from file
status = MPI.Status()
fh.Read(buf2, status)
# get the amount of data actually read
print 'rank %d read %d MPI.INTs' % (rank, status.Get_count(datatype=MPI.INT))
print 'process %d has buf2 = %s' % (rank, buf2)
# check position of individual file pointer
print 'process %d has file pointer position %d after read' % (rank, fh.Get_position())
# close the file
fh.Close()
# delete the file
if rank == 0:
MPI.File.Delete(filename)
# use explicit offsets
# ------------------------------------------------------------------------------
# open the file for write only, create it if it does not exist
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_CREATE | MPI.MODE_WRONLY)
# each process writes buf1 to file, start from the position of rank*offset
fh.Write_at(rank*offset, buf1)
# close the file
fh.Close()
# open the existed file for read only, and delete the file on close
fh = MPI.File.Open(comm, filename, amode= MPI.MODE_RDONLY | MPI.MODE_DELETE_ON_CLOSE)
# each process reads data to buf2 from file, start from the position of rank*offset
status = MPI.Status()
fh.Read_at(rank*offset, buf2, status)
# close the file
fh.Close()
运行结果如下:
$ mpiexec -n 4 python simple_io.py
size of file: 152 bytes
rank 0 read 10 MPI.INTs
size of file: 152 bytes
rank 1 read 10 MPI.INTs
size of file: 152 bytes
rank 2 read 10 MPI.INTs
size of file: 152 bytes
rank 3 read 8 MPI.INTs
process 3 has buf2 = [0 1 2 3 4 5 6 7 0 0]
process 3 has file pointer position 152 after read
process 0 has buf2 = [0 1 2 3 4 5 6 7 8 9]
process 0 has file pointer position 40 after read
process 1 has buf2 = [0 1 2 3 4 5 6 7 8 9]
process 1 has file pointer position 80 after read
process 2 has buf2 = [0 1 2 3 4 5 6 7 8 9]
process 2 has file pointer position 120 after read
以上介绍了 mpi4py 中的简单并行 I/O 操作,在下一篇中我们将介绍 mpi4py 中的不连续读/写和集合 I/O 操作。