# Source code for hdnnpy.dataset.hdnnp_dataset

# coding: utf-8

"""Combine and preprocess descriptor and property dataset."""

import numpy as np

from hdnnpy.utils import (MPI, recv_chunk, send_chunk)


# Snapshot of NumPy's global RNG state, taken once when this module is
# imported.  ``HDNNPDataset._shuffle`` restores this state before shuffling
# each array, so every shuffle starts from the same RNG state.
RANDOMSTATE = np.random.get_state()


class HDNNPDataset(object):
    """Combine and preprocess descriptor and property dataset."""

    def __init__(self, descriptor, property_, dataset=None):
        r"""
        | It is desirable that the type of descriptor and property used
          for HDNNP is fixed at initialization.
        | Also, an instance itself does not have any dataset at
          initialization and you need to execute :meth:`construct`.
        | If ``dataset`` is given it will be an instance's own dataset.

        Args:
            descriptor (DescriptorDatasetBase):
                Descriptor instance you want to use as HDNNP input.
            property\_ (PropertyDatasetBase):
                Property instance you want to use as HDNNP label.
            dataset (dict [~numpy.ndarray], optional):
                If specified, dataset will be initialized with this.
        """
        if dataset is None:
            dataset = {}
        self._descriptor = descriptor
        self._property = property_
        # Shallow copy: the mapping is private to this instance, but the
        # arrays it refers to are shared with the caller.
        self._dataset = dataset.copy()

    def __getitem__(self, item):
        """Return indexed or sliced dataset as dict data.

        Args:
            item: Index applied to every array of the dataset.

        Returns:
            list [dict] or dict: For a ``slice``, a list with one dict per
            selected data point (empty if the dataset holds no arrays).
            For any other index, a single dict that maps each key to the
            indexed array.
        """
        batches = {key: data[item] for key, data in self._dataset.items()}
        if not isinstance(item, slice):
            return batches
        if not batches:
            # Nothing has been constructed yet: an empty slice result.
            return []
        length = len(next(iter(batches.values())))
        return [{key: batch[i] for key, batch in batches.items()}
                for i in range(length)]

    def __len__(self):
        """Redirect to :attr:`partial_size`"""
        return self.partial_size

    @property
    def descriptor(self):
        """DescriptorDatasetBase: Descriptor dataset instance."""
        return self._descriptor

    @property
    def elemental_composition(self):
        """list [str]: Elemental composition of the dataset."""
        return self._descriptor.elemental_composition

    @property
    def elements(self):
        """list [str]: Elements of the dataset."""
        return self._descriptor.elements

    @property
    def n_input(self):
        """int: Number of dimensions of input data."""
        if 'inputs/0' in self._dataset:
            return self._dataset['inputs/0'].shape[-1]
        else:
            return self._descriptor.n_feature

    @property
    def n_label(self):
        """int: Number of dimensions of label data."""
        if 'labels/0' in self._dataset:
            return self._dataset['labels/0'].shape[-1]
        else:
            return self._property.n_property

    @property
    def partial_size(self):
        """int: Number of data after scattered by MPI communication.

        It is ``0`` while the instance does not hold any dataset.
        """
        if not self._dataset:
            return 0
        return len(next(iter(self._dataset.values())))

    @property
    def tag(self):
        """str: Unique tag of the dataset.

        Usually, it is a form like ``<any prefix> <chemical formula>``.
        (ex. ``CrystalGa2N2``)
        """
        return self._descriptor.tag

    @property
    def total_size(self):
        """int: Number of data before scattered by MPI communication."""
        return len(self._descriptor)

    # NOTE: this accessor must stay the LAST ``@property`` in the class
    # body.  Once it is defined, the name ``property`` inside the class
    # namespace refers to this attribute instead of the builtin decorator.
    @property
    def property(self):
        """PropertyDatasetBase: Property dataset instance."""
        return self._property

    def construct(self, all_elements=None, preprocesses=None,
                  shuffle=True, verbose=True):
        """Construct an instance's own dataset.

        This method does following steps:

        * Check compatibility between descriptor and property datasets.
        * Expand feature dimension of descriptor dataset according to
          ``all_elements`` and pre-process descriptor dataset in a given
          order and add to its own dataset.
        * Add property dataset to its own dataset.
        * Clear up the original data in descriptor and property dataset.
        * Shuffle the order of the data.

        Args:
            all_elements (list [str], optional):
                If specified, it expands feature dimensions of
                descriptor dataset according to this.
            preprocesses (list [PreprocessBase], optional):
                If specified, it pre-processes descriptor dataset in a
                given order.
            shuffle (bool, optional):
                If specified, it shuffles the order of the data.
            verbose (bool, optional): Print log to stdout.

        Raises:
            AssertionError:
                If descriptor and property datasets are incompatible.
        """
        if preprocesses is None:
            preprocesses = []

        # check compatibility between descriptor and property datasets
        # (raised explicitly so that the checks survive ``python -O``)
        if not (len(self._descriptor) == len(self._property)):
            raise AssertionError(
                'descriptor and property datasets differ in length')
        for name in ('elemental_composition', 'elements', 'tag'):
            if not (getattr(self._descriptor, name)
                    == getattr(self._property, name)):
                raise AssertionError(
                    f'descriptor and property datasets differ in {name}')

        # add descriptor dataset and delete original data
        if self._descriptor.has_data:
            inputs = [self._descriptor[key]
                      for key in self._descriptor.descriptors]
            # expand along to feature dimension, only when the caller
            # actually specified a different set of elements
            if (all_elements is not None
                    and all_elements != self._descriptor.elements):
                old_feature_keys = self._descriptor.feature_keys
                new_feature_keys = (
                    self._descriptor.generate_feature_keys(all_elements))
                inputs = self._expand_feature_dims(
                    inputs, old_feature_keys, new_feature_keys)
            # pre-process descriptor dataset
            for preprocess in preprocesses:
                inputs = preprocess.apply(
                    inputs, self.elemental_composition, verbose=verbose)
            self._dataset.update(
                {f'inputs/{i}': data for i, data in enumerate(inputs)})
            self._descriptor.clear()

        # add property dataset and delete original data
        if self._property.has_data:
            labels = [self._property[key]
                      for key in self._property.properties]
            self._dataset.update(
                {f'labels/{i}': data for i, data in enumerate(labels)})
            self._property.clear()

        # shuffle dataset
        if shuffle:
            self._shuffle()

    def scatter(self, max_buf_len=256 * 1024 * 1024):
        """Scatter dataset by MPI communication.

        Each instance is re-initialized with received dataset.

        Args:
            max_buf_len (int, optional):
                Each data is divided into chunks of this size at
                maximum.
        """
        if MPI.rank == 0:
            new_dataset = {}
            MPI.comm.bcast(len(self._dataset), root=0)
            # The partition only depends on the total size and the number
            # of processes, so compute it once for all arrays.
            n_total = self.total_size
            # ceil(n_total / MPI.size): every rank gets the same length
            n_sub = -(-n_total // MPI.size)
            while self._dataset:
                key, data = self._dataset.popitem()
                for i in range(MPI.size):
                    s = n_total * i // MPI.size
                    e = s + n_sub
                    if i == 0:
                        new_dataset[key] = data[s:e]
                    else:
                        MPI.comm.send(key, dest=i)
                        send_chunk(data[s:e], dest=i,
                                   max_buf_len=max_buf_len)
            self._dataset.update(new_dataset)

        else:
            self._dataset.clear()
            n_data = MPI.comm.bcast(None, root=0)
            for _ in range(n_data):
                key = MPI.comm.recv(source=0)
                recv_data = recv_chunk(source=0, max_buf_len=max_buf_len)
                self._dataset[key] = recv_data

    def take(self, index):
        """Return copied object that has sliced dataset.

        Args:
            index (int or slice):
                Copied object has dataset indexed or sliced by this.

        Returns:
            HDNNPDataset: New instance that shares the descriptor and
            property instances with this one.
        """
        dataset = {key: data[index] for key, data in self._dataset.items()}
        new_dataset = self.__class__(self._descriptor, self._property,
                                     dataset)
        return new_dataset

    @staticmethod
    def _expand_feature_dims(inputs, old_feature_keys, new_feature_keys):
        """Expand feature dimension of input dataset according to
        ``all_elements``.

        Each array is zero-padded along axis 2 and its columns are
        reordered to follow ``new_feature_keys``.  Keys that are missing
        from ``old_feature_keys`` are mapped onto the padded columns.
        ``inputs`` is updated in place and also returned.
        """
        n_pad = len(new_feature_keys) - len(old_feature_keys)
        idx_pad = len(old_feature_keys)
        sort_indices = []
        for key in new_feature_keys:
            if key in old_feature_keys:
                sort_indices.append(old_feature_keys.index(key))
            else:
                # New feature: point at the next zero-padded column.
                sort_indices.append(idx_pad)
                idx_pad += 1
        sort_indices = np.array(sort_indices)

        for i, data in enumerate(inputs):
            pad_width = [(0, n_pad) if axis == 2 else (0, 0)
                         for axis in range(data.ndim)]
            data = np.pad(data, pad_width, 'constant')
            inputs[i] = data[:, :, sort_indices]
        return inputs

    def _shuffle(self):
        """Shuffle the order of the data."""
        for data in self._dataset.values():
            # Restore the same RNG state before every shuffle so that
            # each array is shuffled starting from an identical state.
            np.random.set_state(RANDOMSTATE)
            np.random.shuffle(data)