Source code for hdnnpy.preprocess.pca

# coding: utf-8

"""Principal component analysis (PCA)."""

import numpy as np
from sklearn import decomposition

from hdnnpy.preprocess.preprocess_base import PreprocessBase
from hdnnpy.utils import (MPI, pprint)


class PCA(PreprocessBase):
    """Principal component analysis (PCA).

    The core part of this class uses `sklearn.decomposition.PCA`
    implementation. One PCA is fitted per element symbol; the fitted
    mean vector and transformation matrix are cached per element and
    reused on subsequent calls.
    """
    name = 'pca'
    """str: Name of this class."""

    def __init__(self, n_components=None):
        """
        Args:
            n_components (int, optional):
                Number of features to keep in decomposition. It is
                passed to `sklearn.decomposition.PCA` as is. If
                ``None``, the value chosen by scikit-learn for the
                first fitted element is recorded and used for all
                following elements.
        """
        super().__init__()
        self._n_components = n_components
        self._mean = {}
        self._transform = {}

    @property
    def n_components(self):
        """int or None: Number of features to keep in decomposition."""
        return self._n_components

    @property
    def mean(self):
        """dict [~numpy.ndarray]: Initialized mean values in each
        feature dimension and each element."""
        return self._mean

    @property
    def transform(self):
        """dict [~numpy.ndarray]: Initialized transformation matrix in
        each feature dimension and each element."""
        return self._transform

    def apply(self, dataset, elemental_composition, verbose=True):
        """Apply the same pre-processing for each element to dataset.

        It accepts 1, 2 or 3 for length of ``dataset``, each element of
        which is regarded as ``0th-order``, ``1st-order`` and
        ``2nd-order``. Parameters for elements that have not been seen
        before are fitted from ``dataset[0]`` first.

        Only the 0th-order data is centered by the fitted mean; every
        order is projected by the transformation matrix.

        Args:
            dataset (list [~numpy.ndarray]):
                Input dataset to be transformed. Its entries are
                replaced in place.
            elemental_composition (list [str]):
                Element symbols corresponding to 1st dimension of
                ``dataset``.
            verbose (bool, optional): Print log to stdout.

        Returns:
            list [~numpy.ndarray]:
                The same list object as ``dataset`` whose entries are
                projected onto the principal components.

        Raises:
            AssertionError: If length of ``dataset`` is not 1, 2 or 3.
        """
        order = len(dataset) - 1
        assert 0 <= order <= 2, (
            f'dataset must have 1 to 3 entries, got {len(dataset)}')

        self._initialize_params(dataset[0], elemental_composition, verbose)

        # Stack per-element parameters along the atom axis so that each
        # atom is transformed with the parameters of its own element.
        mean = np.array(
            [self._mean[element] for element in elemental_composition])
        transform = np.array(
            [self._transform[element] for element in elemental_composition])

        # The 0th-order entry always exists once the length check passed.
        dataset[0] = np.einsum('saf,aft->sat', dataset[0] - mean, transform)
        if order >= 1:
            dataset[1] = np.einsum('safx,aft->satx', dataset[1], transform)
        if order >= 2:
            dataset[2] = np.einsum('safxy,aft->satxy', dataset[2], transform)

        return dataset

    def dump_params(self):
        """Dump its own parameters as :obj:`str`.

        Elements are written in sorted order so that the output does
        not depend on set iteration order.

        Returns:
            str: Formed parameters.
        """
        chunks = []
        for element in sorted(self._elements):
            transform = self._transform[element]
            mean = self._mean[element]
            # One text row per principal component (row of transform.T).
            transform_str = ('\n' + ' ' * 12).join(
                ' '.join(map(str, row)) for row in transform.T)
            mean_str = ' '.join(map(str, mean))

            # NOTE(review): the 12-space indentation inside this template
            # is reconstructed to match the row separator used for
            # `transform_str` above; confirm against the expected format.
            chunks.append(f'''
            {element} {transform.shape[1]} {transform.shape[0]}
            # transformation matrix
            {transform_str}
            # mean
            {mean_str}
            ''')

        return ''.join(chunks)

    def load(self, file_path, verbose=True):
        """Load internal parameters for each element.

        Only root MPI process loads parameters.

        Args:
            file_path (~pathlib.Path): File path to load parameters.
            verbose (bool, optional): Print log to stdout.
        """
        if MPI.rank == 0:
            # `save` stores `elements` (a set) and possibly
            # `n_components` (None) as 0-d object arrays, which NumPy
            # only reads back with `allow_pickle=True`.
            # SECURITY: unpickling can execute arbitrary code; load
            # parameter files from trusted sources only.
            with np.load(file_path, allow_pickle=True) as ndarray:
                self._elements = ndarray['elements'].item()
                self._n_components = ndarray['n_components'].item()
                self._mean = {element: ndarray[f'mean:{element}']
                              for element in self._elements}
                self._transform = {element: ndarray[f'transform:{element}']
                                   for element in self._elements}
            if verbose:
                pprint(f'Loaded PCA parameters from {file_path}.')

    def save(self, file_path, verbose=True):
        """Save internal parameters for each element.

        Only root MPI process saves parameters. Arrays are stored with
        :func:`numpy.savez` under the keys ``elements``,
        ``n_components``, ``mean:<element>`` and ``transform:<element>``.

        Args:
            file_path (~pathlib.Path): File path to save parameters.
            verbose (bool, optional): Print log to stdout.
        """
        if MPI.rank == 0:
            info = {
                'elements': self._elements,
                'n_components': self._n_components,
            }
            mean = {f'mean:{k}': v for k, v in self._mean.items()}
            transform = {f'transform:{k}': v
                         for k, v in self._transform.items()}
            np.savez(file_path, **info, **mean, **transform)
            if verbose:
                pprint(f'Saved PCA parameters to {file_path}.')

    def _initialize_params(self, data, elemental_composition, verbose):
        """Initialize parameters only once for new elements.

        Args:
            data (~numpy.ndarray):
                0th-order data; its last (3rd) axis is the feature axis
                and its 2nd axis is indexed by ``elemental_composition``.
            elemental_composition (list [str]):
                Element symbol of each entry along the 2nd axis.
            verbose (bool): Print log to stdout.
        """
        n_feature = data.shape[2]
        composition = np.array(elemental_composition)

        # Sorted so that the fitting order is reproducible: when
        # `n_components` is None the first fitted element decides it.
        for element in sorted(set(elemental_composition) - self._elements):
            mask = composition == element
            # Gather every atom of this element from every sample.
            X = data[:, mask].reshape(-1, n_feature)
            pca = decomposition.PCA(n_components=self._n_components)
            pca.fit(X)

            if self._n_components is None:
                self._n_components = pca.n_components_
            self._elements.add(element)
            self._mean[element] = pca.mean_.astype(np.float32)
            self._transform[element] = pca.components_.T.astype(np.float32)

            if verbose:
                # NOTE(review): line breaks and indentation of this log
                # message are reconstructed; confirm the intended layout.
                pprint(f'''
Initialized PCA parameters for {element}
    Feature dimension: {n_feature} => {self._n_components}
    Cumulative contribution rate = {np.sum(pca.explained_variance_ratio_)}
''')