diff --git a/cdrhook/cdr_endpoint_schemas.py b/cdrhook/cdr_endpoint_schemas.py index 018a534..bfe0758 100644 --- a/cdrhook/cdr_endpoint_schemas.py +++ b/cdrhook/cdr_endpoint_schemas.py @@ -7,8 +7,8 @@ class CogDownloadSchema(BaseModel): The response schema from the CDR cog download endpoint. """ cog_url: str = Field(description="The URL to download the geotif of the requested cog") - ngmdb_prod: str = Field(description="???") - ngmdb_item: int = Field(description="???") + ngmdb_prod: Optional[str] = Field(description="???") + ngmdb_item: Optional[int] = Field(description="???") # # Returned by cog_system_versions endpoint class SystemId(BaseModel): diff --git a/cdrhook/connector.py b/cdrhook/connector.py index 50ab210..459344b 100644 --- a/cdrhook/connector.py +++ b/cdrhook/connector.py @@ -65,6 +65,36 @@ def unregister(self): r.raise_for_status() self.registration = None + def __str__(self) -> str: + repr = "CdrConnector(" + repr += f"system_name='{self.system_name}', " + repr += f"system_version='{self.system_version}', " + repr += f"token='{self.token[:8]}...', " + repr += f"callback_url='{self.callback_url}', " + repr += f"callback_secret='{self.callback_secret[:8]}...', " + repr += f"callback_username='{self.callback_username}', " + repr += "callback_password='...', " + repr += f"events={self.events}, " + repr += f"cdr_url='{self.cdr_url}', " + repr += f"registration={self.registration[:8]}..." + repr += ")" + return repr + + def __repr__(self) -> str: + repr = "CdrConnector(" + repr += f"system_name='{self.system_name}', " + repr += f"system_version='{self.system_version}', " + repr += f"token='{self.token[:8]}...', " + repr += f"callback_url='{self.callback_url}', " + repr += f"callback_secret='{self.callback_secret[:8]}...', " + repr += f"callback_username='{self.callback_username}', " + repr += "callback_password='...', " + repr += f"events={self.events}, " + repr += f"cdr_url='{self.cdr_url}', " + repr += f"registration={self.registration[:8]}..." + repr += ")" + return repr + def __del__(self): if self.registration is not None: self.unregister() \ No newline at end of file diff --git a/cdrhook/server.py b/cdrhook/server.py index 5196200..d49c1bb 100644 --- a/cdrhook/server.py +++ b/cdrhook/server.py @@ -151,13 +151,22 @@ def check_uncharted_event(event_id): from concurrent.futures import ThreadPoolExecutor from cmaas_utils.types import CMAAS_Map from cdr_schemas.cdr_responses.area_extractions import AreaType -def process_cog(cdr_connector, cog_id): +from cdrhook.connector import CdrConnector +from typing import Optional +def process_cog(cdr_connector : CdrConnector , cog_id : str, config_parm : Optional[dict]=None): """ Processing callback for cogs. Checks if there is enough information available to process the cog with the requested models. If there is downloads the prereq data from the CDR, saves it to a temporary file and fires the download event to rabbitmq. + + cdr_connector : CdrConnector, CDR Connection with a registered connection + cog_id : str, The cog_id to process + config_parm : dict, Optional field to overwrite the global config parameters, needed for unit testing + """ + if config_parm is None: + config_parm = config valid_area_systems = ['uncharted'] valid_legend_systems = ['polymer', 'uncharted'] @@ -172,7 +181,8 @@ def process_cog(cdr_connector, cog_id): break if not valid_systems: - logging.error(f"{cog_id[0:8]} - No valid system data found on CDR") + # logging.error(f"{cog_id[0:8]} - No valid system data found on CDR") + raise ValueError(f"No valid system data found on CDR for {cog_id}") # return else: logging.info(f"{cog_id[0:8]} - Available system versions : {cog_system_versions.pretty_str()}") @@ -223,7 +233,7 @@ def process_cog(cdr_connector, cog_id): # return firemodels = [ ] - for model, prereqs in config["models"].items(): + for model, prereqs in config_parm["models"].items(): goodmodel = True if "map_area" in prereqs and not valid_map_area: logging.debug("Skipping %s because of map_area", model) @@ -242,8 +252,8 @@ def process_cog(cdr_connector, cog_id): firemodels.append(model) if len(firemodels) == 0: - return - + raise ValueError(f"No valid models found for {cog_id}") + # Retrieve download link for the geotiff cog_download_response = retrieve.retrieve_cog_download(cdr_connector, cog_id) cog_download = retrieve.validate_cog_download_response(cog_download_response) @@ -256,18 +266,24 @@ def process_cog(cdr_connector, cog_id): # write the cog_area to disk folder = os.path.join(cog_id[0:2], cog_id[2:4]) filepart = os.path.join(folder, cog_id) - filename = os.path.join("/data", f"{filepart}.cog_area.json") - # filename = os.path.join("/data", f"{filepart}.map_data.json") # Change name to map_data.json - saveCMASSMap(filename, map_data) + filename = os.path.join("/data", f"{filepart}.map_data.json") + if 'mode' in config_parm and config_parm['mode'] == 'test': # Can't write to /data in tests + filename = os.path.join('tests', 'data', f'{filepart}.map_data.json') + os.makedirs(os.path.dirname(filename) , exist_ok=True) + + with open(filename, "w") as fh: + fh.write(map_data.model_dump_json()) + # saveCMASSMap(filename, map_data) message = { "cog_id": cog_id, "cog_url": cog_download.cog_url, - "map_area": f'{config["callback_url"]}/download/{filepart}.cog_area.json', + "map_data": f'{cdr_connector.callback_url}/download/{filepart}.map_data.json', "models": firemodels } logging.info("Firing download event for %s '%s'", cog_id, json.dumps(message)) - # send_message(message, f'{config["prefix"]}download') + if 'mode' in config_parm and config_parm['mode'] != 'test': # Can't send rabbitmq in tests + send_message(message, f'{config_parm["prefix"]}download') def _process_cog(cog_id): @@ -544,7 +560,7 @@ def cdrhook_listener(config): # logging.info("Unregistered with CDR") # r.raise_for_status() -from cdrhook.connector import CdrConnector + def create_app(): """ Create the Flask app, setting up the environment variables and @@ -580,6 +596,7 @@ def create_app(): callback_password=os.getenv("CALLBACK_PASSWORD"), ) cdr_connector.register() + config["cdr_connector"] = cdr_connector # registration = register_system(config) # config["registration"] = registration @@ -617,7 +634,6 @@ def create_app(): def handle_sig(sig, frame): logging.warning(f"Got signal {sig}, now close worker...") cdr_connector.unregister() - # unregister_system(config['cdr_token'], config['registration']) sys.exit(0) for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGHUP): diff --git a/tests/data/sample_cog_results.json b/tests/data/sample_cog_results.json new file mode 100644 index 0000000..35153ca --- /dev/null +++ b/tests/data/sample_cog_results.json @@ -0,0 +1,154 @@ +{ + "cog_id": "78c274e9575d1ac948d55a55265546d711551cdd5cdd53592c9928d502d50700", + "georef_results": [ + { + "cog_id": "78c274e9575d1ac948d55a55265546d711551cdd5cdd53592c9928d502d50700", + "georeference_results": [], + "gcps": [ + { + "gcp_id": "26938b0d-ba58-4711-b79c-a990d24bbea1", + "map_geom": { + "latitude": 48.875, + "longitude": -96.5, + "type": "Point" + }, + "px_geom": { + "rows_from_top": 307.0, + "columns_from_left": 491.0, + "type": "Point" + }, + "confidence": null, + "model": "umn", + "model_version": "0.0.1", + "crs": "EPSG:4326" + }, + { + "gcp_id": "39fba3a8-1514-4f38-8e9c-af634f6a365d", + "map_geom": { + "latitude": 48.875, + "longitude": -96.375, + "type": "Point" + }, + "px_geom": { + "rows_from_top": 7252.0, + "columns_from_left": 491.0, + "type": "Point" + }, + "confidence": null, + "model": "umn", + "model_version": "0.0.1", + "crs": "EPSG:4326" + }, + { + "gcp_id": "dba6212c-d751-4c55-9ae6-56b19199bd51", + "map_geom": { + "latitude": 48.75, + "longitude": -96.375, + "type": "Point" + }, + "px_geom": { + "rows_from_top": 7252.0, + "columns_from_left": 5592.0, + "type": "Point" + }, + "confidence": null, + "model": "umn", + "model_version": "0.0.1", + "crs": "EPSG:4326" + }, + { + "gcp_id": "9eaa35da-d8b1-4f44-9096-dca16af8d916", + "map_geom": { + "latitude": 48.75, + "longitude": -96.5, + "type": "Point" + }, + "px_geom": { + "rows_from_top": 307.0, + "columns_from_left": 5592.0, + "type": "Point" + }, + "confidence": null, + "model": "umn", + "model_version": "0.0.1", + "crs": "EPSG:4326" + } + ], + "system": "umn-usc-inferlink", + "system_version": "0.0.1" + } + ], + "extraction_results": [ + { + "system": "umn-usc-inferlink", + "system_version": "0.0.1", + "cog_id": "78c274e9575d1ac948d55a55265546d711551cdd5cdd53592c9928d502d50700", + "line_feature_results": [ + { + "id": "0", + "legend_provenance": null, + "name": "", + "abbreviation": "", + "description": "Dashed where inferred. Arrows show relative horizontal movement; ball and bar on downthrown side", + "legend_bbox": [], + "legend_contour": [], + "reference_id": "", + "validated": false, + "crs": "CRITICALMAAS:pixel", + "cdr_projection_id": "", + "line_features": { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "1", + "geometry": { + "coordinates": [ + [ + 2824.0, + 3645.0 + ], + [ + 2812.0, + 3653.0 + ], + [ + 2794.0, + 3663.0 + ], + [ + 2773.0, + 3671.0 + ], + [ + 2747.0, + 3683.0 + ], + [ + 2725.0, + 3696.0 + ] + ], + "type": "LineString" + }, + "properties": { + "model": "umn-usc-inferlink", + "model_version": "0.0.1", + "confidence": 0.8, + "dash_pattern": "solid", + "symbol": "", + "reference_id": "", + "validated": false + } + } + ] + } + } + ], + "point_feature_results": [], + "polygon_feature_results": [], + "cog_area_extractions": [], + "cog_metadata_extractions": [] + } + ] +} \ No newline at end of file diff --git a/tests/test_cdrhook/test_server.py b/tests/test_cdrhook/test_server.py index fce7fd4..f861955 100644 --- a/tests/test_cdrhook/test_server.py +++ b/tests/test_cdrhook/test_server.py @@ -1,4 +1,5 @@ import os +import json from dotenv import load_dotenv from cdrhook.server import process_cog @@ -27,5 +28,11 @@ def teardown_class(self): def test_process_cog(self): log = init_test_log("TestCallbacks/test_process_cog") - process_cog(self.con, self.cog_id) + config = {} + config["mode"] = 'test' + config["prefix"] = os.getenv("PREFIX") + with open("cdrhook/models.json", "r") as f: + config["models"] = json.load(f) + + process_cog(self.con, self.cog_id, config_parm=config) log.info("Test passed successfully")