# coding: utf-8
"""Utility functions used in various subpackages."""
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = [
'MPI',
'pprint',
'pyyaml_path_constructor',
'pyyaml_path_representer',
'recv_chunk',
'send_chunk',
]
from pathlib import Path
import pickle
from pprint import pprint as pretty_print
import sys
import textwrap
from mpi4py import MPI as MPI4PY
# Largest signed 32-bit integer (2**31 - 1). The chunked send/receive helpers
# in this module require ``max_buf_len`` to be strictly smaller than this.
# NOTE(review): presumably because MPI element counts are C ``int`` -- confirm.
INT_MAX = 2147483647
class MPI:
    """Namespace exposing the MPI world communicator, its rank and its size.

    The attributes are evaluated once, when the class body is executed.
    """

    # World communicator taken from mpi4py.
    comm = MPI4PY.COMM_WORLD
    # Rank of the calling process within ``comm``.
    rank = comm.Get_rank()
    # Number of processes in ``comm``.
    size = comm.Get_size()
def pprint(data=None, flush=True, **options):
    """Pretty print function.

    Lists and dicts are rendered with :func:`pprint.pprint`; anything else
    is written with the built-in :func:`print`. String data is passed
    through :func:`textwrap.dedent` first, so indented triple-quoted
    literals can be printed flush-left.

    Args:
        data (str or list or dict, optional): Data to output. ``None``
            (the default) prints an empty line.
        flush (bool, optional): Flush ``sys.stdout`` after output if True.
        **options: Other options passed to :func:`pprint.pprint` (for a
            list or dict) or to :func:`print` (otherwise). For
            :func:`print`, a ``stream`` option is translated to ``file``
            so the same keyword selects the output stream in both cases.
    """
    if data is None:
        data = ''

    if isinstance(data, (list, dict)):
        pretty_print(data, **options)
    else:
        # Only text can be dedented; textwrap.dedent() raises on any
        # other type, so non-string data is printed unchanged.
        if isinstance(data, str):
            data = textwrap.dedent(data)
        if 'stream' in options:
            options['file'] = options.pop('stream')
        print(data, **options)

    if flush:
        sys.stdout.flush()
def pyyaml_path_constructor(loader, node):
    """Construct a :class:`~pathlib.Path` from a PyYAML scalar node.

    Args:
        loader: Object providing ``construct_scalar(node)``.
        node: Node whose scalar value is the path text.

    Returns:
        pathlib.Path: Path built from the node's scalar value.
    """
    return Path(loader.construct_scalar(node))
def pyyaml_path_representer(dumper, instance):
    """Represent a :class:`~pathlib.Path` as a ``Path``-tagged YAML scalar.

    Args:
        dumper: Object providing ``represent_scalar(tag, value)``.
        instance: Path object to dump; its formatted text becomes the
            scalar value.

    Returns:
        Whatever ``dumper.represent_scalar`` returns for the scalar.
    """
    text = format(instance)
    return dumper.represent_scalar('Path', text)
def recv_chunk(source, max_buf_len=256 * 1024 * 1024):
    """Receive data divided into small chunks with MPI communication.

    Counterpart of :func:`send_chunk`. A pickled header message (tag 1)
    carrying ``(chunk count, chunk size, total byte count)`` is received
    first, then the raw chunks (tag 2) are received into one buffer and
    the reassembled bytes are unpickled.

    Args:
        source (int): MPI source process that sends data.
        max_buf_len (int, optional): Maximum size of each chunk. The value
            is only validated here; the chunk size announced in the
            sender's header is the one used to receive the chunks.

    Returns:
        object: Received data.

    Raises:
        ValueError: If ``max_buf_len`` is not strictly between 0 and
            ``INT_MAX``.
    """
    # Explicit check instead of ``assert`` so that it survives ``python -O``.
    if not 0 < max_buf_len < INT_MAX:
        raise ValueError(
            f'max_buf_len must satisfy 0 < max_buf_len < {INT_MAX}, '
            f'got {max_buf_len!r}')

    header = MPI.comm.recv(source=source, tag=1)
    assert header is not None  # internal sanity check on the header
    total_chunk_num, chunk_len, total_bytes = header

    # Allocate the full buffer once and let every chunk land directly in
    # its final position; slicing a memoryview does not copy.
    pickled_bytes = bytearray(total_bytes)
    view = memoryview(pickled_bytes)
    for i in range(total_chunk_num):
        b = i * chunk_len
        e = min(b + chunk_len, total_bytes)
        MPI.comm.Recv(view[b:e], source=source, tag=2)

    # NOTE: unpickling can execute arbitrary code; use only with trusted
    # peers.
    return pickle.loads(pickled_bytes)
def send_chunk(obj, dest, max_buf_len=256 * 1024 * 1024):
    """Send data divided into small chunks with MPI communication.

    The object is pickled, a header ``(chunk count, chunk size, total byte
    count)`` is sent as a pickled message (tag 1), and the pickled bytes
    then follow as raw buffers of at most ``max_buf_len`` bytes each
    (tag 2). :func:`recv_chunk` is the matching receiver.

    Args:
        obj (object): Any data to send, which can be pickled.
        dest (int): MPI destination process that receives data.
        max_buf_len (int, optional): Maximum size of each chunk.

    Raises:
        ValueError: If ``max_buf_len`` is not strictly between 0 and
            ``INT_MAX``.
    """
    # Explicit check instead of ``assert`` so that it survives ``python -O``;
    # a zero or negative size would otherwise break the chunk arithmetic.
    if not 0 < max_buf_len < INT_MAX:
        raise ValueError(
            f'max_buf_len must satisfy 0 < max_buf_len < {INT_MAX}, '
            f'got {max_buf_len!r}')

    pickled_bytes = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    total_bytes = len(pickled_bytes)
    # Ceiling division: number of chunks needed to cover all bytes.
    total_chunk_num = -(-total_bytes // max_buf_len)
    MPI.comm.send(
        (total_chunk_num, max_buf_len, total_bytes), dest=dest, tag=1)

    # Slicing a memoryview hands out windows onto the pickled bytes without
    # copying them, unlike slicing the bytes object itself.
    view = memoryview(pickled_bytes)
    for i in range(total_chunk_num):
        b = i * max_buf_len
        e = min(b + max_buf_len, total_bytes)
        MPI.comm.Send(view[b:e], dest=dest, tag=2)