From ba33da3a790ce76422d6226e1a286fb2fe1f984c Mon Sep 17 00:00:00 2001 From: Rob Kooper Date: Fri, 16 Aug 2024 21:04:28 -0500 Subject: [PATCH] use validated, parameters for cog endpoint --- CHANGELOG.md | 6 ++ cdrhook/retrieve.py | 6 +- cdrhook/server.py | 189 +++++++++++++++++++++++++------------------- 3 files changed, 116 insertions(+), 85 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9172de2..a74632b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - Added downloader +- Use validated legends by default +- Can add parameters to cog function + - `validated` : when true will return all validated legends, if no legends found, or false it will search for legends generated by systems. + - `legend` : will search for legends created by this system + - `area` : will search for areas created by this system + - `model` : will always fire model (even if no area/legend is found) ## [0.8.0] - 2024-08-06 diff --git a/cdrhook/retrieve.py b/cdrhook/retrieve.py index 1d373ef..649e0f2 100644 --- a/cdrhook/retrieve.py +++ b/cdrhook/retrieve.py @@ -96,11 +96,11 @@ def validate_cog_area_extraction_response(response:List[dict]) -> List[AreaExtra area_extractions.append(AreaExtractionResponse.model_validate(item)) return area_extractions -def retrieve_cog_legend_items(connection:CdrConnector, cog_id:str, system_id:SystemId=None) -> List[dict]: +def retrieve_cog_legend_items(connection:CdrConnector, cog_id:str, system_id:SystemId=None, validated:str="false") -> List[dict]: # Get all legend items for a cog - endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/legend_items" + endpoint_url = f"{connection.cdr_url}/v1/features/{cog_id}/legend_items?validated={validated.lower()}" if system_id is not None: - endpoint_url += f"?system_version={system_id.name}__{system_id.version}" + endpoint_url += f"&system_version={system_id.name}__{system_id.version}" return retrieve_endpoint(connection, endpoint_url) def validate_cog_legend_items_response(response:List[dict]) -> List[LegendItemResponse]: diff --git a/cdrhook/server.py b/cdrhook/server.py index 5d5f4bf..45541d7 100644 --- a/cdrhook/server.py +++ b/cdrhook/server.py @@ -32,7 +32,7 @@ # ---------------------------------------------------------------------- # region HELPER # ---------------------------------------------------------------------- -def strtobool (val): +def strtobool(val): """Convert a string representation of truth to true (1) or false (0). True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if @@ -154,7 +154,7 @@ def check_uncharted_event(event_id): logging.info("Firing download event for %s '%s'", cog_id, json.dumps(message)) send_message(message, f'{config["prefix"]}download') -def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optional[dict]=None): +def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optional[dict]=None, parameters : Optional[dict]=None): """ Processing callback for cogs. Checks if there is enough information available to process the cog with the requested models. If there is downloads the @@ -166,92 +166,115 @@ def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optio config_parm : dict, Optional field to overwrite the global config parameters, needed for unit testing """ + if parameters is None: + parameters = {} if config_parm is None: config_parm = config valid_area_systems = config_parm["systems"]["area"] - valid_legend_systems = config_parm["systems"]["legend"] + valid_legend_systems = parameters.get("legend", config_parm["systems"]["legend"]) logging.info(f"Cog:{cog_id[0:8]} - Processing cog {cog_id}") - # region - ####### DISABLED TILL CDR IS FIXED ####### # Retrieve available system versions for this cog and check if there are any valid systems posted - # sys_ver_response = retrieve.retrieve_cog_system_versions(cdr_connector, cog_id) - # cog_system_versions = retrieve.validate_cog_system_versions_response(sys_ver_response) - # valid_systems = False - # for system in cog_system_versions.system_versions: - # if system.name in valid_area_systems or system.name in valid_legend_systems: - # valid_systems = True - # break - - # if not valid_systems: - # # logging.error(f"{cog_id[0:8]} - No valid system data found on CDR") - # raise ValueError(f"No valid system data found on CDR for {cog_id}, only saw {cog_system_versions.pretty_str()}") - # # return - # else: - # logging.debug(f"Cog-{cog_id[0:8]} - Available system versions : {cog_system_versions.pretty_str()}") - ####### endregion ####### - - # Retrieve cdr data for this cog - with ThreadPoolExecutor() as p: - area_future = p.submit(retrieve.retrieve_cog_area_extraction, cdr_connector, cog_id) - legend_future = p.submit(retrieve.retrieve_cog_legend_items, cdr_connector, cog_id) - - area_response = area_future.result() - legend_response = legend_future.result() - - # Validate responses to cdr_schema objects - cog_area_extraction = retrieve.validate_cog_area_extraction_response(area_response) - cog_legend_items = retrieve.validate_cog_legend_items_response(legend_response) - - # Filter out systems that are not valid - cog_area_extraction = [ae for ae in cog_area_extraction if ae.system in valid_area_systems] - cog_legend_items = [li for li in cog_legend_items if li.system in valid_legend_systems] - - # Check there is enough data to process - ae_categories = {AreaType.Map_Area: 0, AreaType.Polygon_Legend_Area: 0, AreaType.Line_Point_Legend_Area: 0} - for ae in cog_area_extraction: - if ae.category not in ae_categories: - ae_categories[ae.category] = 1 + sys_ver_response = retrieve.retrieve_cog_system_versions(cdr_connector, cog_id) + cog_system_versions = retrieve.validate_cog_system_versions_response(sys_ver_response) + logging.debug(f"Cog-{cog_id[0:8]} - Available system versions : {cog_system_versions.pretty_str()}") + + # checking for legends, logic will be: + # 1. check if cog has validated legends, if it does, use them + validated = parameters.get("validated", ["true"])[0] + cog_legend_items = None + if strtobool(validated): + legend_response = retrieve.retrieve_cog_legend_items(cdr_connector, cog_id, validated=validated) + cog_legend_items = retrieve.validate_cog_legend_items_response(legend_response) + # 2. if no validated legends, check if there are any legends from the list + if not cog_legend_items: + logging.debug(f"Cog-{cog_id[0:8]} - No validated legend items found, trying unvalidated") + # 2.1 loop through the list of legends and check if they are available + for legend in parameters.get("legend", config_parm["systems"]["legend"]) or []: + logging.debug(f"Cog-{cog_id[0:8]} - Trying legend {legend}") + if "__" in legend: + system, version = legend.split("__") + systemid = SystemId(name=system, version=version) + else: + systemid = next((x for x in cog_system_versions.system_versions if x.name == legend), None) + if not systemid: + continue + # 2.2 fetch the legend items for the system + legend_response = retrieve.retrieve_cog_legend_items(cdr_connector, cog_id, system_id=systemid, validated="false") + cog_legend_items = retrieve.validate_cog_legend_items_response(legend_response) + if cog_legend_items: + break + logging.debug(f"Cog-{cog_id[0:8]} - Found {len(cog_legend_items)} legend items") + + # checking for area, logic will be: + cog_area_extraction = None + for area in parameters.get("area", config_parm["systems"]["area"]) or []: + logging.debug(f"Cog-{cog_id[0:8]} - Trying area {area}") + if "__" in area: + system, version = area.split("__") + systemid = SystemId(name=system, version=version) else: - ae_categories[ae.category] += 1 - logging.debug(f"Cog-{cog_id[0:8]} - Found {ae_categories}") - poly_map_units = [mu for mu in cog_legend_items if mu.category == 'polygon'] - logging.debug(f"Cog-{cog_id[0:8]} - Found {len(poly_map_units)} polygon map units") - - valid_map_area, valid_polygon_legend_area, valid_polygon_map_units = True, True, True - if ae_categories[AreaType.Map_Area] < 1: - logging.debug(f"Cog-{cog_id[0:8]} - No map area found") - valid_map_area = False - if ae_categories[AreaType.Line_Point_Legend_Area] < 1: - logging.debug(f"Cog-{cog_id[0:8]} - No line point legend area found") - valid_line_point_legend_area = False - if ae_categories[AreaType.Polygon_Legend_Area] < 1: - logging.debug(f"Cog-{cog_id[0:8]} - No polygon legend area found") - valid_polygon_legend_area = False - # if len(poly_map_units) < 1: - # logging.debug(f"Cog-{cog_id[0:8]} - No polygon legend items found") - # valid_polygon_map_units = False - - firemodels = [ ] - for model, prereqs in config_parm["models"].items(): - goodmodel = True - if "map_area" in prereqs and not valid_map_area: - logging.debug("Skipping %s because of map_area", model) - goodmodel = False - if "polygon_legend_area" in prereqs and not valid_polygon_legend_area: - logging.debug("Skipping %s because of polygon_legend_area", model) - goodmodel = False - if "line_point_legend_area" in prereqs and not valid_line_point_legend_area: - logging.debug("Skipping %s because of line_point_legend_area", model) - goodmodel = False - if "polygon_map_units" in prereqs and not valid_polygon_map_units: - logging.debug("Skipping %s because of polygon_map_units", model) - goodmodel = False - if goodmodel: - logging.info(f"{cog_id[0:8]} - Firing download event for {model}") - firemodels.append(model) - + systemid = next((x for x in cog_system_versions.system_versions if x.name == area), None) + if not systemid: + continue + # 2.2 fetch the area items for the system + area_response = retrieve.retrieve_cog_area_extraction(cdr_connector, cog_id, system_id=systemid) + cog_area_extraction = retrieve.validate_cog_area_extraction_response(area_response) + if cog_area_extraction: + break + logging.debug(f"Cog-{cog_id[0:8]} - Found {len(cog_area_extraction)} area items") + + # check what models to fire + firemodels = parameters.get("model", [ ]) + # 1. if models are specified in the parameters, use them + # 2. if not, check if there is enough data to process + if len(firemodels) == 0: + # Check there is enough data to process + ae_categories = {AreaType.Map_Area: 0, AreaType.Polygon_Legend_Area: 0, AreaType.Line_Point_Legend_Area: 0} + for ae in cog_area_extraction: + if ae.category not in ae_categories: + ae_categories[ae.category] = 1 + else: + ae_categories[ae.category] += 1 + logging.debug(f"Cog-{cog_id[0:8]} - Found {ae_categories}") + poly_map_units = [mu for mu in cog_legend_items if mu.category == 'polygon'] + logging.debug(f"Cog-{cog_id[0:8]} - Found {len(poly_map_units)} polygon map units") + + valid_map_area, valid_polygon_legend_area, valid_polygon_map_units = True, True, True + if ae_categories[AreaType.Map_Area] < 1: + logging.debug(f"Cog-{cog_id[0:8]} - No map area found") + valid_map_area = False + if ae_categories[AreaType.Line_Point_Legend_Area] < 1: + logging.debug(f"Cog-{cog_id[0:8]} - No line point legend area found") + valid_line_point_legend_area = False + if ae_categories[AreaType.Polygon_Legend_Area] < 1: + logging.debug(f"Cog-{cog_id[0:8]} - No polygon legend area found") + valid_polygon_legend_area = False + # if len(poly_map_units) < 1: + # logging.debug(f"Cog-{cog_id[0:8]} - No polygon legend items found") + # valid_polygon_map_units = False + + # Check what models to fire, unless they are already specified + for model, prereqs in config_parm["models"].items(): + goodmodel = True + if "map_area" in prereqs and not valid_map_area: + logging.debug("Skipping %s because of map_area", model) + goodmodel = False + if "polygon_legend_area" in prereqs and not valid_polygon_legend_area: + logging.debug("Skipping %s because of polygon_legend_area", model) + goodmodel = False + if "line_point_legend_area" in prereqs and not valid_line_point_legend_area: + logging.debug("Skipping %s because of line_point_legend_area", model) + goodmodel = False + if "polygon_map_units" in prereqs and not valid_polygon_map_units: + logging.debug("Skipping %s because of polygon_map_units", model) + goodmodel = False + if goodmodel: + logging.info(f"{cog_id[0:8]} - Firing download event for {model}") + firemodels.append(model) + + # only continue if there are models to fire if len(firemodels) == 0: raise ValueError(f"Cannot process {cog_id}, no models were able to be started") @@ -324,7 +347,8 @@ def cog(id): Process the cog """ logging.info(f"Received process cog for {id}") - send_message({"event": "ncsacog", "cog_id": id}, f'{config["prefix"]}cdrhook') + parameters = request.args.to_dict(False) + send_message({"event": "ncsacog", "cog_id": id, "parameters": parameters}, f'{config["prefix"]}cdrhook') return {"ok": "success"} @@ -355,7 +379,8 @@ def cdrhook_callback(channel, method, properties, body): logging.debug("ping/pong") elif data.get("event") == "ncsacog": cog_id = data.get("cog_id", "").strip() - process_cog(config["cdr_connector"], cog_id) + parameters = data.get("parameters", {}) + process_cog(config["cdr_connector"], cog_id, parameters=parameters) elif data.get("event") == "map.process": logging.debug("ignoring map.process") elif data.get("event") == "feature.process":