diff --git a/pynxtools/dataconverter/helpers.py b/pynxtools/dataconverter/helpers.py
index c186a19f8..591db424e 100644
--- a/pynxtools/dataconverter/helpers.py
+++ b/pynxtools/dataconverter/helpers.py
@@ -21,8 +21,9 @@
 import logging
 import re
 from datetime import datetime, timezone
+from enum import Enum
 from functools import lru_cache
-from typing import Any, Callable, List, Optional, Tuple, Union
+from typing import Any, Callable, List, Optional, Set, Tuple, Union
 
 import h5py
 import lxml.etree as ET
@@ -30,6 +31,8 @@
 from ase.data import chemical_symbols
 
 from pynxtools import get_nexus_version, get_nexus_version_hash
+from pynxtools.dataconverter.template import Template
+from pynxtools.definitions.dev_tools.utils.nxdl_utils import get_inherited_nodes
 from pynxtools.nexus import nexus
 from pynxtools.nexus.nexus import NxdlAttributeNotFoundError
 
@@ -37,6 +40,93 @@
 logger.setLevel(logging.INFO)
 
 
+class ValidationProblem(Enum):
+    UnitWithoutDocumentation = 1
+    InvalidEnum = 2
+    OptionalParentWithoutRequiredGroup = 3
+    OptionalParentWithoutRequiredField = 4
+    MissingRequiredGroup = 5
+    MissingRequiredField = 6
+    InvalidType = 7
+    InvalidDatetime = 8
+    IsNotPosInt = 9
+
+
+class Collector:
+    """A class that collects validation problems and logs them as warnings."""
+
+    def __init__(self):
+        self.data = set()
+
+    def insert_and_log(
+        self, path: str, log_type: ValidationProblem, value: Optional[Any], *args
+    ):
+        """Inserts a path into the set of problematic paths and logs the problem."""
+        if value is None:
+            value = ""
+
+        if log_type == ValidationProblem.UnitWithoutDocumentation:
+            logger.warning(
+                f"The unit, {path} = {value}, "
+                "is being written but has no documentation"
+            )
+        elif log_type == ValidationProblem.InvalidEnum:
+            logger.warning(
+                f"The value at {path} should be one of the "
+                f"following strings: {value}"
+            )
+        elif log_type == ValidationProblem.OptionalParentWithoutRequiredGroup:
+            logger.warning(
+                f"The required group, {path}, hasn't been supplied"
+                f" while its optional parent, {value}, is supplied."
+            )
+        elif log_type == ValidationProblem.OptionalParentWithoutRequiredField:
+            logger.warning(
+                f"The data entry, {path}, has an optional parent, "
+                f"{value}, with required children set. Either"
+                f" provide no children for {value} or provide"
+                f" all required ones."
+            )
+        elif log_type == ValidationProblem.MissingRequiredGroup:
+            logger.warning(f"The required group, {path}, hasn't been supplied.")
+        elif log_type == ValidationProblem.MissingRequiredField:
+            logger.warning(
+                f"The data entry corresponding to {path} is required "
+                "and hasn't been supplied by the reader.",
+            )
+        elif log_type == ValidationProblem.InvalidType:
+            logger.warning(
+                f"The value at {path} should be one of: {value}"
+                f", as defined in the NXDL as {args[0] if args else ''}."
+            )
+        elif log_type == ValidationProblem.InvalidDatetime:
+            logger.warning(
+                f"The value at {path} = {value} should be a timezone aware ISO8601 "
+                "formatted str. For example, 2022-01-22T12:14:12.05018Z"
+                " or 2022-01-22T12:14:12.05018+00:00."
+            )
+        elif log_type == ValidationProblem.IsNotPosInt:
+            logger.warning(
+                f"The value at {path} should be a positive int, but is {value}."
+            )
+        self.data.add(path)
+
+    def has_validation_problems(self):
+        """Returns True if there were any validation problems."""
+        return len(self.data) > 0
+
+    def get(self):
+        """Returns the set of problematic paths."""
+        return self.data
+
+    def clear(self):
+        """Clears the collected data."""
+        self.data = set()
+
+
+collector = Collector()
+
+
 def is_a_lone_group(xml_element) -> bool:
     """Checks whether a given group XML element has no field or attributes mentioned"""
     if xml_element.get("type") == "NXentry":
@@ -141,14 +231,17 @@ def generate_template_from_nxdl(
         return
 
     suffix = ""
-    if "name" in root.attrib:
+    if "name" in root.attrib and not contains_uppercase(root.attrib["name"]):
         suffix = root.attrib["name"]
-        if any(map(str.isupper, suffix)):
-            suffix = f"{suffix}[{suffix.lower()}]"
     elif "type" in root.attrib:
         nexus_class = convert_nexus_to_caps(root.attrib["type"])
-        hdf5name = f"[{convert_nexus_to_suggested_name(root.attrib['type'])}]"
-        suffix = f"{nexus_class}{hdf5name}"
+        name = root.attrib.get("name")
+        hdf_name = root.attrib.get("type")[2:]  # .removeprefix("NX") (python > 3.8)
+        suffix = (
+            f"{name}[{name.lower()}]"
+            if name is not None
+            else f"{nexus_class}[{hdf_name}]"
+        )
 
     path = path + "/" + (f"@{suffix}" if tag == "attribute" else suffix)
 
@@ -213,8 +306,17 @@ def convert_nexus_to_caps(nexus_name):
     return nexus_name[2:].upper()
 
 
+def contains_uppercase(field_name: Optional[str]) -> bool:
+    """Helper function to check if a field name contains uppercase characters."""
+    if field_name is None:
+        return False
+    return any(char.isupper() for char in field_name)
+
+
 def convert_nexus_to_suggested_name(nexus_name):
     """Helper function to suggest a name for a group from its NeXus class."""
+    if contains_uppercase(nexus_name):
+        return nexus_name
     return nexus_name[2:]
 
 
@@ -365,14 +467,13 @@ def is_valid_data_field(value, nxdl_type, path):
             if value is None:
                 raise ValueError
             return accepted_types[0](value)
-        except ValueError as exc:
-            raise ValueError(
-                f"The value at {path} should be of Python type: {accepted_types}"
-                f", as defined in the NXDL as {nxdl_type}."
-            ) from exc
+        except ValueError:
+            collector.insert_and_log(
+                path, ValidationProblem.InvalidType, accepted_types, nxdl_type
+            )
 
     if nxdl_type == "NX_POSINT" and not is_positive_int(value):
-        raise ValueError(f"The value at {path} should be a positive int.")
+        collector.insert_and_log(path, ValidationProblem.IsNotPosInt, value)
 
     if nxdl_type in ("ISO8601", "NX_DATE_TIME"):
         iso8601 = re.compile(
@@ -381,22 +482,19 @@ def is_valid_data_field(value, nxdl_type, path):
         )
         results = iso8601.search(value)
         if results is None:
-            raise ValueError(
-                f"The date at {path} should be a timezone aware ISO8601 "
-                f"formatted str. For example, 2022-01-22T12:14:12.05018Z"
-                f" or 2022-01-22T12:14:12.05018+00:00."
-            )
+            collector.insert_and_log(path, ValidationProblem.InvalidDatetime, value)
 
     return value
 
 
 @lru_cache(maxsize=None)
-def path_in_data_dict(nxdl_path: str, data_keys: Tuple[str, ...]) -> Tuple[bool, str]:
+def path_in_data_dict(nxdl_path: str, data_keys: Tuple[str, ...]) -> List[str]:
     """Checks if there is an accepted variation of path in the dictionary & returns the path."""
+    found_keys = []
     for key in data_keys:
         if nxdl_path == convert_data_converter_dict_to_nxdl_path(key):
-            return True, key
-    return False, None
+            found_keys.append(key)
+    return found_keys
 
 
 def check_for_optional_parent(path: str, nxdl_root: ET.Element) -> str:
@@ -430,17 +528,17 @@ def is_node_required(nxdl_key, nxdl_root):
 
 def all_required_children_are_set(optional_parent_path, data, nxdl_root):
     """Walks over optional parent's children and makes sure all required ones are set"""
-    optional_parent_path = convert_data_converter_dict_to_nxdl_path(
-        optional_parent_path
-    )
     for key in data:
         if key in data["lone_groups"]:
            continue
         nxdl_key = convert_data_converter_dict_to_nxdl_path(key)
+        name = nxdl_key[nxdl_key.rfind("/") + 1 :]
+        renamed_path = f"{optional_parent_path}/{name}"
         if (
-            nxdl_key[0 : nxdl_key.rfind("/")] == optional_parent_path
+            nxdl_key[: nxdl_key.rfind("/")]
+            == convert_data_converter_dict_to_nxdl_path(optional_parent_path)
             and is_node_required(nxdl_key, nxdl_root)
-            and data[path_in_data_dict(nxdl_key, tuple(data.keys()))[1]] is None
+            and (renamed_path not in data or data[renamed_path] is None)
         ):
             return False
 
@@ -458,16 +556,22 @@ def is_nxdl_path_a_child(nxdl_path: str, parent: str):
 
 def check_optionality_based_on_parent_group(path, nxdl_path, nxdl_root, data, template):
     """Checks whether field is part of an optional parent and then confirms its optionality"""
+
+    def trim_path_to(parent: str, path: str):
+        count = len(parent.split("/"))
+        return "/".join(path.split("/")[:count])
+
     for optional_parent in template["optional_parents"]:
         optional_parent_nxdl = convert_data_converter_dict_to_nxdl_path(optional_parent)
         if is_nxdl_path_a_child(
             nxdl_path, optional_parent_nxdl
-        ) and not all_required_children_are_set(optional_parent, data, nxdl_root):
-            logger.warning(
-                f"The data entry, {path}, has an optional parent, "
-                f"{optional_parent}, with required children set. Either"
-                f" provide no children for {optional_parent} or provide"
-                f" all required ones."
+        ) and not all_required_children_are_set(
+            trim_path_to(optional_parent, path), data, nxdl_root
+        ):
+            collector.insert_and_log(
+                path,
+                ValidationProblem.OptionalParentWithoutRequiredField,
+                optional_parent,
             )
 
 
@@ -496,40 +600,147 @@ def does_group_exist(path_to_group, data):
     return False
 
 
-# pylint: disable=W1203
+def get_concept_basepath(path: str) -> str:
+    """Get the concept path from the path"""
+    path_list = path.split("/")
+    concept_path = []
+    for p in path_list:
+        if re.search(r"[A-Z]", p):
+            concept_path.append(p)
+    return "/" + "/".join(concept_path)
+
+
+def ensure_all_required_fields_exist_in_variadic_groups(
+    template: Template, data: Template, check_basepaths: Set[str]
+):
+    """
+    Checks whether all required fields (according to `template`)
+    in `data` are present in their respective
+    variadic groups given by `check_basepaths`.
+
+    Args:
+        template (Template): The template to use as reference.
+        data (Template): The template containing the actual data
+        check_basepaths (Set[str]):
+            A set of basepaths of the form /ENTRY/MY_GROUP to check for missing fields.
+            All groups matching the basepath will be checked for missing fields.
+    """
+
+    @lru_cache(maxsize=None)
+    def get_required_fields_from(base_path: str) -> Set[str]:
+        required_fields = set()
+        for path in template["required"]:
+            if (
+                get_concept_basepath(convert_data_converter_dict_to_nxdl_path(path))
+                == base_path
+            ):
+                entry_name = get_name_from_data_dict_entry(path[path.rindex("/") + 1 :])
+                if entry_name == "@units":
+                    required_fields.add(f"{path.rsplit('/', 2)[1]}/@units")
+                    continue
+                required_fields.add(
+                    get_name_from_data_dict_entry(path[path.rindex("/") + 1 :])
+                )
+
+        return required_fields
+
+    @lru_cache(maxsize=None)
+    def get_concept_variations(base_path: str) -> Set[str]:
+        paths = set()
+        for path in data:
+            if (
+                get_concept_basepath(convert_data_converter_dict_to_nxdl_path(path))
+                == base_path
+            ):
+                paths.add(get_concept_basepath(path))
+        return paths
+
+    @lru_cache(maxsize=None)
+    def are_all_entries_none(path: str) -> bool:
+        concept_path = get_concept_basepath(path)
+        for key in filter(lambda x: x.startswith(concept_path), data):
+            if data[key] is not None:
+                return False
+        return True
+
+    for base_path in check_basepaths:
+        all_fields_are_none = True
+        for path in get_concept_variations(base_path):
+            count = 0
+            for required_field in get_required_fields_from(base_path):
+                if (
+                    f"{path}/{required_field}" not in data
+                    or data[f"{path}/{required_field}"] is None
+                ):
+                    missing_field = f"{path}/{required_field}"
+                    count += 1
+                    if not are_all_entries_none(missing_field):
+                        count -= 1
+                        collector.insert_and_log(
+                            missing_field, ValidationProblem.MissingRequiredField, None
+                        )
+
+            if count == 0:
+                # There are either no required fields, all required fields are set,
+                # or the missing fields already have been reported.
+                all_fields_are_none = False
+
+        if all_fields_are_none:
+            # All entries in all variadic groups are None
+            generic_dict_path = "/" + "/".join(
+                map(lambda path: f"{path}[{path.lower()}]", base_path.split("/")[1:])
+            )
+            collector.insert_and_log(
+                generic_dict_path, ValidationProblem.MissingRequiredGroup, None
+            )
+
+
 def ensure_all_required_fields_exist(template, data, nxdl_root):
     """Checks whether all the required fields are in the returned data object."""
+    check_basepaths = set()
     for path in template["required"]:
         entry_name = get_name_from_data_dict_entry(path[path.rindex("/") + 1 :])
         if entry_name == "@units":
             continue
 
         nxdl_path = convert_data_converter_dict_to_nxdl_path(path)
-        is_path_in_data_dict, renamed_path = path_in_data_dict(
-            nxdl_path, tuple(data.keys())
-        )
+        renamed_paths = path_in_data_dict(nxdl_path, tuple(data.keys()))
+
+        if len(renamed_paths) > 1:
+            check_basepaths.add(get_concept_basepath(nxdl_path))
+            continue
+
+        if not renamed_paths:
+            renamed_path = path
+        else:
+            renamed_path = renamed_paths[0]
 
-        renamed_path = path if renamed_path is None else renamed_path
         if path in template["lone_groups"]:
             opt_parent = check_for_optional_parent(path, nxdl_root)
             if opt_parent != "<>":
                 if does_group_exist(opt_parent, data) and not does_group_exist(
                     renamed_path, data
                 ):
-                    logger.warning(
-                        f"The required group, {path}, hasn't been supplied"
-                        f" while its optional parent, {opt_parent}, is supplied."
+                    collector.insert_and_log(
+                        renamed_path,
+                        ValidationProblem.OptionalParentWithoutRequiredGroup,
+                        opt_parent,
                     )
                 continue
             if not does_group_exist(renamed_path, data):
-                logger.warning(f"The required group, {path}, hasn't been supplied.")
+                collector.insert_and_log(
+                    path,
+                    ValidationProblem.MissingRequiredGroup,
+                    None,
+                )
                 continue
             continue
 
-        if not is_path_in_data_dict or data[renamed_path] is None:
-            logger.warning(
-                f"The data entry corresponding to {path} is required "
-                f"and hasn't been supplied by the reader.",
+        if data[renamed_path] is None:
+            collector.insert_and_log(
+                renamed_path, ValidationProblem.MissingRequiredField, None
             )
 
+    ensure_all_required_fields_exist_in_variadic_groups(template, data, check_basepaths)
+
 
 def try_undocumented(data, nxdl_root: ET.Element):
     """Tries to move entries used that are from base classes but not in AppDef"""
@@ -557,11 +768,12 @@ def try_undocumented(data, nxdl_root: ET.Element):
 
         try:
             elem = nexus.get_node_at_nxdl_path(nxdl_path=nxdl_path, elem=nxdl_root)
-            data[get_required_string(elem)][path] = data.undocumented[path]
+            optionality = get_required_string(elem)
+            data[optionality][path] = data.undocumented[path]
             del data.undocumented[path]
             units = f"{path}/@units"
             if units in data.undocumented:
-                data[get_required_string(elem)][units] = data.undocumented[units]
+                data[optionality][units] = data.undocumented[units]
                 del data.undocumented[units]
         except NxdlAttributeNotFoundError:
             pass
@@ -570,10 +782,11 @@
 def validate_data_dict(template, data, nxdl_root: ET.Element):
     """Checks whether all the required paths from the template are returned in data dict."""
     assert nxdl_root is not None, "The NXDL file hasn't been loaded."
+    collector.clear()
 
-    # nxdl_path_set helps to skip validation check on the same type of nxdl signiture
-    # This reduces huge amount of runing time
-    nxdl_path_to_elm: dict = {}
+    @lru_cache(maxsize=None)
+    def get_xml_node(nxdl_path: str) -> ET.Element:
+        return nexus.get_node_at_nxdl_path(nxdl_path=nxdl_path, elem=nxdl_root)
 
     # Make sure all required fields exist.
     ensure_all_required_fields_exist(template, data, nxdl_root)
@@ -584,18 +797,41 @@ def validate_data_dict(template, data, nxdl_root: ET.Element):
             entry_name = get_name_from_data_dict_entry(path[path.rindex("/") + 1 :])
             nxdl_path = convert_data_converter_dict_to_nxdl_path(path)
 
-            if entry_name == "@units":
-                continue
-
             if entry_name[0] == "@" and "@" in nxdl_path:
                 index_of_at = nxdl_path.rindex("@")
                 nxdl_path = nxdl_path[0:index_of_at] + nxdl_path[index_of_at + 1 :]
 
-            if nxdl_path in nxdl_path_to_elm:
-                elem = nxdl_path_to_elm[nxdl_path]
-            else:
-                elem = nexus.get_node_at_nxdl_path(nxdl_path=nxdl_path, elem=nxdl_root)
-                nxdl_path_to_elm[nxdl_path] = elem
+            if entry_name == "@units":
+                elempath = get_inherited_nodes(nxdl_path, None, nxdl_root)[1]
+                elem = elempath[-2]
+                field_path = path.rsplit("/", 1)[0]
+                if (
+                    field_path not in data.get_documented()
+                    and "units" not in elem.attrib
+                ):
+                    collector.insert_and_log(
+                        path, ValidationProblem.UnitWithoutDocumentation, data[path]
+                    )
+                    continue
+
+                # TODO: If we want we could also enable unit validation here
+                # field = nexus.get_node_at_nxdl_path(
+                #     nxdl_path=convert_data_converter_dict_to_nxdl_path(
+                #         # The part below is the backwards compatible version of
+                #         # nxdl_path.removesuffix("/units")
+                #         nxdl_path[:-6] if nxdl_path.endswith("/units") else nxdl_path
+                #     ),
+                #     elem=nxdl_root,
+                # )
+                # nxdl_unit = field.attrib.get("units", "")
+                # if not is_valid_unit(data[path], nxdl_unit):
+                #     raise ValueError(
+                #         f"Invalid unit in {path}. {data[path]} "
+                #         f"is not in unit category {nxdl_unit}"
+                #     )
+                continue
+
+            elem = get_xml_node(nxdl_path)
 
             # Only check for validation in the NXDL if we did find the entry
             # otherwise we just pass it along
@@ -620,12 +856,9 @@ def validate_data_dict(template, data, nxdl_root: ET.Element):
            )[2]
            is_valid_enum, enums = is_value_valid_element_of_enum(data[path], elist)
            if not is_valid_enum:
-                logger.warning(
-                    f"The value at {path} should be on of the "
-                    f"following strings: {enums}"
-                )
+                collector.insert_and_log(path, ValidationProblem.InvalidEnum, enums)
 
-    return True
+    return not collector.has_validation_problems()
 
 
 def remove_namespace_from_tag(tag):
diff --git a/pynxtools/dataconverter/readers/example/reader.py b/pynxtools/dataconverter/readers/example/reader.py
index 76294b3ac..44ff6ace6 100644
--- a/pynxtools/dataconverter/readers/example/reader.py
+++ b/pynxtools/dataconverter/readers/example/reader.py
@@ -59,6 +59,7 @@ def read(
             if (
                 k.startswith("/ENTRY[entry]/required_group")
                 or k == "/ENTRY[entry]/optional_parent/req_group_in_opt_group"
+                or k.startswith("/ENTRY[entry]/OPTIONAL_group")
             ):
                 continue
 
diff --git a/pynxtools/dataconverter/template.py b/pynxtools/dataconverter/template.py
index 2983e295b..e45e01f6e 100644
--- a/pynxtools/dataconverter/template.py
+++ b/pynxtools/dataconverter/template.py
@@ -151,16 +151,13 @@ def __getitem__(self, k):
         if k in ("optional_parents", "lone_groups"):
             return getattr(self, k)
         if k.startswith("/"):
-            try:
+            if k in self.optional:
                 return self.optional[k]
-            except KeyError:
-                try:
-                    return self.recommended[k]
-                except KeyError:
-                    try:
-                        return self.required[k]
-                    except KeyError:
-                        return self.undocumented[k]
+            if k in self.recommended:
+                return self.recommended[k]
+            if k in self.required:
+                return self.required[k]
+            return self.undocumented.get(k)
         if k in ("required", "optional", "recommended", "undocumented"):
             return self.get_optionality(k)
         raise KeyError(
diff --git a/tests/data/dataconverter/NXtest.nxdl.xml b/tests/data/dataconverter/NXtest.nxdl.xml
index f4aa0aab4..8695a20c9 100644
--- a/tests/data/dataconverter/NXtest.nxdl.xml
+++ b/tests/data/dataconverter/NXtest.nxdl.xml
@@ -19,6 +19,14 @@
+
+
+        A dummy entry to test optional parent check for a required child.
+
+
+        A dummy entry to test optional parent check for an optional child.
+
+
         A dummy entry for a float value.
diff --git a/tests/dataconverter/test_helpers.py b/tests/dataconverter/test_helpers.py
index 8a1319fb6..723aca5cf 100644
--- a/tests/dataconverter/test_helpers.py
+++ b/tests/dataconverter/test_helpers.py
@@ -20,6 +20,7 @@
 import logging
 import os
 import xml.etree.ElementTree as ET
+from typing import Optional
 
 import numpy as np
 import pytest
@@ -51,14 +52,28 @@ def alter_dict(data_dict: Template, key: str, value: object):
     return None
 
 
-def set_to_none_in_dict(data_dict: Template, key: str, optionality: str):
+def set_to_none_in_dict(data_dict: Optional[Template], key: str, optionality: str):
     """Helper function to forcefully set path to 'None'"""
-    if data_dict is not None:
-        internal_dict = Template(data_dict)
-        internal_dict[optionality][key] = None
-        return internal_dict
+    if data_dict is None:
+        return None
 
-    return None
+    internal_dict = Template(data_dict)
+    internal_dict[optionality][key] = None
+    return internal_dict
+
+
+def set_whole_group_to_none(
+    data_dict: Optional[Template], key: str, optionality: str
+) -> Optional[Template]:
+    """Set a whole path to None in the dict"""
+    if data_dict is None:
+        return None
+
+    internal_dict = Template(data_dict)
+    for path in data_dict[optionality]:
+        if path.startswith(key):
+            internal_dict[optionality][path] = None
+    return internal_dict
 
 
 def remove_from_dict(data_dict: Template, key: str, optionality: str = "optional"):
@@ -195,6 +210,28 @@ def fixture_filled_test_data(template, tmp_path):
 TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_name]/char_value"] = (
     "just chars"  # pylint: disable=E1126
 )
+TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/bool_value"] = True  # pylint: disable=E1126
+TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/int_value"] = 2  # pylint: disable=E1126
+TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/int_value/@units"] = (
+    "eV"  # pylint: disable=E1126
+)
+TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/posint_value"] = (
+    np.array(
+        [1, 2, 3],  # pylint: disable=E1126
+        dtype=np.int8,
+    )
+)  # pylint: disable=E1126
+TEMPLATE["required"][
+    "/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/posint_value/@units"
+] = "kg"  # pylint: disable=E1126
+TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/char_value"] = (
+    "just chars"  # pylint: disable=E1126
+)
+TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/type"] = "2nd type"  # pylint: disable=E1126
+TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/date_value"] = (
+    "2022-01-22T12:14:12.05018+00:00"  # pylint: disable=E1126
+)
+TEMPLATE["required"]["/ENTRY[my_entry]/OPTIONAL_group[my_group]/required_field"] = 1  # pylint: disable=E1126
 TEMPLATE["required"]["/ENTRY[my_entry]/definition"] = "NXtest"  # pylint: disable=E1126
 TEMPLATE["required"]["/ENTRY[my_entry]/definition/@version"] = "2.4.6"  # pylint: disable=E1126
 TEMPLATE["required"]["/ENTRY[my_entry]/program_name"] = "Testing program"  # pylint: disable=E1126
@@ -202,6 +239,7 @@ def fixture_filled_test_data(template, tmp_path):
 TEMPLATE["required"]["/ENTRY[my_entry]/NXODD_name[nxodd_name]/date_value"] = (
     "2022-01-22T12:14:12.05018+00:00"  # pylint: disable=E1126
 )
+TEMPLATE["optional"]["/ENTRY[my_entry]/OPTIONAL_group[my_group]/optional_field"] = 1
 TEMPLATE["optional"]["/ENTRY[my_entry]/required_group/description"] = (
     "An example description"
 )
@@ -231,7 +269,7 @@ def fixture_filled_test_data(template, tmp_path):
             ),
             (
                 "The value at /ENTRY[my_entry]/NXODD_name[nxodd_name]/in"
-                "t_value should be of Python type: (, , , ),"
                 " as defined in the NXDL as NX_INT."
             ),
@@ -245,7 +283,7 @@ def fixture_filled_test_data(template, tmp_path):
             ),
             (
                 "The value at /ENTRY[my_entry]/NXODD_name[nxodd_name]/bool_value sh"
-                "ould be of Python type: (, , , , ), as defined in the NXDL as NX_BOOLEAN."
             ),
             id="string-instead-of-int",
@@ -265,7 +303,7 @@ def fixture_filled_test_data(template, tmp_path):
             ),
             (
                 "The value at /ENTRY[my_entry]/NXODD_name[nxodd_name]/posint_value "
-                "should be a positive int."
+                "should be a positive int, but is -1."
             ),
             id="negative-posint",
         ),
@@ -293,12 +331,55 @@ def fixture_filled_test_data(template, tmp_path):
                 "/ENTRY[my_entry]/NXODD_name[nxodd_name]/bool_value",
                 "required",
             ),
+            (
+                "The data entry corresponding to /ENTRY[my_entry]/NXODD_name[nxodd_name]/bool_value is"
+                " required and hasn't been supplied by the reader."
+            ),
+            id="empty-required-field",
+        ),
+        pytest.param(
+            set_to_none_in_dict(
+                TEMPLATE,
+                "/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/bool_value",
+                "required",
+            ),
+            (
+                "The data entry corresponding to /ENTRY[my_entry]/NXODD_name[nxodd_two_name]/bool_value is"
+                " required and hasn't been supplied by the reader."
+            ),
+            id="empty-required-field",
+        ),
+        pytest.param(
+            remove_from_dict(
+                remove_from_dict(
+                    TEMPLATE,
+                    "/ENTRY[my_entry]/NXODD_name[nxodd_two_name]/bool_value",
+                    "required",
+                ),
+                "/ENTRY[my_entry]/NXODD_name[nxodd_name]/bool_value",
+                "required",
+            ),
             (
                 "The data entry corresponding to /ENTRY[entry]/NXODD_name[nxodd_name]/bool_value is"
                 " required and hasn't been supplied by the reader."
             ),
             id="empty-required-field",
         ),
+        pytest.param(
+            set_whole_group_to_none(
+                set_whole_group_to_none(
+                    TEMPLATE,
+                    "/ENTRY[my_entry]/NXODD_name",
+                    "required",
+                ),
+                "/ENTRY[my_entry]/NXODD_name",
+                "optional",
+            ),
+            (
+                "The required group, /ENTRY[entry]/NXODD_name[nxodd_name], hasn't been supplied."
+            ),
+            id="all-required-fields-set-to-none",
+        ),
         pytest.param(
             alter_dict(
                 TEMPLATE,
@@ -323,7 +404,8 @@ def fixture_filled_test_data(template, tmp_path):
                 "/ENTRY[my_entry]/NXODD_name[nxodd_name]/date_value",
                 "2022-01-22T12:14:12.05018-00:00",
             ),
-            "The date at /ENTRY[my_entry]/NXODD_name[nxodd_name]/date_value should be a timezone aware"
+            "The value at /ENTRY[my_entry]/NXODD_name[nxodd_name]/date_value"
+            " = 2022-01-22T12:14:12.05018-00:00 should be a timezone aware"
             " ISO8601 formatted str. For example, 2022-01-22T12:14:12.05018Z or 2022-01-22"
             "T12:14:12.05018+00:00.",
             id="UTC-with--00:00",
@@ -351,6 +433,29 @@ def fixture_filled_test_data(template, tmp_path):
             ),
             id="atleast-one-required-child-not-provided-optional-parent",
         ),
+        pytest.param(
+            set_to_none_in_dict(
+                TEMPLATE,
+                "/ENTRY[my_entry]/OPTIONAL_group[my_group]/required_field",
+                "required",
+            ),
+            (
+                "The data entry, /ENTRY[my_entry]/OPTIONAL_group[my_group]/optional_field, has an "
+                "optional parent, /ENTRY[entry]/OPTIONAL_group[optional_group], with required children set"
+                ". Either provide no children for /ENTRY[entry]/OPTIONAL_group[optional_group] or provide "
+                "all required ones."
+            ),
+            id="required-field-not-provided-in-variadic-optional-group",
+        ),
+        pytest.param(
+            set_to_none_in_dict(
+                TEMPLATE,
+                "/ENTRY[my_entry]/OPTIONAL_group[my_group]/optional_field",
+                "required",
+            ),
+            (""),
+            id="required-field-provided-in-variadic-optional-group",
+        ),
         pytest.param(
             alter_dict(
                 alter_dict(
@@ -378,7 +483,7 @@ def fixture_filled_test_data(template, tmp_path):
                 remove_from_dict(
                     TEMPLATE, "/ENTRY[my_entry]/required_group/description"
                 ),
-                "/ENTRY[my_entry]/required_group",
+                "/ENTRY[entry]/required_group",
                 None,
             ),
             "The required group, /ENTRY[entry]/required_group, hasn't been supplied.",
@@ -415,8 +520,11 @@ def test_validate_data_dict(
         "int-instead-of-chars",
         "link-dict-instead-of-bool",
         "opt-group-completely-removed",
+        "required-field-provided-in-variadic-optional-group",
     ):
-        helpers.validate_data_dict(template, data_dict, nxdl_root)
+        with caplog.at_level(logging.WARNING):
+            assert helpers.validate_data_dict(template, data_dict, nxdl_root)
+        assert caplog.text == ""
     # Missing required fields caught by logger with warning
     elif request.node.callspec.id in (
         "empty-required-field",
@@ -426,22 +534,14 @@ def test_validate_data_dict(
         "missing-empty-yet-required-group2",
     ):
         assert "" == caplog.text
-        # logger records
         captured_logs = caplog.records
-        helpers.validate_data_dict(template, data_dict, nxdl_root)
+        assert not helpers.validate_data_dict(template, data_dict, nxdl_root)
         assert any(error_message in rec.message for rec in captured_logs)
-    elif request.node.callspec.id in (
-        "wrong-enum-choice",
-        "atleast-one-required-child-not-provided-optional-parent",
-    ):
+    else:
         with caplog.at_level(logging.WARNING):
-            helpers.validate_data_dict(template, data_dict, nxdl_root)
+            assert not helpers.validate_data_dict(template, data_dict, nxdl_root)
         assert error_message in caplog.text
-    else:
-        with pytest.raises(Exception) as execinfo:
-            helpers.validate_data_dict(template, data_dict, nxdl_root)
-        assert (error_message) == str(execinfo.value)
 
 
 @pytest.mark.parametrize(
     [
         pytest.param(
             "/ENTRY/definition/@version",
-            (True, "/ENTRY[entry]/definition/@version"),
+            ["/ENTRY[entry]/definition/@version"],
             id="path-exists-in-dict",
         ),
-        pytest.param(
-            "/RANDOM/does/not/@exist", (False, None), id="path-does-not-exist-in-dict"
-        ),
+        pytest.param("/RANDOM/does/not/@exist", [], id="path-does-not-exist-in-dict"),
     ],
 )
 def test_path_in_data_dict(nxdl_path, expected, template):