diff --git a/CHANGELOG.md b/CHANGELOG.md index f9edd2e..d91ce47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## [unreleased] - +## [0.9.1] - 2024-08-20 + +### Changed +- events use same function as the manual submit of cogs +- ask for 5000 items from extractions and area (instead of 10) +- ask for validated map_area ## [0.9.0] - 2024-08-19 diff --git a/cdrhook/retrieve.py b/cdrhook/retrieve.py index 649e0f2..b3964ca 100644 --- a/cdrhook/retrieve.py +++ b/cdrhook/retrieve.py @@ -77,14 +77,14 @@ def validate_cog_system_versions_response(response:List[List[str]]) -> CogSystem system_versions.append(SystemId(name=item[0], version=item[1])) return CogSystemVersionsSchema(system_versions=system_versions) -def retrieve_cog_area_extraction(connection:CdrConnector, cog_id:str, system_id:SystemId=None) -> List[dict]: +def retrieve_cog_area_extraction(connection:CdrConnector, cog_id:str, system_id:SystemId=None, validated:str="false", items:int=5000) -> List[dict]: """ Retreive area extraction data for a given cog from the cdr. Note that while the code for filtering by system_id is ready on our side, the cdr does not yet provide support for this and will just ignore the addtional query info. 
""" - endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/area_extractions" + endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/area_extractions?validated={validated.lower()}&size={items}" if system_id is not None: - endpoint_url += f"?system_version={system_id.name}__{system_id.version}" + endpoint_url += f"&system_version={system_id.name}__{system_id.version}" return retrieve_endpoint(connection, endpoint_url) def validate_cog_area_extraction_response(response:List[dict]) -> List[AreaExtractionResponse]: @@ -96,9 +96,9 @@ def validate_cog_area_extraction_response(response:List[dict]) -> List[AreaExtra area_extractions.append(AreaExtractionResponse.model_validate(item)) return area_extractions -def retrieve_cog_legend_items(connection:CdrConnector, cog_id:str, system_id:SystemId=None, validated:str="false") -> List[dict]: +def retrieve_cog_legend_items(connection:CdrConnector, cog_id:str, system_id:SystemId=None, validated:str="false", items:int=5000) -> List[dict]: # Get all legend items for a cog - endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/legend_items?validated={validated.lower()}" + endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/legend_items?validated={validated.lower()}&size={items}" if system_id is not None: endpoint_url += f"&system_version={system_id.name}__{system_id.version}" return retrieve_endpoint(connection, endpoint_url) diff --git a/cdrhook/server.py b/cdrhook/server.py index c15d422..df283fd 100644 --- a/cdrhook/server.py +++ b/cdrhook/server.py @@ -85,10 +85,11 @@ def send_message(message, queue): # ---------------------------------------------------------------------- # region Process maps # ---------------------------------------------------------------------- -def check_uncharted_event(event_id): +def process_event(event_id): """ - If the event is an unchared event, we will download the data, and fire - the download event. 
+ Get the data from the event and submit to process_cog setting the + legend and area parameters to be the system and system_version. This + might only be true for the uncharted area event. """ # get the event information headers = {'Authorization': f'Bearer {config["cdr_token"]}'} @@ -96,64 +97,19 @@ def check_uncharted_event(event_id): r.raise_for_status() data = r.json() - # parse the infomation - cog_id = None - map_area = None - polygon_legend_area = None - line_point_legend_area = None - cog_area = None + # get the cog for extraction in data: cog_id = extraction["cog_id"] - for area in extraction["cog_area_extractions"]: - if area["category"] == "map_area": - map_area = area - elif area["category"] == "polygon_legend_area": - polygon_legend_area = area - elif area["category"] == "line_point_legend_area": - line_point_legend_area = area - if map_area: - cog_area = extraction - if not cog_id or not map_area: - logging.error("Could not find cog_id or map_area in uncharted event") - return + system_name = extraction["system"] + system_version = extraction["system_version"] + parameters = { + "validated": ["true"], + "legend": [ f"{system_name}__{system_version}" ], + "area": [ f"{system_name}__{system_version}" ] + } + process_cog(config["cdr_connector"], cog_id, parameters=parameters) - # write the cog_area to disk - folder = os.path.join(cog_id[0:2], cog_id[2:4]) - filepart = os.path.join(folder, cog_id) - filename = os.path.join("/data", f"{filepart}.cog_area.json") - os.makedirs(os.path.dirname(filename) , exist_ok=True) - with open(filename, "w") as outputfile: - json.dump(cog_area, outputfile) - - # get the basic information - r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}", headers=headers) - r.raise_for_status() - cog_info = r.json() - - # send the download event - firemodels = [ ] - for k, v in config["models"].items(): - goodmodel = True - if "map_area" in v and not map_area: - logging.debug("Skipping %s because of map_area", k) - goodmodel = False - 
if "polygon_legend_area" in v and not polygon_legend_area: - logging.debug("Skipping %s because of polygon_legend_area", k) - goodmodel = False - if "line_point_legend_area" in v and not line_point_legend_area: - logging.debug("Skipping %s because of line_point_legend_area", k) - goodmodel = False - if goodmodel: - firemodels.append(k) - - message = { - "cog_id": cog_id, - "cog_url": cog_info["cog_url"], - "map_area": f'{config["callback_url"]}/download/{filepart}.cog_area.json', - "models": firemodels - } - logging.info("Firing download event for %s '%s'", cog_id, json.dumps(message)) - send_message(message, f'{config["prefix"]}download') + return def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optional[dict]=None, parameters : Optional[dict]=None): """ @@ -165,7 +121,7 @@ def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optio cdr_connector : CdrConnector, CDR Connection with a registered connection cog_id : str, The cog_id to process config_parm : dict, Optional field to overwrite the global config parameters, needed for unit testing - + parameters : dict, Optional field to overwrite the parameters, contains the legend, area and models to fire """ if parameters is None: parameters = {} @@ -173,6 +129,7 @@ def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optio config_parm = config valid_area_systems = config_parm["systems"]["area"] valid_legend_systems = parameters.get("legend", config_parm["systems"]["legend"]) + validated = parameters.get("validated", ["true"])[0] logging.info(f"Cog:{cog_id[0:8]} - Processing cog {cog_id}") @@ -183,7 +140,6 @@ def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optio # checking for legends, logic will be: # 1. 
check if cog has validated legends, if it does, use them - validated = parameters.get("validated", ["true"])[0] cog_legend_items = None if strtobool(validated): legend_response = retrieve.retrieve_cog_legend_items(cdr_connector, cog_id, validated=validated) @@ -213,21 +169,27 @@ def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optio logging.debug(f"Cog-{cog_id[0:8]} - No legend items found") # checking for area, logic will be: + # 1. check if cog has validated area extractions, if it does, use them cog_area_extraction = None - for area in parameters.get("area", config_parm["systems"]["area"]) or []: - logging.debug(f"Cog-{cog_id[0:8]} - Trying area {area}") - if "__" in area: - system, version = area.split("__") - systemid = SystemId(name=system, version=version) - else: - systemid = next((x for x in cog_system_versions.system_versions if x.name == area), None) - if not systemid: - continue - # 2.2 fetch the area items for the system - area_response = retrieve.retrieve_cog_area_extraction(cdr_connector, cog_id, system_id=systemid) + if strtobool(validated): + area_response = retrieve.retrieve_cog_area_extraction(cdr_connector, cog_id, validated=validated) cog_area_extraction = retrieve.validate_cog_area_extraction_response(area_response) - if cog_area_extraction: - break + # 2. 
if no validated area extractions, check if there are any area extractions from the list + if not cog_area_extraction: + for area in parameters.get("area", config_parm["systems"]["area"]) or []: + logging.debug(f"Cog-{cog_id[0:8]} - Trying area {area}") + if "__" in area: + system, version = area.split("__") + systemid = SystemId(name=system, version=version) + else: + systemid = next((x for x in cog_system_versions.system_versions if x.name == area), None) + if not systemid: + continue + # 2.2 fetch the area items for the system + area_response = retrieve.retrieve_cog_area_extraction(cdr_connector, cog_id, system_id=systemid) + cog_area_extraction = retrieve.validate_cog_area_extraction_response(area_response) + if cog_area_extraction: + break if cog_area_extraction is not None: logging.debug(f"Cog-{cog_id[0:8]} - Found {len(cog_area_extraction)} area items") else: @@ -394,7 +356,7 @@ def cdrhook_callback(channel, method, properties, body): elif data.get("event") == "feature.process": event_id = data.get("payload", {}).get("id", "").strip() if event_id.startswith("uncharted-area_0."): - check_uncharted_event(event_id) + process_event(event_id) else: logging.debug(f"Ignoring feature.process with id {event_id}") else: