From d4fec928aafafa95b756357c0fa2fe41aab62859 Mon Sep 17 00:00:00 2001 From: JosePizarro3 Date: Wed, 5 Jun 2024 10:44:52 +0200 Subject: [PATCH] Fixed outputs.py module after improved testing --- src/nomad_simulations/outputs.py | 63 ++++--- tests/conftest.py | 36 ++-- tests/test_outputs.py | 306 +++++++++++++++++++++++++++---- 3 files changed, 329 insertions(+), 76 deletions(-) diff --git a/src/nomad_simulations/outputs.py b/src/nomad_simulations/outputs.py index 42e533fe..07e26b40 100644 --- a/src/nomad_simulations/outputs.py +++ b/src/nomad_simulations/outputs.py @@ -17,7 +17,7 @@ # from structlog.stdlib import BoundLogger -from typing import Optional +from typing import Optional, List from nomad.datamodel.data import ArchiveSection from nomad.metainfo import Quantity, SubSection @@ -103,7 +103,9 @@ class Outputs(ArchiveSection): # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - def extract_spin_polarized_property(self, property_name: str) -> list: + def extract_spin_polarized_property( + self, property_name: str + ) -> List[PhysicalProperty]: """ Extracts the spin-polarized properties if present from the property name and returns them as a list of two elements in which each element refers to each `spin_channel`. If the return list is empty, it means that the simulation is not @@ -113,7 +115,7 @@ def extract_spin_polarized_property(self, property_name: str) -> list: property_name (str): The name of the property to be extracted. Returns: - (list): The list of spin-polarized properties. + (List[PhysicalProperty]): The list of spin-polarized properties. """ spin_polarized_properties = [] properties = getattr(self, property_name) @@ -165,27 +167,36 @@ class SCFOutputs(Outputs): def get_last_scf_steps_value( self, - scf_last_steps: list, + scf_last_steps: List[Outputs], property_name: str, i_property: int, - scf_parameters: SelfConsistency, + scf_parameters: Optional[SelfConsistency], logger: BoundLogger, ) -> Optional[list]: """ Get the last two SCF values' magnitudes of a physical property and appends then in a list. Args: - scf_last_steps (list): The list of the last two SCF steps. + scf_last_steps (List[Outputs]): The list of SCF steps. This must be of length 2 in order to the method to work. property_name (str): The name of the physical property. i_property (int): The index of the physical property. + scf_parameters (Optional[SelfConsistency]): The self-consistency parameters section stored under `ModelMethod`. + logger (BoundLogger): The logger to log messages. Returns: - (Optional[list]): The list of the last two SCF values' magnitudes of a physical property. + (Optional[list]): The list of the last two SCF values (in magnitude) of the physical property. """ + # Initial check + if len(scf_last_steps) != 2: + logger.warning( + '`scf_last_steps` needs to be of length 2, pointing to the last 2 SCF steps performed in the simulation.' + ) + return [] + scf_values = [] for step in scf_last_steps: - scf_phys_property = getattr(step, property_name)[i_property] try: + scf_phys_property = getattr(step, property_name)[i_property] if scf_phys_property.value.u != scf_parameters.threshold_change_unit: logger.error( f'The units of the `scf_step.{property_name}.value` does not coincide with the units of the `self_consistency_ref.threshold_unit`.' @@ -200,36 +211,40 @@ def resolve_is_scf_converged( self, property_name: str, i_property: int, - phys_property: PhysicalProperty, + physical_property: PhysicalProperty, logger: BoundLogger, - ) -> Optional[bool]: + ) -> bool: """ Resolves if the physical property is converged or not after a SCF process. This is only ran when there are at least two `scf_steps` elements. Returns: - (bool): The flag indicating whether the physical property is converged or not after a SCF process. + (bool): Boolean indicating whether the physical property is converged or not after a SCF process. """ - # If there are not at least 2 `scf_steps`, return None + # Check that there are at least 2 `scf_steps` if len(self.scf_steps) < 2: logger.warning('The SCF normalization needs at least two SCF steps.') - return None + return False scf_last_steps = self.scf_steps[-2:] - # If there is not `self_consistency_ref` section, return None - scf_parameters = phys_property.self_consistency_ref + # Check for `self_consistency_ref` section + scf_parameters = physical_property.self_consistency_ref if scf_parameters is None: - return None + return False - # Extract the value.magnitude of the phys_property to be checked if converged or not + # Extract the value.magnitude of the `physical_property` to be checked if converged or not scf_values = self.get_last_scf_steps_value( - scf_last_steps, property_name, i_property, scf_parameters, logger + scf_last_steps=scf_last_steps, + property_name=property_name, + i_property=i_property, + scf_parameters=scf_parameters, + logger=logger, ) if scf_values is None or len(scf_values) != 2: logger.warning( f'The SCF normalization could not resolve the SCF values for the property `{property_name}`.' ) - return None + return False # Compare with the `threshold_change` scf_diff = abs(scf_values[0] - scf_values[1]) @@ -248,10 +263,7 @@ def normalize(self, archive, logger) -> None: # Resolve the `is_scf_converged` flag for all SCF obtained properties for property_name in self.m_def.all_sub_sections.keys(): # Skip the `scf_steps` and `custom_physical_property` sub-sections - if ( - property_name == 'scf_steps' - or property_name == 'custom_physical_property' - ): + if property_name == 'scf_steps': continue # Check if the physical property with that property name is populated @@ -264,5 +276,8 @@ def normalize(self, archive, logger) -> None: # Loop over the physical property of the same m_def type and set `is_scf_converged` for i_property, phys_property in enumerate(phys_properties): phys_property.is_scf_converged = self.resolve_is_scf_converged( - property_name, i_property, phys_property, logger + property_name=property_name, + i_property=i_property, + physical_property=phys_property, + logger=logger, ) diff --git a/tests/conftest.py b/tests/conftest.py index f3929ebe..5dce6b7a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -159,7 +159,9 @@ def generate_atomic_cell( def generate_scf_electronic_band_gap_template( - threshold_change: float = 1e-3, + n_scf_steps: int = 5, + threshold_change: Optional[float] = 1e-3, + threshold_change_unit: Optional[str] = 'joule', ) -> SCFOutputs: """ Generate a `SCFOutputs` section with a template for the electronic_band_gap property. @@ -167,20 +169,30 @@ def generate_scf_electronic_band_gap_template( scf_outputs = SCFOutputs() # Define a list of scf_steps with values of the total energy like [1, 1.1, 1.11, 1.111, etc], # such that the difference between one step and the next one decreases a factor of 10. - n_scf_steps = 5 - for i in range(1, n_scf_steps): - value = 1 + sum([1 / (10**j) for j in range(1, i + 1)]) - scf_step = Outputs( - electronic_band_gaps=[ElectronicBandGap(value=value * ureg.joule)] - ) + value = None + for i in range(n_scf_steps): + value = 1 + sum([1 / (10**j) for j in range(1, i + 2)]) + scf_step = Outputs(electronic_band_gaps=[ElectronicBandGap(value=value)]) scf_outputs.scf_steps.append(scf_step) # Add a SCF calculated PhysicalProperty - scf_outputs.electronic_band_gaps.append(ElectronicBandGap(value=value * ureg.joule)) + if value is not None: + scf_outputs.electronic_band_gaps.append(ElectronicBandGap(value=value)) + else: + scf_outputs.electronic_band_gaps.append(ElectronicBandGap()) # and a `SelfConsistency` ref section - scf_params = SelfConsistency( - threshold_change=threshold_change, threshold_change_unit='joule' - ) - scf_outputs.electronic_band_gaps[0].self_consistency_ref = scf_params + if threshold_change is not None: + model_method = ModelMethod( + numerical_settings=[ + SelfConsistency( + threshold_change=threshold_change, + threshold_change_unit=threshold_change_unit, + ) + ] + ) + simulation = generate_simulation(model_method=model_method, outputs=scf_outputs) + scf_outputs.electronic_band_gaps[ + 0 + ].self_consistency_ref = simulation.model_method[0].numerical_settings[0] return scf_outputs diff --git a/tests/test_outputs.py b/tests/test_outputs.py index d4efaaa2..2bafc273 100644 --- a/tests/test_outputs.py +++ b/tests/test_outputs.py @@ -17,14 +17,17 @@ # import pytest +from typing import Optional, List -from nomad.units import ureg from nomad.datamodel import EntryArchive from . import logger -from .conftest import generate_scf_electronic_band_gap_template +from .conftest import generate_simulation, generate_scf_electronic_band_gap_template -from nomad_simulations.outputs import Outputs, ElectronicBandGap +from nomad_simulations.model_system import ModelSystem +from nomad_simulations.numerical_settings import SelfConsistency +from nomad_simulations.outputs import Outputs, SCFOutputs +from nomad_simulations.properties import ElectronicBandGap class TestOutputs: @@ -32,61 +35,284 @@ class TestOutputs: Test the `Outputs` class defined in `outputs.py`. """ + def test_number_of_properties(self): + """ + Test how many properties are defined under `Outputs` and its order. This test is done in order to control better + which properties are already defined and in which order to control their normalizations + """ + outputs = Outputs() + assert len(outputs.m_def.all_sub_sections) == 12 + defined_properties = [ + 'fermi_levels', + 'chemical_potentials', + 'crystal_field_splittings', + 'hopping_matrices', + 'electronic_eigenvalues', + 'electronic_band_gaps', + 'electronic_dos', + 'fermi_surfaces', + 'electronic_band_structures', + 'permittivities', + 'absorption_spectra', + 'xas_spectra', + ] + assert list(outputs.m_def.all_sub_sections.keys()) == defined_properties + @pytest.mark.parametrize( - 'threshold_change, result', - [(1e-3, True), (1e-5, False)], + 'band_gaps, values, result_length, result', + [ + # no properties to extract + ([], [], 0, []), + # non-spin polarized case + ([ElectronicBandGap(variables=[])], [2.0], 0, []), + # spin polarized case + ( + [ + ElectronicBandGap(variables=[], spin_channel=0), + ElectronicBandGap(variables=[], spin_channel=1), + ], + [1.0, 1.5], + 2, + [ + ElectronicBandGap(variables=[], spin_channel=0), + ElectronicBandGap(variables=[], spin_channel=1), + ], + ), + ], ) - def test_is_scf_converged(self, threshold_change: float, result: bool): + def test_extract_spin_polarized_properties( + self, + band_gaps: List[ElectronicBandGap], + values: List[float], + result_length: int, + result: List[ElectronicBandGap], + ): """ - Test the `resolve_is_scf_converged` method. + Test the `extract_spin_polarized_property` method. + + Args: + band_gaps (List[ElectronicBandGap]): The `ElectronicBandGap` sections to be stored under `Outputs`. + values (List[float]): The values to be assigned to the `ElectronicBandGap` sections. + result_length (int): The expected length extracted from `extract_spin_polarized_property`. + result (List[ElectronicBandGap]): The expected result of the `extract_spin_polarized_property` method. """ - scf_outputs = generate_scf_electronic_band_gap_template( - threshold_change=threshold_change + outputs = Outputs() + + for i, band_gap in enumerate(band_gaps): + band_gap.value = values[i] + outputs.electronic_band_gaps.append(band_gap) + gaps = outputs.extract_spin_polarized_property( + property_name='electronic_band_gaps' ) - is_scf_converged = scf_outputs.resolve_is_scf_converged( + assert len(gaps) == result_length + if len(result) > 0: + for i, result_gap in enumerate(result): + result_gap.value = values[i] + # ? comparing the sections does not work + assert gaps[i].value == result_gap.value + else: + assert gaps == result + + @pytest.mark.parametrize( + 'model_system', + [(None), (ModelSystem(name='example'))], + ) + def test_set_model_system_ref(self, model_system: Optional[ModelSystem]): + """ + Test the `set_model_system_ref` method. + + Args: + model_system (Optional[ModelSystem]): The `ModelSystem` to be tested for the `model_system_ref` reference + stored in `Outputs`. + """ + outputs = Outputs() + simulation = generate_simulation(model_system=model_system, outputs=outputs) + model_system_ref = outputs.set_model_system_ref() + if model_system is not None: + assert model_system_ref == simulation.model_system[-1] + assert model_system_ref.name == 'example' + else: + assert model_system_ref is None + + @pytest.mark.parametrize( + 'model_system', + [(None), (ModelSystem(name='example'))], + ) + def test_normalize(self, model_system: Optional[ModelSystem]): + """ + Test the `normalize` method. + + Args: + model_system (Optional[ModelSystem]): The expected `model_system_ref` obtained after normalization and + initially stored under `Simulation.model_system[0]`. + """ + outputs = Outputs() + simulation = generate_simulation(model_system=model_system, outputs=outputs) + outputs.normalize(archive=EntryArchive(), logger=logger) + if model_system is not None: + assert outputs.model_system_ref == simulation.model_system[-1] + assert outputs.model_system_ref.name == 'example' + else: + assert outputs.model_system_ref is None + + +class TestSCFOutputs: + """ + Test the `SCFOutputs` class defined in `outputs.py`. + """ + + @pytest.mark.parametrize( + 'scf_last_steps, i_property, values, scf_parameters, result', + [ + # empty `scf_last_steps` + ([], 0, None, None, []), + # length of `scf_last_steps` is different from 2 + ([Outputs()], 0, None, None, []), + # no property matching `'electronic_band_gaps'` stored under the `scf_last_steps` + ([Outputs(), Outputs()], 0, None, None, []), + # `i_property` is out of range + ( + [ + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + ], + 2, + None, + None, + [], + ), + # no `value` stored in the `scf_last_steps` property + ( + [ + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + ], + 0, + None, + None, + [], + ), + # no `SelfConsistency` section and `threshold_change_unit` defined and macthing units for property `value` (`'joule'`) + ( + [ + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + ], + 0, + [1.0, 2.0], + None, + [], + ), + # valid case + ( + [ + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + Outputs(electronic_band_gaps=[ElectronicBandGap()]), + ], + 0, + [1.0, 2.0], + SelfConsistency(threshold_change=1e-6, threshold_change_unit='joule'), + [1.0, 2.0], + ), + ], + ) + def test_get_last_scf_steps_value( + self, + scf_last_steps: List[Outputs], + i_property: int, + values: List[float], + scf_parameters: Optional[SelfConsistency], + result: List[float], + ): + scf_outputs = SCFOutputs() + for i, scf_step in enumerate(scf_last_steps): + property_section = getattr(scf_step, 'electronic_band_gaps') + if property_section is not None and values is not None: + property_section[i_property].value = values[i] + scf_values = scf_outputs.get_last_scf_steps_value( + scf_last_steps=scf_last_steps, property_name='electronic_band_gaps', - i_property=0, - phys_property=scf_outputs.electronic_band_gaps[0], + i_property=i_property, + scf_parameters=scf_parameters, logger=logger, ) - assert is_scf_converged == result + assert scf_values == result - def test_extract_spin_polarized_properties(self): - """ - Test the `extract_spin_polarized_property` method. + @pytest.mark.parametrize( + 'n_scf_steps, threshold_change, property_name, i_property, result', + [ + # `n_scf_steps` is less than 2 + (0, None, '', 0, False), + # no `self_consistency_ref` section + (5, None, '', 0, False), + # no property matching `property_name` + (5, None, 'fermi_levels', 0, False), + # `i_property` is out of range + (5, None, 'electronic_band_gaps', 2, False), + # property is not converged + (5, 1e-5, 'electronic_band_gaps', 0, False), + # valid case: property is converged + (5, 1e-3, 'electronic_band_gaps', 0, True), + ], + ) + def test_resolve_is_scf_converged( + self, + n_scf_steps: int, + threshold_change: Optional[float], + property_name: str, + i_property: int, + result: bool, + ): """ - outputs = Outputs() + Test the `resolve_is_scf_converged` method. - # No spin-polarized band gap - band_gap_non_spin_polarized = ElectronicBandGap(variables=[]) - band_gap_non_spin_polarized.value = 2.0 * ureg.joule - outputs.electronic_band_gaps.append(band_gap_non_spin_polarized) - band_gaps = outputs.extract_spin_polarized_property('electronic_band_gaps') - assert band_gaps == [] - - # Spin-polarized band gaps - band_gap_spin_1 = ElectronicBandGap(variables=[], spin_channel=0) - band_gap_spin_1.value = 1.0 * ureg.joule - outputs.electronic_band_gaps.append(band_gap_spin_1) - band_gap_spin_2 = ElectronicBandGap(variables=[], spin_channel=1) - band_gap_spin_2.value = 1.5 * ureg.joule - outputs.electronic_band_gaps.append(band_gap_spin_2) - band_gaps = outputs.extract_spin_polarized_property('electronic_band_gaps') - assert len(band_gaps) == 2 - assert band_gaps[0].value.magnitude == 1.0 - assert band_gaps[1].value.magnitude == 1.5 + Args: + n_scf_steps (int): The number of SCF steps to add under `SCFOutputs`. + threshold_change (Optional[float]): The threshold change to be used for the SCF convergence. + property_name (str): The name of the property to be tested for convergence. + i_property (int): The index of the property to be tested for convergence. + result (bool): The expected result of the `resolve_is_scf_converged` method. + """ + scf_outputs = generate_scf_electronic_band_gap_template( + n_scf_steps=n_scf_steps, threshold_change=threshold_change + ) + is_scf_converged = scf_outputs.resolve_is_scf_converged( + property_name=property_name, + i_property=i_property, + physical_property=scf_outputs.electronic_band_gaps[0], + logger=logger, + ) + assert is_scf_converged == result @pytest.mark.parametrize( - 'threshold_change, result', - [(1e-3, True), (1e-5, False)], + 'n_scf_steps, threshold_change, result', + [ + # `n_scf_steps` is less than 2 + (0, None, False), + # no `self_consistency_ref` section + (5, None, False), + # property is not converged + (5, 1e-5, False), + # valid case: property is converged + (5, 1e-3, True), + ], ) - def test_normalize(self, threshold_change: float, result: bool): + def test_normalize( + self, + n_scf_steps: int, + threshold_change: float, + result: bool, + ): """ Test the `normalize` method. + + Args: + n_scf_steps (int): The number of SCF steps to add under `SCFOutputs`. + threshold_change (Optional[float]): The threshold change to be used for the SCF convergence. + result (bool): The expected result after normalization and population of the `is_scf_converged` quantity. """ scf_outputs = generate_scf_electronic_band_gap_template( - threshold_change=threshold_change + n_scf_steps=n_scf_steps, threshold_change=threshold_change ) - scf_outputs.normalize(EntryArchive(), logger) assert scf_outputs.electronic_band_gaps[0].is_scf_converged == result