diff --git a/README.md b/README.md index e30b71a..efdf8e7 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,18 @@ To create a timeseries metadata object run from metadataschemas import indicator_schema indicator_metadata = indicator_schema.TimeseriesSchema(idno='project_idno',series_description=indicator_schema.SeriesDescription(idno='project_idno', name='project_name')) + +indicator_metadata.pretty_print() +``` +And the print statement will show you the metadata object in a pleasant format. +```python +TimeseriesSchema( + idno='project_idno', + series_description=series_description( + idno='project_idno', + name='project_name' + ) +) ``` Depending on your IDE, selecting `TimeseriesSchema` could show you what fields the schema contains and their corresponding object definitions. @@ -37,7 +49,7 @@ There are metadata objects for each of the following metadata types: | indicator | `indicator_schema.TimeseriesSchema` | | indicators_db | `indicators_db_schema.TimeseriesDatabaseSchema` | | microdata | `microdata_schema.MicrodataSchema` | -| resource |`resource_schema.Model` | +| resource | `resource_schema.Model` | | script | `script_schema.ResearchProjectSchemaDraft` | | table | `table_schema.Model` | | video | `video_schema.Model` | @@ -61,7 +73,7 @@ mm = MetadataManager() filename = mm.write_metadata_outline_to_excel('indicator') -filename = mm.save_metadata_to_excel('indicator', object=indicator_metadata) +filename = mm.save_metadata_to_excel(indicator_metadata) # Then after you have updated the metadata in the Excel file @@ -77,7 +89,7 @@ mm.metadata_type_names microdata_type = mm.metadata_class_from_name("microdata") # create an instantiated pydantic object and then fill in your data -microdata_metadata = mm.type_to_outline(metadata_type="microdata") +microdata_metadata = mm.create_metadata_outline("microdata") microdata_metadata.repositoryid = "repository id" microdata_metadata.study_desc.title_statement.idno = "project_idno" ``` diff --git a/poetry.lock b/poetry.lock index b70f941..7efd3f9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -728,6 +728,30 @@ traitlets = ">=5.3" docs = ["myst-parser", "pydata-sphinx-theme", "sphinx-autodoc-typehints", "sphinxcontrib-github-alt", "sphinxcontrib-spelling", "traitlets"] test = ["ipykernel", "pre-commit", "pytest (<8)", "pytest-cov", "pytest-timeout"] +[[package]] +name = "markdown-it-py" +version = "3.0.0" +description = "Python port of markdown-it. Markdown parsing, done right!" +optional = false +python-versions = ">=3.8" +files = [ + {file = "markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb"}, + {file = "markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1"}, +] + +[package.dependencies] +mdurl = ">=0.1,<1.0" + +[package.extras] +benchmarking = ["psutil", "pytest", "pytest-benchmark"] +code-style = ["pre-commit (>=3.0,<4.0)"] +compare = ["commonmark (>=0.9,<1.0)", "markdown (>=3.4,<4.0)", "mistletoe (>=1.0,<2.0)", "mistune (>=2.0,<3.0)", "panflute (>=2.3,<3.0)"] +linkify = ["linkify-it-py (>=1,<3)"] +plugins = ["mdit-py-plugins"] +profiling = ["gprof2dot"] +rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"] +testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"] + [[package]] name = "markupsafe" version = "2.1.5" @@ -811,6 +835,17 @@ files = [ [package.dependencies] traitlets = "*" +[[package]] +name = "mdurl" +version = "0.1.2" +description = "Markdown URL utilities" +optional = false +python-versions = ">=3.7" +files = [ + {file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"}, + {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"}, +] + [[package]] name = "mypy-extensions" version = "1.0.0" @@ -1556,6 +1591,24 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "rich" +version = "13.9.4" +description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal" +optional = false +python-versions = ">=3.8.0" +files = [ + {file = "rich-13.9.4-py3-none-any.whl", hash = "sha256:6049d5e6ec054bf2779ab3358186963bac2ea89175919d699e378b99738c2a90"}, + {file = "rich-13.9.4.tar.gz", hash = "sha256:439594978a49a09530cff7ebc4b5c7103ef57baf48d5ea3184f21d9a2befa098"}, +] + +[package.dependencies] +markdown-it-py = ">=2.2.0" +pygments = ">=2.13.0,<3.0.0" + +[package.extras] +jupyter = ["ipywidgets (>=7.5.1,<9)"] + [[package]] name = "ruff" version = "0.5.0" @@ -1721,4 +1774,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "ecb7393f4d4a81dd1b35c734a4384c8b96093ebbaf707b9f9ba22e08f9d93e3c" +content-hash = "398218abd7c2a6a86b136595f889e018e6a2f7630300820d9119114f48d47628" diff --git a/pydantic_schemas/metadata_manager.py b/pydantic_schemas/metadata_manager.py index ac1d23f..e96afa2 100644 --- a/pydantic_schemas/metadata_manager.py +++ b/pydantic_schemas/metadata_manager.py @@ -46,6 +46,8 @@ class MetadataManager: "video": video_schema.Model, } + _SCHEMA_TO_TYPE = {v: k for k, v in _TYPE_TO_SCHEMA.items()} + _TYPE_TO_WRITER = { "document": write_across_many_sheets, "geospatial": write_across_many_sheets, @@ -73,6 +75,24 @@ class MetadataManager: } def metadata_class_from_name(self, metadata_name: str) -> Type[BaseModel]: + """ + Retrieve the pydantic model class for the given metadata type. + + Args: + metadata_name (str): The name of the metadata type. Must be one of: + document, geospatial, image, indicator, indicators_db, microdata, resource, script, table, video + + Returns: + Type[BaseModel]: The pydantic model class for the metadata type. + + Raises: + ValueError: If the metadata name is not supported. + + Example: + >>> from pydantic_schemas.metadata_manager import MetadataManager + >>> manager = MetadataManager() + >>> document_class = manager.metadata_class_from_name("document") + """ metadata_name = self.standardize_metadata_name(metadata_name) schema = self._TYPE_TO_SCHEMA[metadata_name] return copy(schema) @@ -82,6 +102,26 @@ def metadata_type_names(self) -> List[str]: return list(self._TYPE_TO_SCHEMA.keys()) def standardize_metadata_name(self, metadata_name: str) -> str: + """ + Standardize the metadata name to a consistent format. In particular, it converts the name to lowercase and + replaces spaces and hyphens with underscores. It also maps certain metadata names to their standard + counterparts. For example, "survey" and "survey_microdata" are both mapped to "microdata". "timeseries" is + mapped to "indicator" and "timeseries_db" is mapped to "indicators_db". + + Args: + metadata_name (str): The name of the metadata type. + + Returns: + str: The standardized metadata name. + + Raises: + ValueError: If the metadata name is not supported. + + Example: + >>> from pydantic_schemas.metadata_manager import MetadataManager + >>> manager = MetadataManager() + >>> standardized_name = manager.standardize_metadata_name("Document") + """ metadata_name = metadata_name.lower() metadata_name = metadata_name.replace("-", "_").replace(" ", "_") if metadata_name == "survey" or metadata_name == "survey_microdata": @@ -96,6 +136,21 @@ def standardize_metadata_name(self, metadata_name: str) -> str: def create_metadata_outline( self, metadata_name_or_class: Union[str, Type[BaseModel]], debug: bool = False ) -> BaseModel: + """ + Create a skeleton pydantic object for the given metadata type. + + Args: + metadata_name_or_class (str or type[BaseModel]): The name of the metadata type or the metadata class. + debug (bool): If True, print debug information on the skeleton creation. + + Returns: + BaseModel: A pydantic object with the metadata schema and default values. + + Example: + >>> from pydantic_schemas.metadata_manager import MetadataManager + >>> manager = MetadataManager() + >>> document_skeleton = manager.create_metadata_outline("document") + """ if isinstance(metadata_name_or_class, str): schema = self.metadata_class_from_name(metadata_name_or_class) else: @@ -123,13 +178,17 @@ def _get_name_version_schema_writer(self, metadata_name_or_class): mappings. Otherwise, it assumes this is a template and retrieves the title from the class, and uses a default single page writer function. """ - if isinstance(metadata_name_or_class, str) or metadata_name_or_class in self._TYPE_TO_SCHEMA.values(): + if ( + isinstance(metadata_name_or_class, str) + or metadata_name_or_class in self._TYPE_TO_SCHEMA.values() + or type(metadata_name_or_class) in self._TYPE_TO_SCHEMA.values() + ): if isinstance(metadata_name_or_class, str): metadata_name = self.standardize_metadata_name(metadata_name_or_class) schema = self._TYPE_TO_SCHEMA[metadata_name] else: for metadata_name, schema in self._TYPE_TO_SCHEMA.items(): - if schema is metadata_name_or_class: + if schema is metadata_name_or_class or schema is type(metadata_name_or_class): break version = f"{metadata_name} type metadata version {__version__}" writer = self._TYPE_TO_WRITER[metadata_name] @@ -145,6 +204,7 @@ def write_metadata_outline_to_excel( metadata_name_or_class: Union[str, Type[BaseModel]], filename: Optional[str] = None, title: Optional[str] = None, + metadata_type: Optional[str] = None, ) -> str: """ Create an Excel file formatted for writing the given metadata_name metadata. @@ -153,9 +213,13 @@ def write_metadata_outline_to_excel( metadata_name_or_class (str or type[BaseModel]): the name of a supported metadata type, currently: document, geospatial, image, indicator, indicators_db, microdata, resource, script, table, video If passed as a BaseModel type, for instance this is what you would do with a template, then the writer - defaults to a single page. + is determined from the metadata_type. If the metadata_type is not provided, then the + writer defaults to write_to_single_sheet. filename (Optional[str]): The path to the Excel file. If None, defaults to {metadata_name}_metadata.xlsx title (Optional[str]): The title for the Excel sheet. If None, defaults to '{metadata_name} Metadata' + metadata_type (Optional[str]): The name of the metadata type, used if the metadata_name_or_class is + an instance of a template. For example 'geospatial', 'document' etc. The name is used to determine + the number of sheets in the Excel file. Returns: str: filename of metadata file @@ -163,7 +227,18 @@ def write_metadata_outline_to_excel( Outputs: An Excel file into which metadata can be entered """ - metadata_name, version, schema, writer = self._get_name_version_schema_writer(metadata_name_or_class) + # determine the metadata_name_or_class is a class instance or the actual class + if ( + metadata_type is not None + and not isinstance(metadata_name_or_class, str) + and metadata_name_or_class not in self._TYPE_TO_SCHEMA.values() + and type(metadata_name_or_class) not in self._TYPE_TO_SCHEMA.values() + ): + metadata_type = self.standardize_metadata_name(metadata_type) + _, _, _, writer = self._get_name_version_schema_writer(metadata_type) + metadata_name, version, schema, _ = self._get_name_version_schema_writer(metadata_name_or_class) + else: + metadata_name, version, schema, writer = self._get_name_version_schema_writer(metadata_name_or_class) skeleton_object = self.create_metadata_outline(schema, debug=False) if filename is None: @@ -181,6 +256,7 @@ def save_metadata_to_excel( object: BaseModel, filename: Optional[str] = None, title: Optional[str] = None, + metadata_type: Optional[str] = None, verbose: bool = False, ) -> str: """ @@ -190,6 +266,10 @@ def save_metadata_to_excel( object (BaseModel): The pydantic object to save to the Excel file. filename (Optional[str]): The path to the Excel file. Defaults to {name}_metadata.xlsx title (Optional[str]): The title for the Excel sheet. Defaults to '{name} Metadata' + metadata_type (Optional[str]): The name of the metadata type such as 'geospatial', 'document', etc. Used if + the metadata_name_or_class is an instance of a template. The name is used to determine the number of sheets + in the Excel file. + verbose (bool): If True, print debug information on the file creation. Returns: str: filename of metadata file @@ -197,9 +277,19 @@ def save_metadata_to_excel( Outputs: An Excel file containing the metadata from the pydantic object. This file can be updated as needed. """ - metadata_name, version, schema, writer = self._get_name_version_schema_writer( - type(object) - ) # metadata_name_or_class) + if ( + metadata_type is not None + # and object not in self._TYPE_TO_SCHEMA.values() + and type(object) not in self._TYPE_TO_SCHEMA.values() + ): + metadata_type = self.standardize_metadata_name(metadata_type) + _, _, _, writer = self._get_name_version_schema_writer(metadata_type) + metadata_name, version, schema, _ = self._get_name_version_schema_writer(type(object)) + else: + metadata_name, version, schema, writer = self._get_name_version_schema_writer(type(object)) + # metadata_name, version, schema, writer = self._get_name_version_schema_writer( + # type(object) + # ) # metadata_name_or_class) skeleton_object = self.create_metadata_outline(metadata_name_or_class=schema, debug=False) if filename is None: @@ -212,6 +302,7 @@ def save_metadata_to_excel( combined_dict = merge_dicts( skeleton_object.model_dump(), object.model_dump(exclude_none=False, exclude_unset=True, exclude_defaults=True), + skeleton_mode=True, ) combined_dict = standardize_keys_in_dict(combined_dict) new_ob = schema.model_validate(combined_dict) @@ -249,7 +340,11 @@ def _get_metadata_name_from_excel_file(filename: str) -> str: return cell_values[0] def read_metadata_from_excel( - self, filename: str, metadata_class: Optional[Type[BaseModel]] = None, verbose: bool = False + self, + filename: str, + metadata_class: Optional[Type[BaseModel]] = None, + metadata_type: Optional[str] = None, + verbose: bool = False, ) -> BaseModel: """ Read in metadata from an appropriately formatted Excel file as a pydantic object. @@ -258,9 +353,22 @@ def read_metadata_from_excel( Args: filename (str): The path to the Excel file. metadata_class (Optional type of BaseModel): A pydantic class type correspondong to the type used to write the Excel file + metadata_type (Optional[str]): The name of the metadata type, such as 'geospatial', 'document', etc. Used if + the metadata_name_or_class is an instance of a template. The name is used to determine the number of + sheets in the Excel file. + verbose (bool): If True, print debug information on the file reading. + Returns: BaseModel: a pydantic object containing the metadata from the file + + Raises: + ValueError: If the metadata type is not supported or if the Excel file is improperly formatted + + Example: + >>> from pydantic_schemas.metadata_manager import MetadataManager + >>> manager = MetadataManager() + >>> document_metadata = manager.read_metadata_from_excel("document_metadata.xlsx") """ metadata_name = self._get_metadata_name_from_excel_file(filename) try: @@ -273,15 +381,27 @@ def read_metadata_from_excel( f"'{metadata_name}' not supported. Must be: {list(self._TYPE_TO_SCHEMA.keys())} or try passing in the metadata_class" ) from e schema = metadata_class - reader = excel_single_sheet_to_pydantic + if metadata_type is not None: + metadata_type = self.standardize_metadata_name(metadata_type) + reader = self._TYPE_TO_READER[metadata_type] + else: + reader = excel_single_sheet_to_pydantic + if verbose: + print("reader is falling back to excel_single_sheet_to_pydantic") read_object = reader(filename, schema, verbose=verbose) - skeleton_object = self.create_metadata_outline(metadata_name_or_class=schema, debug=False) + skeleton_object = self.create_metadata_outline(metadata_name_or_class=schema, debug=verbose) + + read_object_dict = read_object.model_dump( + mode="json", exclude_none=False, exclude_unset=True, exclude_defaults=True + ) + if verbose: + print("read object dict", read_object_dict) - read_object_dict = read_object.model_dump(exclude_none=False, exclude_unset=True, exclude_defaults=True) combined_dict = merge_dicts( - skeleton_object.model_dump(), + skeleton_object.model_dump(mode="json"), read_object_dict, + skeleton_mode=True, ) combined_dict = standardize_keys_in_dict(combined_dict) new_ob = schema.model_validate(combined_dict) diff --git a/pydantic_schemas/tests/test_metadata_manager.py b/pydantic_schemas/tests/test_metadata_manager.py index f9a6797..5b8a735 100644 --- a/pydantic_schemas/tests/test_metadata_manager.py +++ b/pydantic_schemas/tests/test_metadata_manager.py @@ -1,145 +1,13 @@ -import random -import string from copy import copy from typing import List, Optional import pytest from pydantic import BaseModel, ValidationError -from utils.quick_start import make_skeleton +from utils.test_utils import assert_pydantic_models_equal, fill_in_pydantic_outline from pydantic_schemas.metadata_manager import MetadataManager -# Function to generate a random 4-character string -def random_string(length=4): - return "".join(random.choices(string.ascii_letters, k=length)) - - -# Recursive function to traverse and replace Nones or empty strings -def replace_nones_with_random(model: BaseModel): - assert isinstance(model, BaseModel), model - for field_name, field_value in model.__dict__.items(): - # If the field is None or an empty string, replace it with a random string - if field_value is None or field_value == "": - try: - show = field_value is not None or random.random() < 0.7 - setattr(model, field_name, random_string() if show else None) - except ValidationError: - continue - # If the field is another Pydantic model, recursively apply the function - elif isinstance(field_value, BaseModel): - replace_nones_with_random(field_value) - # If the field is a list of models, apply the function to each item - elif isinstance(field_value, list): - n_elements = random.choices([1, 4, 8])[0] - non_null_values = [random.random() < 0.7 for _ in range(n_elements)] - if not any(non_null_values): - continue - elif len(field_value) == 0: - try: - setattr( - model, field_name, [random_string() if non_null_values[i] else None for i in range(n_elements)] - ) - except ValidationError: - continue - elif isinstance(field_value[0], BaseModel): - try: - new_vals = [copy(field_value[0]) for i in range(n_elements)] - for v in new_vals: - replace_nones_with_random(v) - setattr( - model, - field_name, - new_vals, - ) - except ValidationError as e: - raise ValueError(f"{field_name}, {new_vals}") from e - # continue - else: - continue - # for item in field_value: - # if isinstance(item, BaseModel): - # replace_nones_with_random(item) - # If the field is a dict, apply the function to each value - elif isinstance(field_value, dict): - for key, item in field_value.items(): - if isinstance(item, BaseModel): - replace_nones_with_random(item) - - -def is_empty(m): - if isinstance(m, BaseModel): - iterabl = [v for _, v in m.model_dump().items()] - elif isinstance(m, dict): - if len(m) == 0: - return True - iterabl = [v for _, v in m.items()] - elif isinstance(m, list): - if len(m) == 0: - return True - iterabl = m - else: - return m is None - - for v in iterabl: - if isinstance(v, dict) or isinstance(v, BaseModel) or isinstance(v, list): - if is_empty(v) == False: - return False - elif v is not None: - return False - return True - - -# Recursive function to compare two Pydantic models -def compare_pydantic_models(model1: BaseModel, model2: BaseModel) -> bool: - # First, check if the two models are of the same type - if type(model1) is not type(model2): - assert False - - if not hasattr(model1, "model_fields"): - assert model1 == model2 - - # Traverse through the fields of the model - for field_name in model1.model_fields: - value1 = getattr(model1, field_name) - value2 = getattr(model2, field_name) - - # If values are different, return False - if value1 != value2: - # If both are BaseModel instances, compare recursively - if isinstance(value1, BaseModel) and isinstance(value2, BaseModel): - if not compare_pydantic_models(value1, value2): - assert False, field_name - # If both are lists, compare their elements - elif isinstance(value1, list) and isinstance(value2, list): - value1 = [v for v in value1 if is_empty(v) == False] - value2 = [v for v in value2 if is_empty(v) == False] - # remove empty basemodels - - assert len(value1) == len(value2) - for v1, v2 in zip(value1, value2): - if isinstance(v1, BaseModel) and isinstance(v2, BaseModel): - if not compare_pydantic_models(v1, v2): - assert False, field_name - elif v1 != v2: - assert False, field_name - elif isinstance(value1, list) and value2 is None: - continue - # If both are dicts, compare their items - elif isinstance(value1, dict) and isinstance(value2, dict): - assert value1.keys() == value2.keys() - for key in value1: - if isinstance(value1[key], BaseModel) and isinstance(value2[key], BaseModel): - if not compare_pydantic_models(value1[key], value2[key]): - assert False, field_name - else: - assert value1[key] == value2[key], field_name - else: - assert value1 == value2, field_name # For other types, if they are not equal, return False - - return True # All fields are equal - - @pytest.mark.parametrize( "metadata_name", ["document", "script", "microdata", "table", "indicators_db", "indicator", "video", "geospatial", "image"], @@ -167,7 +35,7 @@ def test_metadata_by_name(tmpdir, metadata_name): for i in range(10): modl = mm.create_metadata_outline(metadata_name_or_class=metadata_name) - replace_nones_with_random(modl) + fill_in_pydantic_outline(modl) # Write filled in metadata filename3 = tmpdir.join(f"test_{metadata_name}_{i}.xlsx") @@ -176,7 +44,7 @@ def test_metadata_by_name(tmpdir, metadata_name): # Read the metadata back actual = mm.read_metadata_from_excel(filename=filename3) - compare_pydantic_models(modl, actual) + assert_pydantic_models_equal(modl, actual) # assert modl == actual, actual @@ -190,7 +58,7 @@ def test_metadata_by_class(tmpdir, metadata_name): metadata_class = mm.metadata_class_from_name(metadata_name=metadata_name) # outline from class - mm.create_metadata_outline(metadata_name_or_class=metadata_class) + outline = mm.create_metadata_outline(metadata_name_or_class=metadata_class) # write and read from class filename_class = mm.write_metadata_outline_to_excel( diff --git a/pydantic_schemas/utils/excel_to_pydantic.py b/pydantic_schemas/utils/excel_to_pydantic.py index 775ee00..ebe6ba5 100644 --- a/pydantic_schemas/utils/excel_to_pydantic.py +++ b/pydantic_schemas/utils/excel_to_pydantic.py @@ -206,7 +206,22 @@ def handle_list_within_list(name, anno, df, debug=False): print(f"values: {values}, {type(values)}") if values is None: return [] - values = json.loads(values.replace("'", '"').replace("None", "null")) + try: + values = json.loads(values.replace("'", '"').replace("None", "null")) + except json.JSONDecodeError as e: + try: + values = json.loads(values.replace("None", "null")) + except json.JSONDecodeError as e: + try: + appostrophe_string = "__APOSTROPHE__" + values = json.loads( + values.replace("'", appostrophe_string) + .replace("'", '"') + .replace(appostrophe_string, "'") + .replace("None", "null") + ) + except json.JSONDecodeError as e: + raise ValueError(f"cannot decode {name}:{anno} with values {values}") from e if debug: print(f"decoded values:", values) if len(values) == 0: @@ -330,11 +345,11 @@ def excel_sheet_to_pydantic( print(f"excel_sheet_to_pydantic, sheetname={sheetname}, model_type={model_type}") df = pd.read_excel(filename, sheet_name=sheetname, header=None) df = df.where(df.notnull(), None) - if sheetname != "metadata": - try: - df = get_relevant_sub_frame(model_type, df, debug=debug) - except (KeyError, IndexError): - pass + # if sheetname != "metadata" and sheetname != "additional": + # try: + # df = get_relevant_sub_frame(model_type, df, debug=debug) + # except (KeyError, IndexError): + # pass if debug: print("line 304", model_type) print(df) @@ -355,7 +370,13 @@ def excel_sheet_to_pydantic( print(f"children: {children}") ret = {} if "simple" in children and len(children["simple"]): - sub = get_relevant_sub_frame(model_type, df, name_of_field=df.iloc[0, 0]) + if set(children["simple"]) != set(df.iloc[:, 0].values): + if debug: + print(f"simple children: {set(children['simple'])}") + print(f"df columns: {set(df.iloc[:, 0].values)}") + sub = get_relevant_sub_frame(model_type, df, name_of_field=df.iloc[0, 0]) + else: + sub = df simple_child_field_type = subset_pydantic_model_type(model_type, children["simple"]) fields = instantiate_pydantic_object(simple_child_field_type, sub, from_within_list=False, debug=debug) for child in children["simple"]: diff --git a/pydantic_schemas/utils/schema_base_model.py b/pydantic_schemas/utils/schema_base_model.py index a5e67f7..c52732b 100644 --- a/pydantic_schemas/utils/schema_base_model.py +++ b/pydantic_schemas/utils/schema_base_model.py @@ -1,7 +1,19 @@ from pydantic import BaseModel, ConfigDict +from rich import print as print_rich + +# from rich.pretty import pretty_repr class SchemaBaseModel(BaseModel): model_config = ConfigDict( validate_assignment=True, protected_namespaces=(), use_enum_values=True, extra="ignore" ) # if a subclass has a model_config then this will be overridden + + def pretty_print(self): + print_rich(self) + + # def __repr__(self): + # return pretty_repr(self) + + # def __str__(self): + # return pretty_repr(self) diff --git a/pydantic_schemas/utils/test_utils.py b/pydantic_schemas/utils/test_utils.py new file mode 100644 index 0000000..8a9a660 --- /dev/null +++ b/pydantic_schemas/utils/test_utils.py @@ -0,0 +1,192 @@ +import random +import string + +from pydantic import BaseModel, ValidationError +from pydantic_core import Url + + +# Function to generate a random 4-character string +def random_string(length=4): + return "".join(random.choices(string.ascii_letters, k=length)) + + +# Recursive function to traverse and replace Nones or empty strings +def fill_in_pydantic_outline(model: BaseModel, debug=False): + assert isinstance(model, BaseModel), model + for field_name, field_value in model.__dict__.items(): + if debug: + print(f"filling in {field_name} starting with {field_value}") + if field_value is None or field_value == "": + try: + show = field_value is not None or random.random() < 0.7 + setattr(model, field_name, random_string() if show else None) + if debug: + print(f"filled in {field_name} with {getattr(model, field_name)}") + except ValidationError: + continue + elif isinstance(field_value, BaseModel): + fill_in_pydantic_outline(field_value) + elif isinstance(field_value, dict): + for key, item in field_value.items(): + if isinstance(item, BaseModel): + fill_in_pydantic_outline(item) + elif isinstance(field_value, list): + if debug: + print("found list") + n_elements = random.choices([1, 4, 8])[0] + + if len(field_value) == 0: + non_null_values = [random.random() < 0.7 for _ in range(n_elements)] + if not any(non_null_values): + setattr( + model, + field_name, + [], + ) + try: + setattr( + model, field_name, [random_string() if non_null_values[i] else None for i in range(n_elements)] + ) + except ValidationError: + setattr( + model, + field_name, + [], + ) + elif isinstance(field_value[0], BaseModel): + if debug: + print("found list of basemodels") + try: + # skeleton = make_skeleton(type(field_value[0])) + # make a deep copy of the skeleton pydantic object + new_vals = [field_value[0].model_copy(deep=True) for i in range(n_elements)] + if debug: + print(f"new_vals: {new_vals}") + for i in range(n_elements): + fill_in_pydantic_outline(new_vals[i]) + if debug: + print(f"new_vals filled: {new_vals}") + # ignore list item if every value in the item is None or default + new_vals = [v for v in new_vals if is_empty(v) == False] + if debug: + print(f"new_vals filtered: {new_vals}") + setattr( + model, + field_name, + new_vals, + ) + if len(new_vals) == 0: + assert ( + getattr(model, field_name) == [] + ), f"{field_name}, {new_vals}, {getattr(model, field_name)}" + assert getattr(model, field_name) != [[]], f"{field_name}, {new_vals}, {getattr(model, field_name)}" + except ValidationError as e: + raise ValueError(f"{field_name}, {new_vals}") from e + else: + raise NotImplementedError( + f"fill_in_pydantic_outline list type not implemented for {field_name}: {field_value}" + ) + elif isinstance(field_value, Url): + continue + else: + raise NotImplementedError( + f"fill_in_pydantic_outline not implemented for {field_name}: {field_value} of type {type(field_value)} from {model}" + ) + + +def is_empty(m): + if isinstance(m, str): + return m == "" + elif isinstance(m, BaseModel): + iterabl = [v for _, v in m.model_dump().items()] + elif isinstance(m, dict): + if len(m) == 0: + return True + iterabl = [v for _, v in m.items()] + elif isinstance(m, list): + if len(m) == 0: + return True + iterabl = m + else: + return m is None + + for v in iterabl: + if isinstance(v, dict) or isinstance(v, BaseModel) or isinstance(v, list) or isinstance(v, str): + if is_empty(v) == False: + return False + elif v is not None: + return False + return True + + +# Recursive function to compare two Pydantic models +def assert_pydantic_models_equal(model1: BaseModel, model2: BaseModel) -> bool: + # First, check if the two models are of the same type + if type(model1) is not type(model2): + assert False, f"mismatched types {type(model1)}, {type(model2)}" + + if not hasattr(model1, "model_fields"): + assert model1 == model2, f"{model1}, {model2}" + + # Traverse through the fields of the model + for field_name in model1.model_fields: + value1 = getattr(model1, field_name) + value2 = getattr(model2, field_name) + + if value1 is None and value2 is None: + continue + + # distinction without a difference + if value1 is None and value2 == "": + continue + if value1 == "" and value2 is None: + continue + + # If values are different, return False + if value1 != value2: + if isinstance(value1, str) and isinstance(value2, str): + # sometimes new line is \r\n and sometimes \n but this is not a real difference + normalize_newlines = lambda s: "\n".join(s.splitlines()) + assert normalize_newlines(value1) == normalize_newlines(value2), field_name + # If both are BaseModel instances, compare recursively + elif isinstance(value1, BaseModel) and isinstance(value2, BaseModel): + assert_pydantic_models_equal(value1, value2) + # assert False, field_name + # If both are lists, compare their elements + elif isinstance(value1, list) or isinstance(value2, list): + if value1 is None: + value1 = [] + else: + value1 = [v for v in value1 if is_empty(v) == False] + if value2 is None: + value2 = [] + else: + value2 = [v for v in value2 if is_empty(v) == False] + # remove empty basemodels + + assert len(value1) == len(value2), f"{field_name} mismatched len, {value1}, {value2}" + for v1, v2 in zip(value1, value2): + if isinstance(v1, BaseModel) and isinstance(v2, BaseModel): + assert_pydantic_models_equal(v1, v2) + else: + assert v1 == v2, field_name + # if not compare_pydantic_models(v1, v2): + # assert False, field_name + # elif v1 != v2: + # assert False, field_name + # elif isinstance(value1, list) and value2 is None: + # continue + # If both are dicts, compare their items + elif isinstance(value1, dict) and isinstance(value2, dict): + assert value1.keys() == value2.keys(), f"{field_name} mismatched keys, {value1.keys()}, {value2.keys()}" + for key in value1: + if isinstance(value1[key], BaseModel) and isinstance(value2[key], BaseModel): + assert_pydantic_models_equal(value1[key], value2[key]) + # if not compare_pydantic_models(value1[key], value2[key]): + # assert False, field_name + else: + assert value1[key] == value2[key], field_name + else: + assert value1 == value2, field_name # For other types, if they are not equal, return False + + return True # All fields are equal diff --git a/pydantic_schemas/utils/utils.py b/pydantic_schemas/utils/utils.py index 579bcf6..69ed8e4 100644 --- a/pydantic_schemas/utils/utils.py +++ b/pydantic_schemas/utils/utils.py @@ -1,3 +1,4 @@ +import copy import re import typing from typing import Any, Callable, Dict, List, Optional, Type, Union @@ -119,9 +120,15 @@ def seperate_simple_from_pydantic(ob: BaseModel) -> Dict[str, Dict]: return {"simple": simple_children, "pydantic": pydantic_children} -def merge_dicts(base, update): +def merge_dicts(base, update, skeleton_mode=False): """merge a pair of dicitonaries in which the values are themselves either dictionaries to be merged or lists of - dictionaries to be merged""" + dictionaries to be merged. + + If skeleton_mode is True, then the base dictionary is assumed to be a skeleton where all lists of dictionaries have + only one skeleton element. So then the skeleton element is duplicated and merged with each of the elements of the + update elements. + + """ if len(update) == 0: return base elif len(base) == 0: @@ -138,16 +145,33 @@ def merge_dicts(base, update): elif isinstance(base_value, list): if isinstance(update_value, list) and len(update_value) > 0: new_list = [] - min_length = min(len(base_value), len(update_value)) - for i in range(min_length): - if isinstance(base_value[i], dict): - if isinstance(update_value[i], dict): - new_list.append(merge_dicts(base_value[i], update_value[i])) + if not skeleton_mode: + min_length = min(len(base_value), len(update_value)) + for i in range(min_length): + if isinstance(base_value[i], dict): + if isinstance(update_value[i], dict): + new_list.append(merge_dicts(base_value[i], update_value[i])) + else: + new_list.append(base_value[i]) else: - new_list.append(base_value[i]) - else: - new_list.append(update_value[i]) - new_list.extend(update_value[min_length:]) + new_list.append(update_value[i]) + if len(base_value) > len(update_value): + new_list.extend(base_value[min_length:]) + elif len(update_value) > len(base_value): + new_list.extend(update_value[min_length:]) + else: + for i in range(len(update_value)): + skeleton = copy.deepcopy(base_value[0]) + if isinstance(skeleton, dict): + if isinstance(update_value[i], dict): + new_list.append(merge_dicts(skeleton, update_value[i])) + else: + new_list.append(skeleton) + else: + raise ValueError( + f"skeleton mode only works when passed base dictionaries: base_value = {base_value}, update_value = {update_value}" + ) + new_dict[key] = new_list else: new_dict[key] = base_value diff --git a/pyproject.toml b/pyproject.toml index 29b6516..5cd2cbc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ numpy = "^2.1.0" pydantic = "^2.8.0" openpyxl = "^3.1.5" certifi = "^2024.8.30" +rich = "^13.9.4" [tool.poetry.group.dev.dependencies] pytest = "^8.2.2"