diff --git a/libcovebods/jsonschemavalidate.py b/libcovebods/jsonschemavalidate.py index a664f68..b15080e 100644 --- a/libcovebods/jsonschemavalidate.py +++ b/libcovebods/jsonschemavalidate.py @@ -108,7 +108,9 @@ def validate(self, data_reader: libcovebods.data_reader.DataReader) -> list: # Make the validator statement_schema = registry.contents("urn:statement") validator = Draft202012Validator( - schema=statement_schema, registry=registry, format_checker=FormatChecker() + schema=statement_schema, + registry=registry, + format_checker=FormatChecker(), ) else: validator = Draft4Validator( @@ -151,7 +153,7 @@ def __init__( def json(self): """Return representation of this error in JSON.""" - #print(self._path, self._message) + # print(self._path, self._message) if self._path: path_ending = self._path[-1] if isinstance(self._path[-1], int) and len(self._path) >= 2: @@ -160,7 +162,7 @@ def json(self): elif isinstance(self._path[0], int) and len(self._path) == 1: path_ending = "[number]" else: - path_ending = '$' + path_ending = "$" return { "message": self._message, diff --git a/libcovebods/run_tasks.py b/libcovebods/run_tasks.py index f9b7cdc..6bd774e 100644 --- a/libcovebods/run_tasks.py +++ b/libcovebods/run_tasks.py @@ -117,7 +117,7 @@ def process_additional_checks( # Second Pass for statement in all_data: - #statement_type = statement.get("statementType") + # statement_type = statement.get("statementType") statement_type = get_statement_type(statement, schema_object) for additional_check_instance in additional_check_instances: additional_check_instance.check_statement_second_pass(statement) diff --git a/libcovebods/schema.py b/libcovebods/schema.py index 7754f56..779bbc5 100644 --- a/libcovebods/schema.py +++ b/libcovebods/schema.py @@ -14,13 +14,18 @@ except ImportError: from cached_property import cached_property # type: ignore + def record_based_statement(statement): - if ("recordDetails" in statement or "recordId" in statement or - "recordType" in statement): + if ( + "recordDetails" in statement + or "recordId" in statement + or "recordType" in statement + ): return True else: return False + class SchemaBODS: def __init__( self, @@ -110,7 +115,9 @@ def __work_out_schema_version( } # Use latest non-record version if not record based else latest version (revisit) if not record_based_statement(statement): - self.schema_version = self.config.config["schema_latest_nonrecord_version"] + self.schema_version = self.config.config[ + "schema_latest_nonrecord_version" + ] self.pkg_schema_url = self.config.config["schema_versions"][ self.schema_version ]["schema_url"] @@ -136,7 +143,9 @@ def __work_out_schema_version( } # Use latest non-record version if not record based else latest version (revisit) if not record_based_statement(statement): - self.schema_version = self.config.config["schema_latest_nonrecord_version"] + self.schema_version = self.config.config[ + "schema_latest_nonrecord_version" + ] self.pkg_schema_url = self.config.config["schema_versions"][ self.schema_version ]["schema_url"] @@ -171,8 +180,10 @@ def get_entity_statement_types_list(self): ): return statement_schema["properties"]["entityType"]["enum"] else: - entity_schema = get_scheme_file_data(self.pkg_schema_url, 'entity') - return entity_schema['properties']['entityType']['properties']['type']['enum'] + entity_schema = get_scheme_file_data(self.pkg_schema_url, "entity") + return entity_schema["properties"]["entityType"]["properties"]["type"][ + "enum" + ] def get_person_statement_types_list(self): if self.is_schema_version_equal_to_or_less_than("0.3"): @@ -183,8 +194,8 @@ def get_person_statement_types_list(self): ): return statement_schema["properties"]["personType"]["enum"] else: - person_schema = get_scheme_file_data(self.pkg_schema_url, 'person') - return person_schema['properties']['personType']['enum'] + person_schema = get_scheme_file_data(self.pkg_schema_url, "person") + return person_schema["properties"]["personType"]["enum"] def get_ownership_or_control_statement_interest_statement_types_list(self): if self.is_schema_version_equal_to_or_less_than("0.3"): @@ -193,10 +204,16 @@ def get_ownership_or_control_statement_interest_statement_types_list(self): statement_schema["properties"]["statementType"]["enum"][0] == "ownershipOrControlStatement" ): - return statement_schema["properties"]["interests"]["items"]["properties"]["type"]["enum"] + return statement_schema["properties"]["interests"]["items"][ + "properties" + ]["type"]["enum"] else: - relationship_schema = get_scheme_file_data(self.pkg_schema_url, 'relationship') - return relationship_schema["$defs"]["Interest"]["properties"]["type"]["enum"] + relationship_schema = get_scheme_file_data( + self.pkg_schema_url, "relationship" + ) + return relationship_schema["$defs"]["Interest"]["properties"]["type"][ + "enum" + ] def get_ownership_or_control_statement_interest_direct_or_indirect_list(self): if self.is_schema_version_equal_to_or_less_than("0.3"): @@ -207,15 +224,19 @@ def get_ownership_or_control_statement_interest_direct_or_indirect_list(self): ): direct_or_indirect_json_schema = statement_schema["properties"][ "interests" - ]["items"]["properties"].get("directOrIndirect") + ]["items"]["properties"].get("directOrIndirect") # This is only available in 0.3 and above. if isinstance(direct_or_indirect_json_schema, dict): return direct_or_indirect_json_schema.get("enum") else: return [] else: - relationship_schema = get_scheme_file_data(self.pkg_schema_url, 'relationship') - return relationship_schema["$defs"]["Interest"]["properties"]["directOrIndirect"]["enum"] + relationship_schema = get_scheme_file_data( + self.pkg_schema_url, "relationship" + ) + return relationship_schema["$defs"]["Interest"]["properties"][ + "directOrIndirect" + ]["enum"] def get_person_statement_political_exposure_status_list(self): if self.is_schema_version_equal_to_or_less_than("0.3"): @@ -233,8 +254,10 @@ def get_person_statement_political_exposure_status_list(self): else: return [] else: - person_schema = get_scheme_file_data(self.pkg_schema_url, 'person') - return person_schema['properties']["politicalExposure"]["properties"]["status"]["enum"] + person_schema = get_scheme_file_data(self.pkg_schema_url, "person") + return person_schema["properties"]["politicalExposure"]["properties"][ + "status" + ]["enum"] def get_inconsistent_schema_version_used_for_statement(self, statement): # If version is not set at all, then we assume it's the default version @@ -279,8 +302,12 @@ def is_schema_version_less_than(self, version): def get_package_schema_fields(self) -> set: if self.is_schema_version_equal_to_or_greater_than("0.4"): print("Start:") - return set(schema_dict_fields_generator(self._pkg_schema_obj.contents("urn:statement"), - registry=self._pkg_schema_obj)) + return set( + schema_dict_fields_generator( + self._pkg_schema_obj.contents("urn:statement"), + registry=self._pkg_schema_obj, + ) + ) else: print("Start old:", self.schema_version) return set(schema_dict_fields_generator(self._pkg_schema_obj)) diff --git a/libcovebods/schema_dir.py b/libcovebods/schema_dir.py index e17517a..430b3f7 100644 --- a/libcovebods/schema_dir.py +++ b/libcovebods/schema_dir.py @@ -12,10 +12,13 @@ def get_schema_paths(schema_dir): Returns an array of paths, filenames, and contents (parsed JSON) for each of the schema files. """ schema_paths = [ - (path, name, data) for path, name, _, data in walk_json_data(top=schema_dir) if is_json_schema(data) + (path, name, data) + for path, name, _, data in walk_json_data(top=schema_dir) + if is_json_schema(data) ] return schema_paths + def schema_registry(schema_dir): """ This loads the BODS schema files into a jsonschema registry, so the @@ -23,11 +26,14 @@ def schema_registry(schema_dir): """ schemas = [] for _, _, schema in get_schema_paths(schema_dir): - schemas.append((schema.get("$id"), Resource(contents=schema, specification=DRAFT202012))) + schemas.append( + (schema.get("$id"), Resource(contents=schema, specification=DRAFT202012)) + ) registry = Registry().with_resources(schemas) return registry + def get_scheme_file_data(schema_dir, component): for file_path in Path(schema_dir).glob("*.json"): if file_path.name.startswith(component): diff --git a/libcovebods/tasks/checks.py b/libcovebods/tasks/checks.py index 913e509..f6225d6 100644 --- a/libcovebods/tasks/checks.py +++ b/libcovebods/tasks/checks.py @@ -616,8 +616,9 @@ def _add_statement_ids_to_statement_ids_counted(self, statement_ids): class CheckHasPublicListing(AdditionalCheck): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: - return (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and - schema_object.is_schema_version_less_than("0.4")) + return schema_object.is_schema_version_equal_to_or_greater_than( + "0.3" + ) and schema_object.is_schema_version_less_than("0.4") @staticmethod def get_additional_check_types_possible( @@ -664,11 +665,13 @@ def check_entity_statement_first_pass(self, statement): } ) + class CheckEntityTypeAndEntitySubtypeAlign(AdditionalCheck): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: - return (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and - schema_object.is_schema_version_less_than("0.4")) + return schema_object.is_schema_version_equal_to_or_greater_than( + "0.3" + ) and schema_object.is_schema_version_less_than("0.4") @staticmethod def get_additional_check_types_possible( @@ -698,8 +701,9 @@ def __init__(self, lib_cove_bods_config, schema_object): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: - return (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and - schema_object.is_schema_version_less_than("0.4")) + return schema_object.is_schema_version_equal_to_or_greater_than( + "0.3" + ) and schema_object.is_schema_version_less_than("0.4") @staticmethod def get_additional_check_types_possible( @@ -710,8 +714,10 @@ def get_additional_check_types_possible( "entity_security_listing_market_identifier_code_set_but_not_operating_market_identifier_code", "entity_security_listing_operating_market_identifier_code_set_but_not_market_identifier_code", ] - if (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and - schema_object.is_schema_version_less_than("0.4")) + if ( + schema_object.is_schema_version_equal_to_or_greater_than("0.3") + and schema_object.is_schema_version_less_than("0.4") + ) else [] ) @@ -744,6 +750,7 @@ def check_entity_statement_first_pass(self, statement): } ) + class CheckEntitySecurityListingsMICSCodesRecord(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): super().__init__(lib_cove_bods_config, schema_object) @@ -767,12 +774,12 @@ def get_additional_check_types_possible( def check_entity_statement_first_pass(self, statement): record = statement.get("recordDetails") - if isinstance(record, dict) and isinstance(record.get("publicListing"), dict) and isinstance( - record["publicListing"].get("securitiesListings"), list + if ( + isinstance(record, dict) + and isinstance(record.get("publicListing"), dict) + and isinstance(record["publicListing"].get("securitiesListings"), list) ): - for securitiesListing in record["publicListing"].get( - "securitiesListings" - ): + for securitiesListing in record["publicListing"].get("securitiesListings"): if isinstance(securitiesListing, dict): marketIdentifierCode = securitiesListing.get("marketIdentifierCode") operatingMarketIdentifierCode = securitiesListing.get( @@ -795,6 +802,7 @@ def check_entity_statement_first_pass(self, statement): } ) + class CheckSourceRetrievedAtFutureDate(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): super().__init__(lib_cove_bods_config, schema_object) @@ -804,16 +812,22 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_statement_first_pass(self, statement): - if ("source" in statement and isinstance(statement["source"], dict) and - "retrievedAt" in statement["source"] and statement["source"]["retrievedAt"]): + if ( + "source" in statement + and isinstance(statement["source"], dict) + and "retrievedAt" in statement["source"] + and statement["source"]["retrievedAt"] + ): retrieved_at = parse_date_field(statement["source"]["retrievedAt"]) if retrieved_at and retrieved_at > datetime.now().date(): self._additional_check_results.append( - { - "type": "statement_source_retrieved_at_future_date", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_source_retrieved_at_future_date", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementDateFutureDate(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -824,15 +838,17 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_statement_first_pass(self, statement): - if ("statementDate" in statement and statement["statementDate"]): + if "statementDate" in statement and statement["statementDate"]: statement_date = parse_date_field(statement["statementDate"]) if statement_date and statement_date > datetime.now().date(): self._additional_check_results.append( - { - "type": "statement_date_is_future_date", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_date_is_future_date", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckAnnotationCreationDateFutureDate(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -846,8 +862,11 @@ def check_statement_first_pass(self, statement): if "annotations" in statement and isinstance(statement["annotations"], list): for annotation in statement["annotations"]: print(annotation) - if (isinstance(annotation, dict) and "creationDate" in annotation and - annotation["creationDate"]): + if ( + isinstance(annotation, dict) + and "creationDate" in annotation + and annotation["creationDate"] + ): creation_date = parse_date_field(annotation["creationDate"]) print(creation_date) if creation_date and creation_date > datetime.now().date(): @@ -856,7 +875,8 @@ def check_statement_first_pass(self, statement): "type": "statement_annotation_creation_date_is_future_date", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) class CheckStatementPublicationDateFutureDate(AdditionalCheck): @@ -868,17 +888,23 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_statement_first_pass(self, statement): - if ("publicationDetails" in statement and isinstance(statement["publicationDetails"], dict) - and "publicationDate" in statement["publicationDetails"] and - statement["publicationDetails"]["publicationDate"]): - publication_date = parse_date_field(statement["publicationDetails"]["publicationDate"]) + if ( + "publicationDetails" in statement + and isinstance(statement["publicationDetails"], dict) + and "publicationDate" in statement["publicationDetails"] + and statement["publicationDetails"]["publicationDate"] + ): + publication_date = parse_date_field( + statement["publicationDetails"]["publicationDate"] + ) if publication_date and publication_date > datetime.now().date(): self._additional_check_results.append( - { - "type": "statement_publication_date_is_future_date", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_publication_date_is_future_date", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) class CheckStatementPersonDateOfDeathSane(AdditionalCheck): @@ -891,30 +917,44 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: def check_person_statement_first_pass(self, statement): print(statement) - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) - and "deathDate" in statement["recordDetails"] and - statement["recordDetails"]["deathDate"]): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "deathDate" in statement["recordDetails"] + and statement["recordDetails"]["deathDate"] + ): print("Death date:", statement["recordDetails"]["deathDate"]) death_date = parse_date_field(statement["recordDetails"]["deathDate"]) if death_date: - if (death_date > datetime.now().date() or - death_date < datetime.strptime("1800-01-01", "%Y-%m-%d").date()): + if ( + death_date > datetime.now().date() + or death_date < datetime.strptime("1800-01-01", "%Y-%m-%d").date() + ): self._additional_check_results.append( - { - "type": "statement_person_death_date_not_sensible_value", - "statement_type": None, - "statement": statement.get("statementId"), - }) - elif ("birthDate" in statement["recordDetails"] and - statement["recordDetails"]["birthDate"]): - birth_date = parse_date_field(statement["recordDetails"]["birthDate"]) - if death_date < birth_date or (death_date - birth_date).days > 43830: + { + "type": "statement_person_death_date_not_sensible_value", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + elif ( + "birthDate" in statement["recordDetails"] + and statement["recordDetails"]["birthDate"] + ): + birth_date = parse_date_field( + statement["recordDetails"]["birthDate"] + ) + if ( + death_date < birth_date + or (death_date - birth_date).days > 43830 + ): self._additional_check_results.append( { "type": "statement_person_death_date_not_sensible_value", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) class CheckStatementEntityFoundationDissolutionDates(AdditionalCheck): @@ -926,19 +966,26 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_entity_statement_first_pass(self, statement): - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) - and "foundingDate" in statement["recordDetails"] and - statement["recordDetails"]["foundingDate"] and "dissolutionDate" in - statement["recordDetails"] and statement["recordDetails"]["dissolutionDate"]): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "foundingDate" in statement["recordDetails"] + and statement["recordDetails"]["foundingDate"] + and "dissolutionDate" in statement["recordDetails"] + and statement["recordDetails"]["dissolutionDate"] + ): founding_date = parse_date_field(statement["recordDetails"]["foundingDate"]) - dissolution_date = parse_date_field(statement["recordDetails"]["dissolutionDate"]) + dissolution_date = parse_date_field( + statement["recordDetails"]["dissolutionDate"] + ) if founding_date > dissolution_date: self._additional_check_results.append( - { - "type": "statement_entity_dissolution_before_founding_date", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_entity_dissolution_before_founding_date", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) class CheckStatementPersonBirthDateSensible(AdditionalCheck): @@ -950,25 +997,31 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_person_statement_first_pass(self, statement): - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) - and "birthDate" in statement["recordDetails"] and - statement["recordDetails"]["birthDate"]): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "birthDate" in statement["recordDetails"] + and statement["recordDetails"]["birthDate"] + ): birth_date = parse_date_field(statement["recordDetails"]["birthDate"]) if birth_date: if birth_date > datetime.now().date(): self._additional_check_results.append( - { - "type": "statement_person_birth_date_in_future", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_person_birth_date_in_future", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) elif birth_date < datetime.strptime("1800-01-01", "%Y-%m-%d").date(): self._additional_check_results.append( - { - "type": "statement_person_birth_date_too_far_in_past", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_person_birth_date_too_far_in_past", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementRelationshipInterestsStartEndDates(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -979,22 +1032,31 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_ownership_or_control_statement_first_pass(self, statement): - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) - and "interests" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["interests"], list)): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "interests" in statement["recordDetails"] + and isinstance(statement["recordDetails"]["interests"], list) + ): for interest in statement["recordDetails"]["interests"]: - if ("startDate" in interest and interest["startDate"] and - "endDate" in interest and interest["endDate"]): + if ( + "startDate" in interest + and interest["startDate"] + and "endDate" in interest + and interest["endDate"] + ): start_date = parse_date_field(interest["startDate"]) end_date = parse_date_field(interest["endDate"]) if start_date and end_date: if start_date > end_date: self._additional_check_results.append( - { - "type": "statement_relationship_interests_start_after_end_date", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_relationship_interests_start_after_end_date", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementEntitySecuritiesListingsHasPublicListing(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1005,19 +1067,27 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_entity_statement_first_pass(self, statement): - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) - and "publicListing" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["publicListing"], dict) and - "securitiesListings" in statement["recordDetails"]["publicListing"] and - isinstance(statement["recordDetails"]["publicListing"]["securitiesListings"], list) - and len(statement["recordDetails"]["publicListing"]["securitiesListings"]) > 0): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "publicListing" in statement["recordDetails"] + and isinstance(statement["recordDetails"]["publicListing"], dict) + and "securitiesListings" in statement["recordDetails"]["publicListing"] + and isinstance( + statement["recordDetails"]["publicListing"]["securitiesListings"], list + ) + and len(statement["recordDetails"]["publicListing"]["securitiesListings"]) + > 0 + ): if statement["recordDetails"]["publicListing"]["hasPublicListing"] is False: self._additional_check_results.append( - { - "type": "statement_entity_securities_listings_haspubliclisting_is_false", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_entity_securities_listings_haspubliclisting_is_false", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementRelationshipInterestsShareValues(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1029,9 +1099,12 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: def check_ownership_or_control_statement_first_pass(self, statement): print("Checking Share Values:") - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) - and "interests" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["interests"], list)): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "interests" in statement["recordDetails"] + and isinstance(statement["recordDetails"]["interests"], list) + ): print("Here!") for interest in statement["recordDetails"]["interests"]: print(interest) @@ -1043,48 +1116,84 @@ def check_ownership_or_control_statement_first_pass(self, statement): "type": "statement_relationship_interests_share_min_and_exclusivemin", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) elif "maximum" in share and "exclusiveMaximum" in share: self._additional_check_results.append( { "type": "statement_relationship_interests_share_max_and_exclusivemax", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) if "exact" in share and share["exact"]: print("Here!") - if any([limit in share for limit in - ("exclusiveMinimum", "minimum", "maximum", "exclusiveMaximum")]): + if any( + [ + limit in share + for limit in ( + "exclusiveMinimum", + "minimum", + "maximum", + "exclusiveMaximum", + ) + ] + ): self._additional_check_results.append( { "type": "statement_relationship_interests_exact_has_min_max", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) return - if ((("exclusiveMinimum" in share and numeric_value(share["exclusiveMinimum"])) or - ("minimum" in share and numeric_value(share["minimum"]))) and - (("exclusiveMaximum" in share and numeric_value(share["exclusiveMaximum"])) or - ("maximum" in share and numeric_value(share["maximum"])))): - min_val = (float(share["minimum"]) if "minimum" in share else - float(share["exclusiveMinimum"])) - max_val = (float(share["maximum"]) if "maximum" in share else - float(share["exclusiveMaximum"])) - print("Min:", min_val, "Max:", max_val, "Comp:", max_val >= min_val) + if ( + ( + "exclusiveMinimum" in share + and numeric_value(share["exclusiveMinimum"]) + ) + or ("minimum" in share and numeric_value(share["minimum"])) + ) and ( + ( + "exclusiveMaximum" in share + and numeric_value(share["exclusiveMaximum"]) + ) + or ("maximum" in share and numeric_value(share["maximum"])) + ): + min_val = ( + float(share["minimum"]) + if "minimum" in share + else float(share["exclusiveMinimum"]) + ) + max_val = ( + float(share["maximum"]) + if "maximum" in share + else float(share["exclusiveMaximum"]) + ) + print( + "Min:", + min_val, + "Max:", + max_val, + "Comp:", + max_val >= min_val, + ) if not max_val >= min_val: self._additional_check_results.append( - { - "type": "statement_relationship_interests_not_exact_max_greater_than_min", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_relationship_interests_not_exact_max_greater_than_min", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) elif max_val == min_val: - self._additional_check_results.append( - { - "type": "statement_relationship_interests_exact_max_equals_min", - "statement_type": None, - "statement": statement.get("statementId"), - }) + self._additional_check_results.append( + { + "type": "statement_relationship_interests_exact_max_equals_min", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) class CheckStatementDeclarationSubject(AdditionalCheck): @@ -1103,25 +1212,26 @@ def check_statement_first_pass(self, statement): else: self._statements[statement["recordId"]] = [statement["recordType"]] - def check_statement_second_pass(self, statement): if "declarationSubject" in statement: if statement["declarationSubject"] not in self._statements: self._additional_check_results.append( - { - "type": "statement_declaration_subject_not_exist", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "statement_declaration_subject_not_exist", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) else: for record_type in self._statements[statement["declarationSubject"]]: - if record_type not in ('entity', 'person'): + if record_type not in ("entity", "person"): self._additional_check_results.append( { "type": "statement_declaration_subject_not_entity_person", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) class CheckStatementIsComponent(AdditionalCheck): @@ -1136,28 +1246,39 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_statement_first_pass(self, statement): - #print("Poo!") + # print("Poo!") if "recordId" in statement: self._statements[statement["recordId"]] = self._count self._count += 1 - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) and "componentRecords" in statement["recordDetails"] - and isinstance(statement["recordDetails"]["componentRecords"], list)): + and isinstance(statement["recordDetails"]["componentRecords"], list) + ): for component_id in statement["recordDetails"]["componentRecords"]: self._components[component_id] = statement["recordId"] def check_statement_second_pass(self, statement): - if ("recordId" in statement and "recordDetails" in statement and - isinstance(statement["recordDetails"], dict) and "isComponent" in statement["recordDetails"]): - if statement["recordDetails"]["isComponent"] is True: - if statement["recordId"] not in self._components or not (self._statements[statement["recordId"]] - < self._statements[self._components[statement["recordId"]]]): - self._additional_check_results.append( - { - "type": "statement_entity_is_component_not_in_component_details", - "statement_type": None, - "statement": statement.get("statementId"), - }) + if ( + "recordId" in statement + and "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "isComponent" in statement["recordDetails"] + ): + if statement["recordDetails"]["isComponent"] is True: + if statement["recordId"] not in self._components or not ( + self._statements[statement["recordId"]] + < self._statements[self._components[statement["recordId"]]] + ): + self._additional_check_results.append( + { + "type": "statement_entity_is_component_not_in_component_details", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementDuplicateStatementId(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1180,11 +1301,13 @@ def check_statement_second_pass(self, statement): if self._statements[statement["statementId"]] > 1: self._statements[statement["statementId"]] -= 1 self._additional_check_results.append( - { - "type": "duplicate_statement_id", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "duplicate_statement_id", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementSeries(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1202,54 +1325,60 @@ def check_statement_first_pass(self, statement): record_status = statement.get("recordStatus") record_type = statement.get("recordType") if statement["recordId"] in self._series: - self._series[statement["recordId"]].append([statement["statementDate"], - record_status, - record_type]) + self._series[statement["recordId"]].append( + [statement["statementDate"], record_status, record_type] + ) else: - self._series[statement["recordId"]] = [[statement["statementDate"], - record_status, - record_type]] + self._series[statement["recordId"]] = [ + [statement["statementDate"], record_status, record_type] + ] def final_checks(self): for series in self._series: sorted_series = sort_by_date(self._series[series], 0) statuses = [s[1] for s in sorted_series] types = [s[2] for s in sorted_series] - if len([s for s in statuses if s == 'new']) > 1: + if len([s for s in statuses if s == "new"]) > 1: self._additional_check_results.append( - { - "type": "multiple_statements_in_series_with_record_status_new", - "statement_type": None, - "record": series, - }) - elif 'new' in statuses and statuses[0] != "new": + { + "type": "multiple_statements_in_series_with_record_status_new", + "statement_type": None, + "record": series, + } + ) + elif "new" in statuses and statuses[0] != "new": self._additional_check_results.append( - { - "type": "statement_with_record_status_new_must_be_first", - "statement_type": None, - "record": series, - }) - elif len([s for s in statuses if s == 'closed']) > 1: + { + "type": "statement_with_record_status_new_must_be_first", + "statement_type": None, + "record": series, + } + ) + elif len([s for s in statuses if s == "closed"]) > 1: self._additional_check_results.append( - { - "type": "multiple_statements_in_series_with_record_status_closed", - "statement_type": None, - "record": series, - }) - elif 'closed' in statuses and statuses[-1] != "closed": + { + "type": "multiple_statements_in_series_with_record_status_closed", + "statement_type": None, + "record": series, + } + ) + elif "closed" in statuses and statuses[-1] != "closed": self._additional_check_results.append( - { - "type": "statement_with_record_status_closed_must_be_last", - "statement_type": None, - "record": series, - }) + { + "type": "statement_with_record_status_closed_must_be_last", + "statement_type": None, + "record": series, + } + ) elif len(set(types)) > 1: self._additional_check_results.append( - { - "type": "statements_in_series_with_different_record_types", - "statement_type": None, - "record": series, - }) + { + "type": "statements_in_series_with_different_record_types", + "statement_type": None, + "record": series, + } + ) + class CheckComponentRecordsRecordIds(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1268,25 +1397,31 @@ def check_statement_first_pass(self, statement): self._records[statement["statementId"]] = None def check_ownership_or_control_statement_second_pass(self, statement): - if "recordDetails" in statement and isinstance(statement["recordDetails"], dict): - if ("componentRecords" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["componentRecords"], list)): + if "recordDetails" in statement and isinstance( + statement["recordDetails"], dict + ): + if "componentRecords" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["componentRecords"], list + ): for component in statement["recordDetails"]["componentRecords"]: if component not in self._records: if component in self._statements: self._additional_check_results.append( - { - "type": "component_record_is_statement_id", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "component_record_is_statement_id", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) else: self._additional_check_results.append( - { - "type": "component_record_id_not_in_dataset", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "component_record_id_not_in_dataset", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementRelationshipParties(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1303,53 +1438,83 @@ def check_statement_first_pass(self, statement): self._records[statement["recordId"]] = record_type def check_ownership_or_control_statement_second_pass(self, statement): - if "recordDetails" in statement and isinstance(statement["recordDetails"], dict): - if ("subject" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["subject"], str)): + if "recordDetails" in statement and isinstance( + statement["recordDetails"], dict + ): + if "subject" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["subject"], str + ): if statement["recordDetails"]["subject"] not in self._records: self._additional_check_results.append( - { - "type": "subject_must_be_record_id", - "statement_type": None, - "statement": statement.get("statementId"), - }) - elif not self._records[statement["recordDetails"]["subject"]] == 'entity': + { + "type": "subject_must_be_record_id", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + elif ( + not self._records[statement["recordDetails"]["subject"]] == "entity" + ): self._additional_check_results.append( - { - "type": "subject_can_only_refer_to_entity", - "statement_type": None, - "statement": statement.get("statementId"), - }) - if ("interestedParty" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["interestedParty"], str)): + { + "type": "subject_can_only_refer_to_entity", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + if "interestedParty" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["interestedParty"], str + ): if statement["recordDetails"]["interestedParty"] not in self._records: self._additional_check_results.append( - { - "type": "interested_party_must_be_record_id", - "statement_type": None, - "statement": statement.get("statementId"), - }) - elif self._records[statement["recordDetails"]["interestedParty"]] not in ('entity', 'person'): + { + "type": "interested_party_must_be_record_id", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + elif self._records[ + statement["recordDetails"]["interestedParty"] + ] not in ("entity", "person"): self._additional_check_results.append( - { - "type": "interested_party_can_only_refer_to_entity_or_person", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "interested_party_can_only_refer_to_entity_or_person", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) else: - if ("interests" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["interests"], list)): + if "interests" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["interests"], list + ): for interest in statement["recordDetails"]["interests"]: - if ("beneficialOwnershipOrControl" in interest and - interest["beneficialOwnershipOrControl"] is True): - if not self._records[statement["recordDetails"]["interestedParty"]] == 'person': - print("Check:", self._records[statement["recordDetails"]["interestedParty"]], 'person') + if ( + "beneficialOwnershipOrControl" in interest + and interest["beneficialOwnershipOrControl"] is True + ): + if ( + not self._records[ + statement["recordDetails"]["interestedParty"] + ] + == "person" + ): + print( + "Check:", + self._records[ + statement["recordDetails"][ + "interestedParty" + ] + ], + "person", + ) self._additional_check_results.append( - { - "type": "interest_beneficial_ownership_interested_party_not_person", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "interest_beneficial_ownership_interested_party_not_person", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckAnnotationStatementPointerTarget(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1364,14 +1529,17 @@ def check_entity_statement_first_pass(self, statement): for annotation in statement["annotations"]: if "statementPointerTarget" in annotation: try: - jsonpointer.resolve_pointer(statement, annotation["statementPointerTarget"]) + jsonpointer.resolve_pointer( + statement, annotation["statementPointerTarget"] + ) except (jsonpointer.JsonPointerException, TypeError): self._additional_check_results.append( { "type": "annotation_statement_pointer_target_invalid", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) class CheckStatementRelationshipInterests(AdditionalCheck): @@ -1384,12 +1552,15 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_statement_first_pass(self, statement): - if ("recordId" in statement and "recordDetails" in statement and - isinstance(statement["recordDetails"], dict)): + if ( + "recordId" in statement + and "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + ): record_type = statement.get("recordType") - if record_type == 'entity': + if record_type == "entity": record_type_type = statement["recordDetails"].get("entityType") - elif record_type == 'person': + elif record_type == "person": record_type_type = statement["recordDetails"].get("personType") else: record_type_type = None @@ -1397,60 +1568,106 @@ def check_statement_first_pass(self, statement): def check_ownership_or_control_statement_second_pass(self, statement): print("Here 1") - if "recordDetails" in statement and isinstance(statement["recordDetails"], dict): + if "recordDetails" in statement and isinstance( + statement["recordDetails"], dict + ): print("Here 2") - if ("interests" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["interests"], list)): + if "interests" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["interests"], list + ): print("Here 3") for interest in statement["recordDetails"]["interests"]: print("Interest:", interest) - if "type" in interest and interest["type"] in ('nominee', 'nominator'): + if "type" in interest and interest["type"] in ( + "nominee", + "nominator", + ): print("Here 4") - if ("subject" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["subject"], str)): - print("Here 5", self._records[statement["recordDetails"]["subject"]]) - if not self._records[statement["recordDetails"]["subject"]][0] == 'entity': + if "subject" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["subject"], str + ): + print( + "Here 5", + self._records[statement["recordDetails"]["subject"]], + ) + if ( + not self._records[ + statement["recordDetails"]["subject"] + ][0] + == "entity" + ): self._additional_check_results.append( { "type": "relationship_interests_subject_should_be_entity_nomination_arrangement", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) else: - entity_type = self._records[statement["recordDetails"]["subject"]][1] + entity_type = self._records[ + statement["recordDetails"]["subject"] + ][1] print("Entity type:", entity_type) - if (not entity_type or not isinstance(statement["recordDetails"], dict) or - "type" not in entity_type or not entity_type["type"] == 'arrangement' or - "subtype" not in entity_type or not entity_type["subtype"] == 'nomination'): + if ( + not entity_type + or not isinstance(statement["recordDetails"], dict) + or "type" not in entity_type + or not entity_type["type"] == "arrangement" + or "subtype" not in entity_type + or not entity_type["subtype"] == "nomination" + ): self._additional_check_results.append( - { - "type": "relationship_interests_subject_should_be_entity_nomination_arrangement", - "statement_type": None, - "statement": statement.get("statementId"), - }) - elif "type" in interest and interest["type"] in ('settlor', 'trustee', 'protector'): + { + "type": "relationship_interests_subject_should_be_entity_nomination_arrangement", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + elif "type" in interest and interest["type"] in ( + "settlor", + "trustee", + "protector", + ): print("Here 4") - if ("subject" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["subject"], str)): - print("Here 5", self._records[statement["recordDetails"]["subject"]]) - if not self._records[statement["recordDetails"]["subject"]][0] == 'entity': + if "subject" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["subject"], str + ): + print( + "Here 5", + self._records[statement["recordDetails"]["subject"]], + ) + if ( + not self._records[ + statement["recordDetails"]["subject"] + ][0] + == "entity" + ): self._additional_check_results.append( { "type": "relationship_interests_subject_should_be_entity_trust", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) else: - entity_type = self._records[statement["recordDetails"]["subject"]][1] + entity_type = self._records[ + statement["recordDetails"]["subject"] + ][1] print("Entity type:", entity_type) - if (not entity_type or not isinstance(statement["recordDetails"], dict) or - "subtype" not in entity_type or not entity_type["subtype"] == 'trust'): + if ( + not entity_type + or not isinstance(statement["recordDetails"], dict) + or "subtype" not in entity_type + or not entity_type["subtype"] == "trust" + ): self._additional_check_results.append( - { - "type": "relationship_interests_subject_should_be_entity_trust", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "relationship_interests_subject_should_be_entity_trust", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) + class CheckStatementSerialisation(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1470,34 +1687,55 @@ def check_statement_first_pass(self, statement): self._records[statement["recordId"]] = self._count def check_ownership_or_control_statement_second_pass(self, statement): - if ("recordId" in statement and "recordDetails" in statement and - isinstance(statement["recordDetails"], dict)): - if ("subject" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["subject"], str)): + if ( + "recordId" in statement + and "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + ): + if "subject" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["subject"], str + ): if statement["recordDetails"]["subject"] in self._records: - if (self._records[statement["recordDetails"]["subject"]] - > self._records[statement["recordId"]]): + if ( + self._records[statement["recordDetails"]["subject"]] + > self._records[statement["recordId"]] + ): self._additional_check_results.append( { "type": "relationship_subject_not_before_relationship_in_dataset", "statement_type": None, "statement": statement.get("statementId"), - }) - if ("recordId" in statement and "recordDetails" in statement and - isinstance(statement["recordDetails"], dict)): - if ("interestedParty" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["interestedParty"], str)): + } + ) + if ( + "recordId" in statement + and "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + ): + if "interestedParty" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["interestedParty"], str + ): if statement["recordDetails"]["interestedParty"] in self._records: - if (self._records[statement["recordDetails"]["interestedParty"]] - > self._records[statement["recordId"]]): - print("Ordering:", self._records[statement["recordDetails"]["interestedParty"]], - self._records[statement["recordId"]], self._records) + if ( + self._records[statement["recordDetails"]["interestedParty"]] + > self._records[statement["recordId"]] + ): + print( + "Ordering:", + self._records[ + statement["recordDetails"]["interestedParty"] + ], + self._records[statement["recordId"]], + self._records, + ) self._additional_check_results.append( { "type": "relationship_interested_party_not_before_relationship_in_dataset", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) + class CheckStatementPersonIdentifiersHaveCorrectScheme(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1511,39 +1749,79 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_person_statement_first_pass(self, statement): - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) and - "identifiers" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["identifiers"], list)): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "identifiers" in statement["recordDetails"] + and isinstance(statement["recordDetails"]["identifiers"], list) + ): for identifier in statement["recordDetails"]["identifiers"]: print("Got here!") - if not ("scheme" in identifier and identifier["scheme"].count("-") == 1 and - len(identifier["scheme"].split("-")[0]) > 0 and - len(identifier["scheme"].split("-")[1]) > 0): + if not ( + "scheme" in identifier + and identifier["scheme"].count("-") == 1 + and len(identifier["scheme"].split("-")[0]) > 0 + and len(identifier["scheme"].split("-")[1]) > 0 + ): self._additional_check_results.append( - { - "type": "person_identifiers_invalid_composition", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "person_identifiers_invalid_composition", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) else: - other_codes = ("BAH", "D", "EUE", "GBD", "GBN", "GBO", "GBP", "GBS", "UNA", "UNK", - "UNO", "XBA", "XIM", "XCC", "XCO", "XEC", "XPO", "XOM", "XXA", "XXB", - "XXC", "XXX", "ZIM") - if (not pycountry.countries.get(alpha_3=identifier["scheme"].split("-")[0]) and - identifier["scheme"].split("-")[0] not in other_codes): + other_codes = ( + "BAH", + "D", + "EUE", + "GBD", + "GBN", + "GBO", + "GBP", + "GBS", + "UNA", + "UNK", + "UNO", + "XBA", + "XIM", + "XCC", + "XCO", + "XEC", + "XPO", + "XOM", + "XXA", + "XXB", + "XXC", + "XXX", + "ZIM", + ) + if ( + not pycountry.countries.get( + alpha_3=identifier["scheme"].split("-")[0] + ) + and identifier["scheme"].split("-")[0] not in other_codes + ): self._additional_check_results.append( { "type": "person_identifiers_no_valid_iso_3166_1_alpha_3_code", "statement_type": None, "statement": statement.get("statementId"), - }) - elif identifier["scheme"].split("-")[1] not in ('PASSPORT', 'TAXID', 'IDCARD'): + } + ) + elif identifier["scheme"].split("-")[1] not in ( + "PASSPORT", + "TAXID", + "IDCARD", + ): self._additional_check_results.append( { "type": "person_identifiers_not_passport_taxid_idcard", "statement_type": None, "statement": statement.get("statementId"), - }) + } + ) + class CheckStatementEntityIdentifiersHaveKnownScheme(AdditionalCheck): def __init__(self, lib_cove_bods_config, schema_object): @@ -1555,14 +1833,21 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: return schema_object.is_schema_version_equal_to_or_greater_than("0.4") def check_entity_statement_first_pass(self, statement): - if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict) and - "identifiers" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["identifiers"], list)): + if ( + "recordDetails" in statement + and isinstance(statement["recordDetails"], dict) + and "identifiers" in statement["recordDetails"] + and isinstance(statement["recordDetails"]["identifiers"], list) + ): for identifier in statement["recordDetails"]["identifiers"]: - if not ("scheme" in identifier and identifier["scheme"] in self.orgids_prefixes): + if not ( + "scheme" in identifier + and identifier["scheme"] in self.orgids_prefixes + ): self._additional_check_results.append( - { - "type": "entity_identifiers_not_known_scheme", - "statement_type": None, - "statement": statement.get("statementId"), - }) + { + "type": "entity_identifiers_not_known_scheme", + "statement_type": None, + "statement": statement.get("statementId"), + } + ) diff --git a/libcovebods/tasks/statistics.py b/libcovebods/tasks/statistics.py index 9e6cc48..d2c8a35 100644 --- a/libcovebods/tasks/statistics.py +++ b/libcovebods/tasks/statistics.py @@ -61,13 +61,12 @@ def get_statistics(self): data = { "count_entity_statements": self.count_entity_statements, "count_entity_statements_types": self.count_entity_statements_types, - "count_entity_statements_types_with_any_identifier": - self.count_entity_statements_types_with_any_identifier, - "count_entity_statements_types_with_any_identifier_with_id_and_scheme": - self.count_entity_statements_types_with_any_identifier_with_id_and_scheme, + "count_entity_statements_types_with_any_identifier": self.count_entity_statements_types_with_any_identifier, + "count_entity_statements_types_with_any_identifier_with_id_and_scheme": self.count_entity_statements_types_with_any_identifier_with_id_and_scheme, } return data + class StatisticsCountEntityRecordStatements(AdditionalCheck): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: @@ -93,9 +92,12 @@ def check_entity_statement_first_pass(self, statement): and isinstance(statement["recordDetails"], dict) and "entityType" in statement["recordDetails"] and isinstance(statement["recordDetails"]["entityType"], str) - and statement["recordDetails"]["entityType"] in self.count_entity_statements_types + and statement["recordDetails"]["entityType"] + in self.count_entity_statements_types ): - self.count_entity_statements_types[statement["recordDetails"]["entityType"]] += 1 + self.count_entity_statements_types[ + statement["recordDetails"]["entityType"] + ] += 1 if "identifiers" in statement and isinstance( statement["identifiers"], list ): @@ -127,13 +129,12 @@ def get_statistics(self): data = { "count_entity_statements": self.count_entity_statements, "count_entity_statements_types": self.count_entity_statements_types, - "count_entity_statements_types_with_any_identifier": - self.count_entity_statements_types_with_any_identifier, - "count_entity_statements_types_with_any_identifier_with_id_and_scheme": - self.count_entity_statements_types_with_any_identifier_with_id_and_scheme, + "count_entity_statements_types_with_any_identifier": self.count_entity_statements_types_with_any_identifier, + "count_entity_statements_types_with_any_identifier_with_id_and_scheme": self.count_entity_statements_types_with_any_identifier_with_id_and_scheme, } return data + class StatisticsCountPersonStatements(AdditionalCheck): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: @@ -152,7 +153,6 @@ def check_person_statement_first_pass(self, statement): "personType" in statement and isinstance(statement["personType"], str) and statement["personType"] in self.count_person_statements_types - ): self.count_person_statements_types[statement["personType"]] += 1 @@ -163,6 +163,7 @@ def get_statistics(self): } return data + class StatisticsCountPersonRecordStatements(AdditionalCheck): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: @@ -182,9 +183,12 @@ def check_person_statement_first_pass(self, statement): and isinstance(statement["recordDetails"], dict) and "personType" in statement["recordDetails"] and isinstance(statement["recordDetails"]["personType"], str) - and statement["recordDetails"]["personType"] in self.count_person_statements_types + and statement["recordDetails"]["personType"] + in self.count_person_statements_types ): - self.count_person_statements_types[statement["recordDetails"]["personType"]] += 1 + self.count_person_statements_types[ + statement["recordDetails"]["personType"] + ] += 1 def get_statistics(self): data = { @@ -193,6 +197,7 @@ def get_statistics(self): } return data + class StatisticsCountOwnershipOrControlStatements(AdditionalCheck): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: @@ -285,28 +290,22 @@ def check_ownership_or_control_statement_first_pass(self, statement): def get_statistics(self): data = { "count_ownership_or_control_statement": self.count_ownership_or_control_statement, - "count_ownership_or_control_statement_interested_party_with_person": - self.count_ownership_or_control_statement_interested_party_with_person, - "count_ownership_or_control_statement_interested_party_with_entity": - self.count_ownership_or_control_statement_interested_party_with_entity, - "count_ownership_or_control_statement_interested_party_with_unspecified": - self.count_ownership_or_control_statement_interested_party_with_unspecified, - "count_ownership_or_control_statement_interest_statement_types": - self.count_ownership_or_control_statement_interest_statement_types, + "count_ownership_or_control_statement_interested_party_with_person": self.count_ownership_or_control_statement_interested_party_with_person, + "count_ownership_or_control_statement_interested_party_with_entity": self.count_ownership_or_control_statement_interested_party_with_entity, + "count_ownership_or_control_statement_interested_party_with_unspecified": self.count_ownership_or_control_statement_interested_party_with_unspecified, + "count_ownership_or_control_statement_interest_statement_types": self.count_ownership_or_control_statement_interest_statement_types, "count_ownership_or_control_statement_by_year": self.count_ownership_or_control_statement_by_year, "count_ownership_or_control_statement_subject_by_year": { year: len(year_set) for year, year_set in self.subject_statement_ids_by_year.items() }, - "count_ownership_or_control_statement_interested_party_with_entity_by_year": - self.count_ownership_or_control_statement_interested_party_with_entity_by_year, - "count_ownership_or_control_statement_interested_party_with_person_by_year": - self.count_ownership_or_control_statement_interested_party_with_person_by_year, - "count_ownership_or_control_statement_interested_party_with_unspecified_by_year": - self.count_ownership_or_control_statement_interested_party_with_unspecified_by_year, + "count_ownership_or_control_statement_interested_party_with_entity_by_year": self.count_ownership_or_control_statement_interested_party_with_entity_by_year, + "count_ownership_or_control_statement_interested_party_with_person_by_year": self.count_ownership_or_control_statement_interested_party_with_person_by_year, + "count_ownership_or_control_statement_interested_party_with_unspecified_by_year": self.count_ownership_or_control_statement_interested_party_with_unspecified_by_year, } return data + class StatisticsCountOwnershipOrControlRecordStatements(AdditionalCheck): @staticmethod def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool: @@ -327,8 +326,8 @@ def __init__(self, lib_cove_bods_config, schema_object): ] = 0 self.count_ownership_or_control_statement_by_year = defaultdict(int) self.subject_statement_ids_by_year = defaultdict(set) - self.count_ownership_or_control_statement_interested_party_by_year = defaultdict( - int + self.count_ownership_or_control_statement_interested_party_by_year = ( + defaultdict(int) ) def check_ownership_or_control_statement_first_pass(self, statement): @@ -337,13 +336,18 @@ def check_ownership_or_control_statement_first_pass(self, statement): except (ValueError, AttributeError): year = None self.count_ownership_or_control_statement += 1 - if "recordDetails" in statement and isinstance(statement["recordDetails"], dict): + if "recordDetails" in statement and isinstance( + statement["recordDetails"], dict + ): interested_party = statement["recordDetails"].get("interestedParty") if interested_party: self.count_ownership_or_control_statement_interested_party += 1 - self.count_ownership_or_control_statement_interested_party_by_year[year] += 1 - if ("interests" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["interests"], list)): + self.count_ownership_or_control_statement_interested_party_by_year[ + year + ] += 1 + if "interests" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["interests"], list + ): for interest in statement["recordDetails"]["interests"]: if isinstance(interest, dict): if ( @@ -355,30 +359,30 @@ def check_ownership_or_control_statement_first_pass(self, statement): self.count_ownership_or_control_statement_interest_statement_types[ interest["type"] ] += 1 - if ("subject" in statement["recordDetails"] and - isinstance(statement["recordDetails"]["subject"], str)): - self.subject_statement_ids_by_year[year].add(statement["recordDetails"]["subject"]) + if "subject" in statement["recordDetails"] and isinstance( + statement["recordDetails"]["subject"], str + ): + self.subject_statement_ids_by_year[year].add( + statement["recordDetails"]["subject"] + ) if "statementDate" in statement: self.count_ownership_or_control_statement_by_year[year] += 1 def get_statistics(self): data = { "count_ownership_or_control_statement": self.count_ownership_or_control_statement, - "count_ownership_or_control_statement_interested_party": - self.count_ownership_or_control_statement_interested_party, - "count_ownership_or_control_statement_interest_statement_types": - self.count_ownership_or_control_statement_interest_statement_types, - "count_ownership_or_control_statement_by_year": - self.count_ownership_or_control_statement_by_year, + "count_ownership_or_control_statement_interested_party": self.count_ownership_or_control_statement_interested_party, + "count_ownership_or_control_statement_interest_statement_types": self.count_ownership_or_control_statement_interest_statement_types, + "count_ownership_or_control_statement_by_year": self.count_ownership_or_control_statement_by_year, "count_ownership_or_control_statement_subject_by_year": { year: len(year_set) for year, year_set in self.subject_statement_ids_by_year.items() }, - "count_ownership_or_control_statement_interested_party_by_year": - self.count_ownership_or_control_statement_interested_party_by_year, + "count_ownership_or_control_statement_interested_party_by_year": self.count_ownership_or_control_statement_interested_party_by_year, } return data + class StatisticsCurrentOwnershipOrControlStatementsAndReplacesStatementsMissing( AdditionalCheck ): @@ -462,8 +466,7 @@ def get_statistics(self): "count_addresses": self.count_addresses, "count_addresses_with_postcode": self.count_addresses_with_postcode, "count_addresses_with_country": self.count_addresses_with_country, - "count_addresses_with_postcode_duplicated_in_address": - self.count_addresses_with_postcode_duplicated_in_address, + "count_addresses_with_postcode_duplicated_in_address": self.count_addresses_with_postcode_duplicated_in_address, } return data @@ -501,8 +504,7 @@ def check_ownership_or_control_statement_first_pass(self, statement): def get_statistics(self): return { - "count_ownership_or_control_statement_interest_direct_or_indirect": - self.count_ownership_or_control_statement_interest_direct_or_indirect, + "count_ownership_or_control_statement_interest_direct_or_indirect": self.count_ownership_or_control_statement_interest_direct_or_indirect, } @@ -537,9 +539,12 @@ def __init__(self, lib_cove_bods_config, schema_object): self._declaration_subjects = {} def check_statement_first_pass(self, statement): - if ("recordId" in statement and "declarationSubject" in statement and - statement["recordId"] == statement["declarationSubject"]): - self._declaration_subjects[statement["recordId"]] = 1 + if ( + "recordId" in statement + and "declarationSubject" in statement + and statement["recordId"] == statement["declarationSubject"] + ): + self._declaration_subjects[statement["recordId"]] = 1 def get_statistics(self): return { diff --git a/libcovebods/utils.py b/libcovebods/utils.py index fae6c86..79f89f6 100644 --- a/libcovebods/utils.py +++ b/libcovebods/utils.py @@ -25,6 +25,7 @@ def is_interest_current(interest): else: return True + def get_statement_type(statement, schema_object): if schema_object.is_schema_version_equal_to_or_greater_than("0.4"): record_type = statement.get("recordType") @@ -38,6 +39,7 @@ def get_statement_type(statement, schema_object): else: return statement.get("statementType") + def parse_date_field(date_str): print(date_str) if not isinstance(date_str, str): @@ -53,6 +55,7 @@ def parse_date_field(date_str): else: return None + def numeric_value(value): try: float(value) @@ -60,6 +63,7 @@ def numeric_value(value): except ValueError: return False + def sort_by_date(list_with_date, index): out = [] for item in list_with_date: diff --git a/tests/test_additional_checks_0_4_0.py b/tests/test_additional_checks_0_4_0.py index fa32a5c..7e5b5b4 100644 --- a/tests/test_additional_checks_0_4_0.py +++ b/tests/test_additional_checks_0_4_0.py @@ -24,6 +24,7 @@ def test_retrievedat_not_future_date_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_retrievedat_not_future_date_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -44,6 +45,7 @@ def test_retrievedat_not_future_date_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_retrievedat_not_future_date_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -64,6 +66,7 @@ def test_retrievedat_not_future_date_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_retrievedat_not_future_date_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -84,6 +87,7 @@ def test_retrievedat_not_future_date_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_date_not_future_date_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -104,6 +108,7 @@ def test_statement_date_not_future_date_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_date_not_future_date_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -124,6 +129,7 @@ def test_statement_date_not_future_date_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_date_not_future_date_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -144,6 +150,7 @@ def test_statement_date_not_future_date_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_creation_date_not_future_date_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -164,6 +171,7 @@ def test_statement_creation_date_not_future_date_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_creation_date_not_future_date_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -184,6 +192,7 @@ def test_statement_creation_date_not_future_date_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_creation_date_not_future_date_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -225,6 +234,7 @@ def test_statement_publication_date_future_date_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_publication_date_future_date_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -245,6 +255,7 @@ def test_statement_publication_date_future_date_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_publication_date_future_date_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -265,6 +276,7 @@ def test_statement_publication_date_future_date_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_publication_date_future_date_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -285,6 +297,7 @@ def test_statement_publication_date_future_date_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_person_date_of_death_sane_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -305,6 +318,7 @@ def test_statement_person_date_of_death_sane_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_date_of_death_sane_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -325,6 +339,7 @@ def test_statement_person_date_of_death_sane_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_date_of_death_sane_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -345,6 +360,7 @@ def test_statement_person_date_of_death_sane_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_date_of_death_sane_invalid_4(): cove_temp_folder = tempfile.mkdtemp( @@ -365,6 +381,7 @@ def test_statement_person_date_of_death_sane_invalid_4(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_date_of_death_sane_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -385,6 +402,7 @@ def test_statement_person_date_of_death_sane_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_person_date_of_death_sane_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -426,6 +444,7 @@ def test_statement_entity_foundation_dissolution_dates_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_entity_foundation_dissolution_dates_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -446,6 +465,7 @@ def test_statement_entity_foundation_dissolution_dates_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_entity_foundation_dissolution_dates_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -487,6 +507,7 @@ def test_statement_person_birth_date_sensible_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_birth_date_sensible_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -507,6 +528,7 @@ def test_statement_person_birth_date_sensible_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_birth_date_sensible_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -527,6 +549,7 @@ def test_statement_person_birth_date_sensible_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_birth_date_sensible_invalid_4(): cove_temp_folder = tempfile.mkdtemp( @@ -547,6 +570,7 @@ def test_statement_person_birth_date_sensible_invalid_4(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_birth_date_sensible_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -569,6 +593,7 @@ def test_statement_person_birth_date_sensible_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_person_birth_date_sensible_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -591,6 +616,7 @@ def test_statement_person_birth_date_sensible_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_relationship_interests_start_end_dates_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -611,6 +637,7 @@ def test_statement_relationship_interests_start_end_dates_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_interests_start_end_dates_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -631,6 +658,7 @@ def test_statement_relationship_interests_start_end_dates_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_relationship_interests_start_end_dates_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -672,6 +700,7 @@ def test_statement_entity_securities_listings_haspubliclisting_not_false_invalid assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_entity_securities_listings_haspubliclisting_not_false_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -692,6 +721,7 @@ def test_statement_entity_securities_listings_haspubliclisting_not_false_valid_1 assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_entity_securities_listings_haspubliclisting_not_false_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -712,6 +742,7 @@ def test_statement_entity_securities_listings_haspubliclisting_not_false_valid_2 assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_relationship_interests_share_values_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -732,6 +763,7 @@ def test_statement_relationship_interests_share_values_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_interests_share_values_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -752,6 +784,7 @@ def test_statement_relationship_interests_share_values_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_interests_share_values_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -772,6 +805,7 @@ def test_statement_relationship_interests_share_values_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_interests_share_values_invalid_4(): cove_temp_folder = tempfile.mkdtemp( @@ -792,6 +826,7 @@ def test_statement_relationship_interests_share_values_invalid_4(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_interests_share_values_invalid_5(): cove_temp_folder = tempfile.mkdtemp( @@ -812,6 +847,7 @@ def test_statement_relationship_interests_share_values_invalid_5(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_interests_share_values_invalid_6(): cove_temp_folder = tempfile.mkdtemp( @@ -834,6 +870,7 @@ def test_statement_relationship_interests_share_values_invalid_6(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_declaration_subject_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -854,6 +891,7 @@ def test_statement_declaration_subject_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_declaration_subject_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -874,6 +912,7 @@ def test_statement_declaration_subject_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_declaration_subject_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -894,6 +933,7 @@ def test_statement_declaration_subject_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_entity_is_component_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -916,6 +956,7 @@ def test_statement_entity_is_component_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_entity_is_component_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -936,7 +977,8 @@ def test_statement_entity_is_component_invalid_2(): assert results["schema_version"] == "0.4" assert results["validation_errors_count"] == 0 assert results["additional_fields_count"] == 0 - assert results["additional_checks_count"] == 3 # 2 extra ordering errors + assert results["additional_checks_count"] == 3 # 2 extra ordering errors + def test_statement_entity_is_component_valid_1(): @@ -960,6 +1002,7 @@ def test_statement_entity_is_component_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_person_is_component_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -980,7 +1023,8 @@ def test_statement_person_is_component_invalid_1(): assert results["schema_version"] == "0.4" assert results["validation_errors_count"] == 0 assert results["additional_fields_count"] == 0 - assert results["additional_checks_count"] == 2 # Extra ordering error + assert results["additional_checks_count"] == 2 # Extra ordering error + def test_statement_person_is_component_invalid_2(): @@ -1004,6 +1048,7 @@ def test_statement_person_is_component_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_is_component_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1026,6 +1071,7 @@ def test_statement_person_is_component_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_relationship_is_component_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1048,6 +1094,7 @@ def test_statement_relationship_is_component_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_is_component_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1070,6 +1117,7 @@ def test_statement_relationship_is_component_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_is_component_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1092,6 +1140,7 @@ def test_statement_relationship_is_component_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_duplicate_statement_id_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1102,7 +1151,7 @@ def test_statement_duplicate_statement_id_invalid_1(): "fixtures", "0.4", "additional-checks", - #"statement_duplicate_statement_id-invalid-1.json", + # "statement_duplicate_statement_id-invalid-1.json", "statement_must_have_unique_statement_identifier-invalid-1.json", ) @@ -1115,6 +1164,7 @@ def test_statement_duplicate_statement_id_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_duplicate_statement_id_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1125,7 +1175,7 @@ def test_statement_duplicate_statement_id_invalid_2(): "fixtures", "0.4", "additional-checks", - #"statement_duplicate_statement_id-invalid-2.json", + # "statement_duplicate_statement_id-invalid-2.json", "statement_must_have_unique_statement_identifier-invalid-1.json", ) @@ -1138,6 +1188,7 @@ def test_statement_duplicate_statement_id_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_duplicate_statement_id_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -1160,6 +1211,7 @@ def test_statement_duplicate_statement_id_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_duplicate_statement_id_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1170,7 +1222,7 @@ def test_statement_duplicate_statement_id_valid_1(): "fixtures", "0.4", "additional-checks", - #"statement_duplicate_statement_id-valid-1.json", + # "statement_duplicate_statement_id-valid-1.json", "statement_must_have_unique_statement_identifier-valid-1.json", ) @@ -1183,6 +1235,7 @@ def test_statement_duplicate_statement_id_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_series_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1205,6 +1258,7 @@ def test_statement_series_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_series_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1227,6 +1281,7 @@ def test_statement_series_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_series_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -1249,6 +1304,7 @@ def test_statement_series_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_series_invalid_4(): cove_temp_folder = tempfile.mkdtemp( @@ -1271,6 +1327,7 @@ def test_statement_series_invalid_4(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_series_invalid_5(): cove_temp_folder = tempfile.mkdtemp( @@ -1293,6 +1350,7 @@ def test_statement_series_invalid_5(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_series_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1315,6 +1373,7 @@ def test_statement_series_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_series_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1337,6 +1396,7 @@ def test_statement_series_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_relationship_component_records_are_record_ids_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1359,6 +1419,7 @@ def test_statement_relationship_component_records_are_record_ids_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_component_records_are_record_ids_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1381,6 +1442,7 @@ def test_statement_relationship_component_records_are_record_ids_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_relationship_component_records_are_record_ids_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1403,6 +1465,7 @@ def test_statement_relationship_component_records_are_record_ids_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_relationship_subject_refers_to_entity_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1425,6 +1488,7 @@ def test_relationship_subject_refers_to_entity_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_relationship_subject_refers_to_entity_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1447,6 +1511,7 @@ def test_relationship_subject_refers_to_entity_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_relationship_subject_refers_to_entity_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1469,6 +1534,7 @@ def test_relationship_subject_refers_to_entity_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_relationship_interested_party_entity_or_person_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1491,6 +1557,7 @@ def test_relationship_interested_party_entity_or_person_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_annotiation_statement_pointer_target_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1513,6 +1580,7 @@ def test_statement_annotiation_statement_pointer_target_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_annotiation_statement_pointer_target_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1535,6 +1603,7 @@ def test_statement_annotiation_statement_pointer_target_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_annotiation_statement_pointer_target_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -1580,6 +1649,7 @@ def test_statement_annotiation_statement_pointer_target_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_annotiation_statement_pointer_target_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1602,6 +1672,7 @@ def test_statement_annotiation_statement_pointer_target_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_interested_party_person_with_beneficial_ownership_or_control_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1624,6 +1695,7 @@ def test_interested_party_person_with_beneficial_ownership_or_control_invalid_1( assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_interested_party_person_with_beneficial_ownership_or_control_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1646,6 +1718,7 @@ def test_interested_party_person_with_beneficial_ownership_or_control_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_interested_party_person_with_beneficial_ownership_or_control_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1668,6 +1741,7 @@ def test_interested_party_person_with_beneficial_ownership_or_control_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_relationship_interest_nominee_or_nominator_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1690,6 +1764,7 @@ def test_relationship_interest_nominee_or_nominator_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_relationship_interest_nominee_or_nominator_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1712,6 +1787,7 @@ def test_relationship_interest_nominee_or_nominator_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_relationship_interest_nominee_or_nominator_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1734,6 +1810,7 @@ def test_relationship_interest_nominee_or_nominator_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_relationship_interest_for_trusts_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1756,6 +1833,7 @@ def test_relationship_interest_for_trusts_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_relationship_interest_for_trusts_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1778,6 +1856,7 @@ def test_relationship_interest_for_trusts_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_relationship_interest_for_trusts_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -1800,6 +1879,7 @@ def test_relationship_interest_for_trusts_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_relationship_interest_for_trusts_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1822,6 +1902,7 @@ def test_relationship_interest_for_trusts_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_relationship_interest_for_trusts_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1844,6 +1925,7 @@ def test_relationship_interest_for_trusts_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_relationship_interest_for_trusts_valid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -1889,6 +1971,7 @@ def test_statement_serialisation_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_serialisation_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -1911,6 +1994,7 @@ def test_statement_serialisation_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_serialisation_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -1933,6 +2017,7 @@ def test_statement_serialisation_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_serialisation_invalid_4(): cove_temp_folder = tempfile.mkdtemp( @@ -1955,6 +2040,7 @@ def test_statement_serialisation_invalid_4(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_serialisation_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1977,6 +2063,7 @@ def test_statement_serialisation_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_person_identifiers_have_correct_scheme_invalid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -1999,6 +2086,7 @@ def test_statement_person_identifiers_have_correct_scheme_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_identifiers_have_correct_scheme_invalid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -2021,6 +2109,7 @@ def test_statement_person_identifiers_have_correct_scheme_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_identifiers_have_correct_scheme_invalid_3(): cove_temp_folder = tempfile.mkdtemp( @@ -2043,6 +2132,7 @@ def test_statement_person_identifiers_have_correct_scheme_invalid_3(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_identifiers_have_correct_scheme_invalid_4(): cove_temp_folder = tempfile.mkdtemp( @@ -2065,6 +2155,7 @@ def test_statement_person_identifiers_have_correct_scheme_invalid_4(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_identifiers_have_correct_scheme_invalid_5(): cove_temp_folder = tempfile.mkdtemp( @@ -2087,6 +2178,7 @@ def test_statement_person_identifiers_have_correct_scheme_invalid_5(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_identifiers_have_correct_scheme_invalid_6(): cove_temp_folder = tempfile.mkdtemp( @@ -2109,6 +2201,7 @@ def test_statement_person_identifiers_have_correct_scheme_invalid_6(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_person_identifiers_have_correct_scheme_valid_1(): cove_temp_folder = tempfile.mkdtemp( @@ -2131,6 +2224,7 @@ def test_statement_person_identifiers_have_correct_scheme_valid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_person_identifiers_have_correct_scheme_valid_2(): cove_temp_folder = tempfile.mkdtemp( @@ -2153,6 +2247,7 @@ def test_statement_person_identifiers_have_correct_scheme_valid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 0 + def test_statement_entity_identifier_scheme_known_invalid_1(): # Testing CheckStatementEntityIdentifiersHaveKnownScheme @@ -2176,6 +2271,7 @@ def test_statement_entity_identifier_scheme_known_invalid_1(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_entity_identifier_scheme_known_invalid_2(): # Testing CheckStatementEntityIdentifiersHaveKnownScheme @@ -2199,6 +2295,7 @@ def test_statement_entity_identifier_scheme_known_invalid_2(): assert results["additional_fields_count"] == 0 assert results["additional_checks_count"] == 1 + def test_statement_entity_identifier_scheme_known_valid_1(): # Testing CheckStatementEntityIdentifiersHaveKnownScheme diff --git a/tests/test_api_0_4.py b/tests/test_api_0_4.py index cd4537a..96cb882 100644 --- a/tests/test_api_0_4.py +++ b/tests/test_api_0_4.py @@ -70,13 +70,10 @@ def test_entity_sub_type_does_not_align_1(): assert results["additional_checks_count"] == 0 assert ( - results["validation_errors"][0]["message"] - == "'trust' is not one of ['other']" - ) - assert ( - results["validation_errors"][0]['path_ending'] - == 'subtype' + results["validation_errors"][0]["message"] == "'trust' is not one of ['other']" ) + assert results["validation_errors"][0]["path_ending"] == "subtype" + def test_entity_sub_type_does_align_1(): diff --git a/tests/test_lib_common.py b/tests/test_lib_common.py index 49d75ee..2aad091 100644 --- a/tests/test_lib_common.py +++ b/tests/test_lib_common.py @@ -71,10 +71,7 @@ def test_this_YYYY(self): def test_this_YYYY_MM(self): today = dateutil.utils.today(UTC) - assert ( - is_interest_current({"endDate": f"{today.year}-{today.month}"}) - is True - ) + assert is_interest_current({"endDate": f"{today.year}-{today.month}"}) is True def test_this_date(self): assert ( diff --git a/tests/test_schema_validation_0_4.py b/tests/test_schema_validation_0_4.py index 9694015..54f690e 100644 --- a/tests/test_schema_validation_0_4.py +++ b/tests/test_schema_validation_0_4.py @@ -11,29 +11,34 @@ @pytest.fixture def expected_errors(): expected = [] - with open('tests/fixtures/0.4/invalid-schema/expected_errors.csv', newline='') as csvfile: + with open( + "tests/fixtures/0.4/invalid-schema/expected_errors.csv", newline="" + ) as csvfile: reader = csv.reader(csvfile) for row in reader: expected.append(row) return expected + @pytest.fixture def valid_statements(): - return pathlib.Path('tests/fixtures/0.4/valid-schema').glob("*.json") + return pathlib.Path("tests/fixtures/0.4/valid-schema").glob("*.json") + def extract_elements(element_path): - elems = [0 if elem == '$[0]' else elem for elem in element_path.split(".")] + elems = [0 if elem == "$[0]" else elem for elem in element_path.split(".")] out = [] for elem in elems: - if isinstance(elem, str) and '[0]' in elem: - out.append(elem.split('[0]')[0]) + if isinstance(elem, str) and "[0]" in elem: + out.append(elem.split("[0]")[0]) out.append(0) else: out.append(elem) - if (len(out) == 1 and out[0] == '$'): + if len(out) == 1 and out[0] == "$": out = [] return out + def test_all_schema_validation_invalid(expected_errors): cove_temp_folder = tempfile.mkdtemp( @@ -43,27 +48,46 @@ def test_all_schema_validation_invalid(expected_errors): for expected in expected_errors: json_filename = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "fixtures", "0.4", "invalid-schema", expected[0] + os.path.dirname(os.path.realpath(__file__)), + "fixtures", + "0.4", + "invalid-schema", + expected[0], ) results = bods_json_output(cove_temp_folder, json_filename) assert results["schema_version"] == "0.4" - if expected[1] in ('enum', 'type'): - assert results['validation_errors_count'] > 0 + if expected[1] in ("enum", "type"): + assert results["validation_errors_count"] > 0 else: - assert results['validation_errors_count'] == 1 - assert results['validation_errors'][0]['validator'] == expected[1] - assert results['validation_errors'][0]['path'] == extract_elements(expected[2]) - if expected[1] == 'required': - assert results['validation_errors'][0]['extra']['required_key_which_is_missing'] == expected[3] - elif expected[1] in ('enum', 'anyOf', 'type', 'maxLength', 'minLength', - 'format', 'minimum', 'maximum', 'pattern', 'oneOf', - 'const'): + assert results["validation_errors_count"] == 1 + assert results["validation_errors"][0]["validator"] == expected[1] + assert results["validation_errors"][0]["path"] == extract_elements(expected[2]) + if expected[1] == "required": + assert ( + results["validation_errors"][0]["extra"][ + "required_key_which_is_missing" + ] + == expected[3] + ) + elif expected[1] in ( + "enum", + "anyOf", + "type", + "maxLength", + "minLength", + "format", + "minimum", + "maximum", + "pattern", + "oneOf", + "const", + ): pass else: print(expected) - print(results['validation_errors'][0]) + print(results["validation_errors"][0]) def test_all_schema_validation_valid(valid_statements): @@ -77,4 +101,4 @@ def test_all_schema_validation_valid(valid_statements): results = bods_json_output(cove_temp_folder, valid_statement) assert results["schema_version"] == "0.4" - assert results['validation_errors_count'] == 0 + assert results["validation_errors_count"] == 0 diff --git a/tests/test_stat_count_declaration_subjects.py b/tests/test_stat_count_declaration_subjects.py index 626c38f..a9c79f9 100644 --- a/tests/test_stat_count_declaration_subjects.py +++ b/tests/test_stat_count_declaration_subjects.py @@ -13,11 +13,11 @@ def test_schema_0_4_count(): os.path.dirname(os.path.realpath(__file__)), "fixtures", "0.4", - "statistic_count_declaration_subjects.json" + "statistic_count_declaration_subjects.json", ) results = bods_json_output(cove_temp_folder, json_filename) assert results["schema_version"] == "0.4" - assert (results["statistics"]["count_declaration_subjects"] == 2) + assert results["statistics"]["count_declaration_subjects"] == 2 diff --git a/tests/test_stat_count_entity_statements.py b/tests/test_stat_count_entity_statements.py index 9c1830b..bd61d2b 100644 --- a/tests/test_stat_count_entity_statements.py +++ b/tests/test_stat_count_entity_statements.py @@ -17,10 +17,23 @@ def test_schema_0_2(): assert results["schema_version"] == "0.2" - assert (results["statistics"]["count_entity_statements"] == 1) - assert (results["statistics"]["count_entity_statements_types"]['registeredEntity'] == 1) - assert (results["statistics"]["count_entity_statements_types_with_any_identifier"]['registeredEntity'] == 1) - assert (results["statistics"]["count_entity_statements_types_with_any_identifier_with_id_and_scheme"]['registeredEntity'] == 1) + assert results["statistics"]["count_entity_statements"] == 1 + assert ( + results["statistics"]["count_entity_statements_types"]["registeredEntity"] == 1 + ) + assert ( + results["statistics"]["count_entity_statements_types_with_any_identifier"][ + "registeredEntity" + ] + == 1 + ) + assert ( + results["statistics"][ + "count_entity_statements_types_with_any_identifier_with_id_and_scheme" + ]["registeredEntity"] + == 1 + ) + def test_schema_0_3_basic_1(): @@ -35,10 +48,23 @@ def test_schema_0_3_basic_1(): assert results["schema_version"] == "0.3" - assert (results["statistics"]["count_entity_statements"] == 1) - assert (results["statistics"]["count_entity_statements_types"]['registeredEntity'] == 1) - assert (results["statistics"]["count_entity_statements_types_with_any_identifier"]['registeredEntity'] == 1) - assert (results["statistics"]["count_entity_statements_types_with_any_identifier_with_id_and_scheme"]['registeredEntity'] == 1) + assert results["statistics"]["count_entity_statements"] == 1 + assert ( + results["statistics"]["count_entity_statements_types"]["registeredEntity"] == 1 + ) + assert ( + results["statistics"]["count_entity_statements_types_with_any_identifier"][ + "registeredEntity" + ] + == 1 + ) + assert ( + results["statistics"][ + "count_entity_statements_types_with_any_identifier_with_id_and_scheme" + ]["registeredEntity"] + == 1 + ) + def test_schema_0_4_basic_1(): @@ -53,4 +79,4 @@ def test_schema_0_4_basic_1(): assert results["schema_version"] == "0.4" - assert (results["statistics"]["count_entity_statements"] == 1) + assert results["statistics"]["count_entity_statements"] == 1 diff --git a/tests/test_stat_count_ownership_or_control_statement.py b/tests/test_stat_count_ownership_or_control_statement.py index 3792ff6..11b7e6d 100644 --- a/tests/test_stat_count_ownership_or_control_statement.py +++ b/tests/test_stat_count_ownership_or_control_statement.py @@ -17,7 +17,8 @@ def test_schema_0_2(): assert results["schema_version"] == "0.2" - assert (results["statistics"]["count_ownership_or_control_statement"] == 1) + assert results["statistics"]["count_ownership_or_control_statement"] == 1 + def test_schema_0_3_basic_1(): @@ -32,7 +33,8 @@ def test_schema_0_3_basic_1(): assert results["schema_version"] == "0.3" - assert (results["statistics"]["count_ownership_or_control_statement"] == 1) + assert results["statistics"]["count_ownership_or_control_statement"] == 1 + def test_schema_0_4_basic_1(): @@ -47,4 +49,4 @@ def test_schema_0_4_basic_1(): assert results["schema_version"] == "0.4" - assert (results["statistics"]["count_ownership_or_control_statement"] == 1) + assert results["statistics"]["count_ownership_or_control_statement"] == 1 diff --git a/tests/test_stat_count_ownership_or_control_statement_interest_statement_types.py b/tests/test_stat_count_ownership_or_control_statement_interest_statement_types.py index 9bd5e87..dfdffa2 100644 --- a/tests/test_stat_count_ownership_or_control_statement_interest_statement_types.py +++ b/tests/test_stat_count_ownership_or_control_statement_interest_statement_types.py @@ -208,4 +208,3 @@ def test_schema_0_2(): "count_ownership_or_control_statement_interest_statement_types" ]["conditional-rights-granted-by-contract"] ) # noqa - diff --git a/tests/test_stat_count_person_statements.py b/tests/test_stat_count_person_statements.py index a3fa70b..6b63a5c 100644 --- a/tests/test_stat_count_person_statements.py +++ b/tests/test_stat_count_person_statements.py @@ -17,8 +17,9 @@ def test_schema_0_2(): assert results["schema_version"] == "0.2" - assert (results["statistics"]["count_person_statements"] == 1) - assert (results["statistics"]["count_person_statements_types"]['knownPerson'] == 1) + assert results["statistics"]["count_person_statements"] == 1 + assert results["statistics"]["count_person_statements_types"]["knownPerson"] == 1 + def test_schema_0_3_basic_1(): @@ -33,8 +34,9 @@ def test_schema_0_3_basic_1(): assert results["schema_version"] == "0.3" - assert (results["statistics"]["count_person_statements"] == 1) - assert (results["statistics"]["count_person_statements_types"]['knownPerson'] == 1) + assert results["statistics"]["count_person_statements"] == 1 + assert results["statistics"]["count_person_statements_types"]["knownPerson"] == 1 + def test_schema_0_4_basic_1(): @@ -49,5 +51,5 @@ def test_schema_0_4_basic_1(): assert results["schema_version"] == "0.4" - assert (results["statistics"]["count_person_statements"] == 1) - assert (results["statistics"]["count_person_statements_types"]['knownPerson'] == 1) + assert results["statistics"]["count_person_statements"] == 1 + assert results["statistics"]["count_person_statements_types"]["knownPerson"] == 1