diff --git a/src/pynxtools/dataconverter/readers/multi/reader.py b/src/pynxtools/dataconverter/readers/multi/reader.py index 4a11f5db7..dadf6eee5 100644 --- a/src/pynxtools/dataconverter/readers/multi/reader.py +++ b/src/pynxtools/dataconverter/readers/multi/reader.py @@ -23,6 +23,8 @@ import re from typing import Any, Callable, Dict, List, Optional, Tuple, Union +import numpy as np + from pynxtools.dataconverter.readers.base.reader import BaseReader from pynxtools.dataconverter.readers.utils import ( is_boolean, @@ -36,6 +38,59 @@ logger = logging.getLogger("pynxtools") +def evaluate_expression(expression: str, data: Dict[str, Any]) -> Any: + """ + Evaluates a string expression where keys are accessed from a dictionary and transformations are applied. + + Args: + expression (str): The string expression to evaluate, e.g., '/data/value + /someothervalue'. + data (Dict[str, Any]): A dictionary where keys are matched to parts of the expression. + + Returns: + Any: The result of the evaluated expression. + """ + if not expression: + logger.warning("Empty formula provided.") + return None + + # Prepare the safe environment for evaluation + safe_conversions = { + "mean": np.mean, + "min": np.min, + "max": np.max, + # "unit_conversion": ?? + } + + safe_conversions.update({"__builtins__": {}}) # Disable built-ins for safety + + def resolve_key(key: str) -> Any: + """Resolve a key by removing leading '/' and accessing the dictionary.""" + if key.startswith("/"): + key = key.lstrip("/") + if key not in data: + raise KeyError(f"Key '{key}' not found in data.") + return data[key] + + # Use regex to replace only keys in the expression + def replace_keys(match: re.Match) -> str: + key = match.group(0) + return f"resolve_key('{key}')" + + # Match only valid dictionary keys (not operators or function calls) + pattern = r"(\/[\w\[\]\_\-/]+)" # this is currently not yet working + + resolved_expression = re.sub(pattern, replace_keys, expression) + + print(resolved_expression) # Debugging output to see the resolved expression + + # Evaluate the resolved expression + try: + return eval(resolved_expression, safe_conversions, {"resolve_key": resolve_key}) + except Exception as exc: + logger.warning(f"Formula '{expression}' could not be evaluated due to: {exc}") + return None + + def fill_wildcard_data_indices(config_file_dict, key, value, dims): """ Replaces the wildcard data indices (*) with the respective dimension entries. @@ -75,6 +130,7 @@ class ParseJsonCallbacks: "@link": used for linking (meta)data "@data": measurement data "@eln": ELN data not provided within the experiment file + "@formula": To calculate values based on the presence of other (meta)data Args: attrs_callback (Callable[[str], Any]): @@ -85,6 +141,8 @@ class ParseJsonCallbacks: The callback to retrieve links under the specified key. eln_callback (Callable[[str], Any]): The callback to retrieve eln values under the specified key. + formula_callback (Callable[[str], Any]): + The callback to control formula calculations. dims (List[str]): The dimension labels of the data. Defaults to None. entry_name (str): @@ -101,6 +159,7 @@ def __init__( data_callback: Optional[Callable[[str, str], Any]] = None, link_callback: Optional[Callable[[str, str], Any]] = None, eln_callback: Optional[Callable[[str, str], Any]] = None, + formula_callback: Optional[Callable[[str, str], Any]] = None, dims: Optional[Callable[[str, str], List[str]]] = None, entry_name: str = "entry", ): @@ -109,17 +168,28 @@ def __init__( "@link": link_callback if link_callback is not None else self.link_callback, "@data": data_callback if data_callback is not None else self.identity, "@eln": eln_callback if eln_callback is not None else self.identity, + "@formula": formula_callback + if link_callback is not None + else self.formula_callback, } self.dims = dims if dims is not None else lambda *_, **__: [] self.entry_name = entry_name - def link_callback(self, key: str, value: str) -> Dict[str, Any]: + def link_callback(self, _: str, value: str) -> Dict[str, Any]: """ Modify links to dictionaries with the correct entry name. """ return {"link": value.replace("/entry/", f"/{self.entry_name}/")} + def formula_callback(self, _: str, value: str) -> Dict[str, Any]: + """ + Modify formulas to start with "formula=". + """ + return { + "formula": value.replace("/ENTRY[entry]/", f"/ENTRY[{self.entry_name}]/") + } + def identity(self, _: str, value: str) -> str: """ Returns the input value unchanged. @@ -230,10 +300,13 @@ def parse_config_value(value: str) -> Tuple[str, Any]: ) # after filling, resolve links again: - if isinstance(new_entry_dict.get(key), str) and new_entry_dict[key].startswith( - "@link:" - ): - new_entry_dict[key] = {"link": new_entry_dict[key][6:]} + if isinstance(new_entry_dict.get(key), str): + if new_entry_dict[key].startswith("@link:"): + new_entry_dict[key] = {"link": new_entry_dict[key][6:]} + + if isinstance(new_entry_dict.get(key), dict) and "formula" in new_entry_dict[key]: + if formula := new_entry_dict[key]["formula"]: + new_entry_dict[key] = evaluate_expression(formula, new_entry_dict) def fill_from_config( @@ -263,9 +336,13 @@ def dict_sort_key(keyval: Tuple[str, Any]) -> bool: Besides, pythons sorted is stable, so this will keep the order of the keys which have the same sort key. """ - if isinstance(keyval[1], str): - return not keyval[1].startswith("!") - return True + value = keyval[1] + if isinstance(keyval, str): + if value.startswith(("!@formula:", "@formula:")): + return (2, keyval[0]) # Last + if value.startswith("!"): + return (0, keyval[0]) # First + return (1, keyval[0]) # Middle if callbacks is None: # Use default callbacks if none are explicitly provided