Skip to content

Commit

Permalink
libcovebods: More formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Ed (ODSC) committed Sep 22, 2024
1 parent d6e80ff commit cbaa571
Show file tree
Hide file tree
Showing 16 changed files with 923 additions and 450 deletions.
8 changes: 5 additions & 3 deletions libcovebods/jsonschemavalidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ def validate(self, data_reader: libcovebods.data_reader.DataReader) -> list:
# Make the validator
statement_schema = registry.contents("urn:statement")
validator = Draft202012Validator(
schema=statement_schema, registry=registry, format_checker=FormatChecker()
schema=statement_schema,
registry=registry,
format_checker=FormatChecker(),
)
else:
validator = Draft4Validator(
Expand Down Expand Up @@ -151,7 +153,7 @@ def __init__(
def json(self):
"""Return representation of this error in JSON."""

#print(self._path, self._message)
# print(self._path, self._message)
if self._path:
path_ending = self._path[-1]
if isinstance(self._path[-1], int) and len(self._path) >= 2:
Expand All @@ -160,7 +162,7 @@ def json(self):
elif isinstance(self._path[0], int) and len(self._path) == 1:
path_ending = "[number]"
else:
path_ending = '$'
path_ending = "$"

return {
"message": self._message,
Expand Down
2 changes: 1 addition & 1 deletion libcovebods/run_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def process_additional_checks(

# Second Pass
for statement in all_data:
#statement_type = statement.get("statementType")
# statement_type = statement.get("statementType")
statement_type = get_statement_type(statement, schema_object)
for additional_check_instance in additional_check_instances:
additional_check_instance.check_statement_second_pass(statement)
Expand Down
63 changes: 45 additions & 18 deletions libcovebods/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@
except ImportError:
from cached_property import cached_property # type: ignore


def record_based_statement(statement):
if ("recordDetails" in statement or "recordId" in statement or
"recordType" in statement):
if (
"recordDetails" in statement
or "recordId" in statement
or "recordType" in statement
):
return True
else:
return False


class SchemaBODS:
def __init__(
self,
Expand Down Expand Up @@ -110,7 +115,9 @@ def __work_out_schema_version(
}
# Use latest non-record version if not record based else latest version (revisit)
if not record_based_statement(statement):
self.schema_version = self.config.config["schema_latest_nonrecord_version"]
self.schema_version = self.config.config[
"schema_latest_nonrecord_version"
]
self.pkg_schema_url = self.config.config["schema_versions"][
self.schema_version
]["schema_url"]
Expand All @@ -136,7 +143,9 @@ def __work_out_schema_version(
}
# Use latest non-record version if not record based else latest version (revisit)
if not record_based_statement(statement):
self.schema_version = self.config.config["schema_latest_nonrecord_version"]
self.schema_version = self.config.config[
"schema_latest_nonrecord_version"
]
self.pkg_schema_url = self.config.config["schema_versions"][
self.schema_version
]["schema_url"]
Expand Down Expand Up @@ -171,8 +180,10 @@ def get_entity_statement_types_list(self):
):
return statement_schema["properties"]["entityType"]["enum"]
else:
entity_schema = get_scheme_file_data(self.pkg_schema_url, 'entity')
return entity_schema['properties']['entityType']['properties']['type']['enum']
entity_schema = get_scheme_file_data(self.pkg_schema_url, "entity")
return entity_schema["properties"]["entityType"]["properties"]["type"][
"enum"
]

def get_person_statement_types_list(self):
if self.is_schema_version_equal_to_or_less_than("0.3"):
Expand All @@ -183,8 +194,8 @@ def get_person_statement_types_list(self):
):
return statement_schema["properties"]["personType"]["enum"]
else:
person_schema = get_scheme_file_data(self.pkg_schema_url, 'person')
return person_schema['properties']['personType']['enum']
person_schema = get_scheme_file_data(self.pkg_schema_url, "person")
return person_schema["properties"]["personType"]["enum"]

def get_ownership_or_control_statement_interest_statement_types_list(self):
if self.is_schema_version_equal_to_or_less_than("0.3"):
Expand All @@ -193,10 +204,16 @@ def get_ownership_or_control_statement_interest_statement_types_list(self):
statement_schema["properties"]["statementType"]["enum"][0]
== "ownershipOrControlStatement"
):
return statement_schema["properties"]["interests"]["items"]["properties"]["type"]["enum"]
return statement_schema["properties"]["interests"]["items"][
"properties"
]["type"]["enum"]
else:
relationship_schema = get_scheme_file_data(self.pkg_schema_url, 'relationship')
return relationship_schema["$defs"]["Interest"]["properties"]["type"]["enum"]
relationship_schema = get_scheme_file_data(
self.pkg_schema_url, "relationship"
)
return relationship_schema["$defs"]["Interest"]["properties"]["type"][
"enum"
]

def get_ownership_or_control_statement_interest_direct_or_indirect_list(self):
if self.is_schema_version_equal_to_or_less_than("0.3"):
Expand All @@ -207,15 +224,19 @@ def get_ownership_or_control_statement_interest_direct_or_indirect_list(self):
):
direct_or_indirect_json_schema = statement_schema["properties"][
"interests"
]["items"]["properties"].get("directOrIndirect")
]["items"]["properties"].get("directOrIndirect")
# This is only available in 0.3 and above.
if isinstance(direct_or_indirect_json_schema, dict):
return direct_or_indirect_json_schema.get("enum")
else:
return []
else:
relationship_schema = get_scheme_file_data(self.pkg_schema_url, 'relationship')
return relationship_schema["$defs"]["Interest"]["properties"]["directOrIndirect"]["enum"]
relationship_schema = get_scheme_file_data(
self.pkg_schema_url, "relationship"
)
return relationship_schema["$defs"]["Interest"]["properties"][
"directOrIndirect"
]["enum"]

def get_person_statement_political_exposure_status_list(self):
if self.is_schema_version_equal_to_or_less_than("0.3"):
Expand All @@ -233,8 +254,10 @@ def get_person_statement_political_exposure_status_list(self):
else:
return []
else:
person_schema = get_scheme_file_data(self.pkg_schema_url, 'person')
return person_schema['properties']["politicalExposure"]["properties"]["status"]["enum"]
person_schema = get_scheme_file_data(self.pkg_schema_url, "person")
return person_schema["properties"]["politicalExposure"]["properties"][
"status"
]["enum"]

def get_inconsistent_schema_version_used_for_statement(self, statement):
# If version is not set at all, then we assume it's the default version
Expand Down Expand Up @@ -279,8 +302,12 @@ def is_schema_version_less_than(self, version):
def get_package_schema_fields(self) -> set:
if self.is_schema_version_equal_to_or_greater_than("0.4"):
print("Start:")
return set(schema_dict_fields_generator(self._pkg_schema_obj.contents("urn:statement"),
registry=self._pkg_schema_obj))
return set(
schema_dict_fields_generator(
self._pkg_schema_obj.contents("urn:statement"),
registry=self._pkg_schema_obj,
)
)
else:
print("Start old:", self.schema_version)
return set(schema_dict_fields_generator(self._pkg_schema_obj))
Expand Down
10 changes: 8 additions & 2 deletions libcovebods/schema_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,28 @@ def get_schema_paths(schema_dir):
Returns an array of paths, filenames, and contents (parsed JSON) for each of the schema files.
"""
schema_paths = [
(path, name, data) for path, name, _, data in walk_json_data(top=schema_dir) if is_json_schema(data)
(path, name, data)
for path, name, _, data in walk_json_data(top=schema_dir)
if is_json_schema(data)
]
return schema_paths


def schema_registry(schema_dir):
"""
This loads the BODS schema files into a jsonschema registry, so the
validator can resolve $refs across all of the schema files.
"""
schemas = []
for _, _, schema in get_schema_paths(schema_dir):
schemas.append((schema.get("$id"), Resource(contents=schema, specification=DRAFT202012)))
schemas.append(
(schema.get("$id"), Resource(contents=schema, specification=DRAFT202012))
)

registry = Registry().with_resources(schemas)
return registry


def get_scheme_file_data(schema_dir, component):
for file_path in Path(schema_dir).glob("*.json"):
if file_path.name.startswith(component):
Expand Down
Loading

0 comments on commit cbaa571

Please sign in to comment.