From bf0b502998bcbce741928999732e41f2d33dbb16 Mon Sep 17 00:00:00 2001 From: nathan Date: Mon, 3 Jun 2024 20:16:04 +0200 Subject: [PATCH] Overhaul part 2: - Use 2 classes, `DistributionDataFrame` and `DistributionDataFrameHistogram` - Build out pipeline usage - Rely more on `pandas.DataFrame` --- src/nomad_simulations/model_system.py | 364 ++++++++++++-------------- 1 file changed, 167 insertions(+), 197 deletions(-) diff --git a/src/nomad_simulations/model_system.py b/src/nomad_simulations/model_system.py index 3628d37a..2cc7a613 100644 --- a/src/nomad_simulations/model_system.py +++ b/src/nomad_simulations/model_system.py @@ -345,18 +345,18 @@ class GeometryDistribution(ArchiveSection): """, ) - bins = Quantity( - type=np.float64, - shape=['n_bins'], + n_bins = Quantity( + type=np.int32, description=""" - Binning used to create the histogram. + Number of bins used to create the histogram. """, ) - n_bins = Quantity( - type=np.int32, + bins = Quantity( + type=np.float64, + shape=['n_bins'], description=""" - Number of bins used to create the histogram. + Binning used to create the histogram. """, ) @@ -479,233 +479,203 @@ def normalize(self, archive, logger: BoundLogger): return self +from functools import reduce import math import pandas as pd -from typing import Union +from typing import Union, NewType + +Mode = NewType( + 'Mode', + Union[ + tuple[str, str], + tuple[str, str, str], + tuple[str, str, str, str], + ], +) + +Modes = NewType( + 'Modes', + Union[ + list[tuple[str, str]], + list[tuple[str, str, str]], + list[tuple[str, str, str, str]], + ], +) class DistributionDataFrame: def __init__( - self, ase_atoms: ase.Atoms, cutoff_mappings: dict[str, float] = {} + self, + ase_atoms: ase.Atoms, + cutoff_map: dict[Union[str, tuple[str, str]], pint.Quantity] = {}, ) -> None: - """`cutoff_mappings` should contain all elements in `ase_atoms`.""" - self.ase_atoms = ase_atoms - if cutoff_mappings: - self.df_elements = pd.DataFrame( - {'element': cutoff_mappings.keys(), 'cutoff': cutoff_mappings.values()} - ) - else: - elements = ase_atoms.get_chemical_symbols() - pd.DataFrame({'element': elements, 'cutoff': [None for _ in elements]}) - self._df_combo_headers = ( - 'extremity_left', - 'center_left', - 'center_right', - 'extremity_right', - 'cutoff', - 'dist_storage_index', - ) + # set up dataframe headers + self._df_cutoffs_headers = ('el_l', 'el_r', 'cutoff') + self._id_distrs_headers = ('id_lx', 'id_lc', 'id_rc', 'id_rx', 'value') # matching the order of the `ase` package: # https://wiki.fysik.dtu.dk/ase/_modules/ase/geometry/analysis.html#Analysis.get_dihedrals - self.df_combos = pd.DataFrame({k: [] for k in self._df_combo_headers}) - self.distributions: dict[int, pd.DataFrame] = {} - self._distribution_headers = ( - 'extremity_left_id', - 'center_left_id', - 'center_right_id', - 'extremity_right_id', - 'distribution', - ) - def _init_distribution(self, combo: tuple[str, str, str, str]): - if combo not in self.distributions: - self.distributions[combo] = pd.DataFrame( - {k: [] for k in self._distribution_headers} + # set up helper vars + self.ase_atoms = ase_atoms + if not cutoff_map: + cutoff_map = dict( + zip( + ase_atoms.get_chemical_symbols(), + ase_nl.natural_cutoffs(self.ase_atoms, mult=3.0) * ureg.angstrom, + ) # require matching element and cutoff ordering ) + s_cutoff_map = {k: v for k, v in cutoff_map.items() if isinstance(k, str)} + d_cutoffs = [ + [*k, v] + [*k[::-1], v] + for k, v in cutoff_map.items() + if isinstance(k, tuple) + ] - def _prep_combos( - self, elements: list[str], mode: int - ) -> list[Union[tuple[str, str], tuple[str, str, str], tuple[str, str, str, str]]]: - combos = [] - for i, extr_l in enumerate(elements): - for extr_r in elements[i:]: - if mode > 2: - for cent_l in elements: - if mode > 3: - for cent_r in elements: - combos.append((extr_l, cent_l, cent_r, extr_r)) - else: - combos.append((extr_l, cent_l, extr_r)) - else: - combos.append((extr_l, extr_r)) - return combos - - def _prep_combo_row( - self, - combos: list[ - Union[tuple[str, str], tuple[str, str, str], tuple[str, str, str, str]] - ], - ) -> dict[str, list[tuple[str, str, str, str]]]: - return dict( - zip( - self._df_combo_headers[:4], - [c[0] for c in combos], - [c[1] if len(c) > 2 else None for c in combos], - [c[2] if len(c) > 3 else None for c in combos], - [c[-1] for c in combos], - ) + # set up dataframes + self._df_cutoffs = pd.concat( + [ + pd.DataFrame(headers=self._df_cutoffs_headers, data=d_cutoffs), + self._el_combo_cutoffs(s_cutoff_map), + ], + axis=0, + unique=True, + ) # symmetrized cutoffs + self._df_atoms = pd.Series(data=ase_atoms.get_chemical_symbols()) # id to el + self._id_distrs = pd.DataFrame(headers=self._id_distrs_headers) + + def _el_distrs(self) -> pd.DataFrame: + """Return the distribution dataframe with the element names.""" + return self._id_distrs.map(lambda x: self._df_atoms[x]) + + def _el_combo_cutoffs(self, cutoff_map: dict[str, pint.Quantity]) -> pd.DataFrame: + """Generate a `pandas.DataFrame` of all possible combinations + of individual elements and their distance cutoffs, i.e. `cutoff_map`.""" + df_single = pd.DataFrame( + {'el': cutoff_map.keys(), 'cutoff': cutoff_map.values()} ) + df_double = pd.merge(df_single, df_single, how='cross', suffixes=('_l', '_r')) + df_double['cutoff'] = df_double[['cutoff_l', 'cutoff_r']].sum(axis=1) + df_double.drop(columns=['cutoff_l', 'cutoff_r'], inplace=True) + return df_double + + def _sel_distr_heads(self, ll: int) -> list[str]: + """Dynamically select the distribution columns by name.""" + sel_cols = {2: r'id_.x', 3: r'id_(.x|l.)', 4: r'id_..'} + return [ + col + for col in self._id_distrs_headers + if re.match(sel_cols[ll] + '|value', col) + ] - def factory( - self, - modes: list[ - Union[int, tuple[str, str], tuple[str, str, str], tuple[str, str, str, str]] - ], - cutoff_mappings: dict[ - Union[tuple[str, str], tuple[str, str, str], tuple[str, str, str, str]], - float, - ] = {}, - ): - """`cutoff_mappings` should contain all combinations in `modes`.""" - elements = self.ase_atoms.get_chemical_symbols() - for mode in modes: # ? add sorting - cs = self._prep_combos(elements, mode) if isinstance(mode, int) else [mode] - new_rows = self._prep_combo_row(cs) - if cutoff_mappings: - new_rows['cutoff'] = [cutoff_mappings[c] for c in cs] - new_rows['dist_storage_index'] = [None for _ in cs] - # ? what if the combo already exists - self.df_combos = self.df_combos.append(new_rows, ignore_index=True) - # note that this does not protect against duplicates - return self + def _gen_el_combos(self, mode: Mode) -> Modes: + """Generate the element combinations for the given `mode`.""" + sel = [ + self._id_distrs[ + self._id_distrs['el_l'].str.startswith(el_l) + & self._id_distrs['el_r'].str.startswith(el_r) + ].drop('cutoff') + for el_l, el_r in zip(mode[:-1], mode[1:]) + ] - def _get_all_cutoffs(self) -> dict[tuple[str, str], float]: - cutoffs = self.df_elements.join( - self.df_elements, how='cross', lsuffix='_left', rsuffix='_right' + df_modes = reduce( + lambda x, y: pd.merge(x, y, how='inner', left_on='el_r', right_on='el_l'), + sel, ) - cutoffs['cutoff'] = cutoffs[['cutoff_left', 'cutoff_right']].sum(axis=1) - cutoffs.drop(columns=['cutoff_left', 'cutoff_right'], inplace=True) - cutoffs.rename( - columns={ - 'element_left': 'extremity_left', - 'element_right': 'extremity_right', - }, - inplace=True, - ).set_index(['extremity_left', 'extremity_right']) - cutoffs.join( - self.df_combos[['extremity_left', 'extremity_right', 'cutoff']].set_index( - ['extremity_left', 'extremity_right'] - ), - how='right', - ) - return cutoffs.to_dict() + return list(df_modes.to_dict('records').values()) - def analyze(self): - # run analysis - neighbor_list = ase_nl.build_neighbor_list( + def analyze(self, modes: Modes): + """Add geometry anlysis to distribution dataframe. + Can only add `modes` of the same length, i.e. 2, 3, 4. + Accepts '' as a wildcard for any element.""" + caller_map = {2: an.get_bonds, 3: an.get_angles, 4: an.get_dihedrals} + + # set up analysis + neighbor_list = ase_nl.neighbor_list( + 'ijd', self.ase_atoms, - self._get_all_cutoffs().values().to('angstrom').magnitude, + self._df_cutoffs.set_index(['el_l', 'el_r']).to_dict()['cutoff'], self_interaction=False, ) an = ase_as.Analysis(self.ase_atoms, neighbor_list) # return type of analysis.get_x: list[list[tuple[int, int, float]]], # where the first index denotes the frame, the second the atom, and the tuple the full atom indices and distance - sel_cols = { - 2: ['extremity_left', 'extremity_right', 'cutoff'], - 3: ['extremity_left', 'extremity_right', 'center_left', 'cutoff'], - 4: [ - 'extremity_left', - 'center_left', - 'center_right', - 'extremity_right', - 'cutoff', - ], - } - caller_map = {2: an.get_bonds, 3: an.get_angles, 4: an.get_dihedrals} + # store the distribution analysis + ll = len(modes[0]) # since modes are all of the same length + ms = [self._gen_el_combos(m) if '' in m else m for m in modes] + id_combos = [caller_map[ll](*m, unique=True) for m in ms] + geom_iter = zip(*id_combos[0], an.get_values(id_combos)[0]) + # we assume only 1 image + self._id_distrs = pd.concat( + self._id_distrs, + pd.DataFrame(data=list(geom_iter), columns=self._sel_distr_heads(ll)), + axis=0, + ) + return self - for _, row in self.df_combos.iloc[:, :4].iterrows(): - combo = row.dropna() - ll = len(combo) + def to_hist(self) -> DistributionDataFrameHistogram: + return DistributionDataFrameHistogram(self._el_distrs, self._df_cutoffs) - self._init_distribution(combo) - self.distributions[combo].append( - pd.DataFrame(caller_map[ll](*combo, unique=True), headers=sel_cols[ll]) - ) - return self - def get_full_df( +class DistributionDataFrameHistogram: + def __init__( self, - modes: list[ - Union[int, tuple[str, str], tuple[str, str, str], tuple[str, str, str, str]] - ], - ) -> pd.DataFrame: - df_combos = pd.DataFrame(headers=self._df_combo_headers) - for mode in modes: - if isinstance(mode, int): - pass - else: - df_combos = df_combos.append(self.df_combos[self.df_combos[mode]]) - - def df_to_hist( - self, df: pd.DataFrame, no_zeros: bool = True, normalize: bool = True - ): - if no_zeros: - hist = df[df['distribution'] > 0] - hist = hist.groupby('distribution').size().reset_index(name='count') - if normalize: - hist['count'] = hist['count'].apply(lambda x: x / hist['count'].min()) + el_distrs: pd.DataFrame, + cutoffs: pd.DataFrame, + ll: int, + bins: pint.Quantity, + ) -> None: + self._ll + self._hists: dict[Mode, pd.DataFrame] = {} + self._cutoffs: dict[Mode, pint.Quantity] = {} + protocol = ( + lambda x: self._sparsify(x, bins) + if isinstance(bins.magnitude, (int, float)) + else self._bin(x, bins) + ) + + distr = self._sel_distr_cols(el_distrs) + for mode, df in distr.groupby(distr.columns[:-1]): + self._hists[mode] = self._cmp_hist(protocol(df)) + self._cutoffs[mode] = cutoffs.loc[mode] + + def _sel_distr_cols(self, df: pd.DataFrame) -> pd.DataFrame: + """Select the appropriate elemental combinations.""" + return df[df.count(axis=1) == self._ll].dropna(axis=0) + + def _cmp_hist(self, df: pd.Series) -> pd.DataFrame: + """Return a histogram of the distribution.""" + hist = df[df['value'] > 0] + hist = hist.groupby('value').size().reset_index(name='count') + hist['freq'] = hist['count'].apply(lambda x: x / hist['count'].min()) return hist - def sparsify( - self, - modes: list[ - Union[int, tuple[str, str], tuple[str, str, str], tuple[str, str, str, str]] - ], - prec: pint.Quantity, - ) -> dict: - df = self.get_full_df(modes) - df['distribution'].apply(lambda x: math.floor(x / prec) * prec, inplace=True) - return self.df_to_hist(df, no_zeros=True, normalize=True) - - def bin( - self, - modes: list[ - Union[int, tuple[str, str], tuple[str, str, str], tuple[str, str, str, str]] - ], - binning: pint.Quantity, - ) -> dict: - """`bins` should be a list of bin edges, starting from below.""" - df = self.get_full_df(modes) - df['distribution'].apply( - lambda x: binning(np.min(np.where(x > binning))), inplace=True - ) - return self.df_to_hist(df, no_zeros=True, normalize=True) + def _sparsify(self, df: pd.DataFrame, prec: pint.Quantity) -> pd.DataFrame: + """Sparsify the distribution by rounding the values to the nearest `prec`.""" + return df['value'].map(lambda x: math.floor(x / prec) * prec, inplace=True) - def to_nomad( - self, - modes=[2, 3], - precs: list[pint.Quantity] = [0.001 * ureg.angstrom, 0.01 * ureg.degrees], - binnings: list[pint.Quantity] = [], - ) -> list[GeometryDistribution]: + def _bin(self, df: pd.DataFrame, binning: pint.Quantity) -> pd.DataFrame: + """Bin the distribution by the `binning` values.""" + return df['value'].map(lambda x: binning(np.min(np.where(x > binning)))) + + def get(self, mode: Mode) -> tuple[pint.Quantity, pd.DataFrame]: + return self._cutoffs[mode], self._hists[mode] + + def to_nomad(self) -> list[GeometryDistribution]: constructor_map = { - 0: GeometryDistribution, 2: DistanceGeometryDistribution, 3: AngleGeometryDistribution, 4: DihedralGeometryDistribution, } - - quants = binnings if binnings else precs - caller = self.bin if binnings else self.sparsify - if len(quants) != len(modes): - raise ValueError( - 'Length quantifiers, i.e. `precs` or `binnings` should match length `modes`.' + return [ + constructor_map[self._ll]( + element_cutoff_selection=mode, + distance_cutoffs=self._cutoffs[mode], + bins=self._hists[mode]['value'], + frequencies=self._hists[mode]['freq'], ) - - geom_dists: list[GeometryDistribution] = [] - for mode, quant in zip(modes, quants): - geom_dists.append(constructor_map[len(mode)](caller(mode, quant))) - return geom_dists + for mode in self._hists.keys() + ] class AtomicCell(Cell):