From 9851c67c10312702c899337260881ab44ae5e571 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Wed, 14 Aug 2024 11:49:34 +0200 Subject: [PATCH 01/11] Re-pushed changes from --- src/openeo_gfmap/fetching/generic.py | 10 + src/openeo_gfmap/fetching/s1.py | 4 +- src/openeo_gfmap/manager/job_manager.py | 296 ++++++++++++++++---- src/openeo_gfmap/manager/job_splitters.py | 12 +- src/openeo_gfmap/utils/catalogue.py | 22 +- tests/test_openeo_gfmap/test_s1_fetchers.py | 4 +- 6 files changed, 278 insertions(+), 70 deletions(-) diff --git a/src/openeo_gfmap/fetching/generic.py b/src/openeo_gfmap/fetching/generic.py index 21dc209..197004a 100644 --- a/src/openeo_gfmap/fetching/generic.py +++ b/src/openeo_gfmap/fetching/generic.py @@ -116,6 +116,10 @@ def generic_default_processor(cube: openeo.DataCube, **params): "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), }, + Backend.CDSE_STAGING: { + "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), + "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), + }, Backend.FED: { "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), @@ -134,6 +138,12 @@ def generic_default_processor(cube: openeo.DataCube, **params): _get_generic_processor, collection_name="COPERNICUS_30" ), }, + Backend.CDSE_STAGING: { + "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), + "preprocessor": partial( + _get_generic_processor, collection_name="COPERNICUS_30" + ), + }, Backend.FED: { "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), "preprocessor": partial( diff --git a/src/openeo_gfmap/fetching/s1.py b/src/openeo_gfmap/fetching/s1.py index 6081d40..97d6fc2 100644 --- a/src/openeo_gfmap/fetching/s1.py +++ b/src/openeo_gfmap/fetching/s1.py @@ -67,8 +67,6 @@ def s1_grd_fetch_default( """ bands = convert_band_names(bands, BASE_SENTINEL1_GRD_MAPPING) - load_collection_parameters = params.get("load_collection", {}) - cube = _load_collection( connection, bands, @@ -76,7 +74,7 @@ def s1_grd_fetch_default( spatial_extent, temporal_extent, fetch_type, - **load_collection_parameters, + **params, ) if fetch_type is not FetchType.POINT and isinstance(spatial_extent, GeoJSON): diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 9a9f7b2..3e73b90 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -1,13 +1,18 @@ import json +import pickle import threading +import time from concurrent.futures import ThreadPoolExecutor +from datetime import datetime from enum import Enum from functools import partial from pathlib import Path -from typing import Callable, Optional, Union +from threading import Lock +from typing import Callable, NamedTuple, Optional, Union import pandas as pd import pystac +from openeo import Connection from openeo.extra.job_management import MultiBackendJobManager from openeo.rest.job import BatchJob from pystac import CatalogType @@ -16,21 +21,53 @@ from openeo_gfmap.stac import constants # Lock to use when writing to the STAC collection -_stac_lock = threading.Lock() +_stac_lock = Lock() + + +def retry_on_exception(max_retries, delay=30): + def decorator(func): + def wrapper(*args, **kwargs): + latest_exception = None + for _ in range(max_retries): + try: + return func(*args, **kwargs) + except Exception as e: + 
time.sleep(delay) + latest_exception = e + raise latest_exception + + return wrapper + + return decorator def done_callback(future, df, idx): - """Sets the status of the job to the given status when the future is done.""" + """Sets the status of the job to the given status when the future is done. + + If an exception occurred, then tries to retry the post-job action once if + it wasn't done already. If this is the second time the post-job action + fails, then the job is set to error. + """ current_status = df.loc[idx, "status"] - if not future.exception(): + exception = future.exception() + if exception is None: if current_status == "postprocessing": df.loc[idx, "status"] = "finished" elif current_status == "postprocessing-error": df.loc[idx, "status"] = "error" + elif current_status == "running": + df.loc[idx, "status"] = "running" else: raise ValueError( f"Invalid status {current_status} for job {df.loc[idx, 'id']} for done_callback!" ) + else: + _log.exception( + "Exception occurred in post-job future for job %s:\n%s", + df.loc[idx, "id"], + exception, + ) + df.loc[idx, "status"] = "error" class PostJobStatus(Enum): @@ -53,11 +90,14 @@ def __init__( post_job_action: Optional[Callable] = None, poll_sleep: int = 5, n_threads: int = 1, - post_job_params: dict = {}, resume_postproc: bool = True, # If we need to check for post-job actions that crashed restart_failed: bool = False, # If we need to restart failed jobs + dynamic_max_jobs: bool = True, # If we need to dynamically change the maximum number of parallel jobs + max_jobs_worktime: bool = 10, # Maximum number of jobs to run in a given time + max_jobs: int = 20, # Maximum number of jobs to run at the same time ): self._output_dir = output_dir + self._catalogue_cache = output_dir / "catalogue_cache.bin" self.stac = stac self.collection_id = collection_id @@ -74,7 +114,6 @@ def __init__( self._output_path_gen = output_path_generator self._post_job_action = post_job_action - self._post_job_params = post_job_params # Monkey patching the _normalize_df method to ensure we have no modification on the # geometry column @@ -83,16 +122,77 @@ def __init__( self._root_collection = self._normalize_stac() + # Add a property that calculates the number of maximum concurrent jobs + # dinamically depending on the time + self._dynamic_max_jobs = dynamic_max_jobs + self._max_jobs_worktime = max_jobs_worktime + self._max_jobs = max_jobs + + def add_backend( + self, + name: str, + connection, + parallel_jobs: Optional[int] = 2, + dynamic_max_jobs: bool = False, + min_jobs: Optional[int] = None, + max_jobs: Optional[int] = None, + ): + if not dynamic_max_jobs: + if parallel_jobs is None: + raise ValueError( + "When dynamic_max_jobs is set to False, parallel_jobs must be provided." + ) + return super().add_backend(name, connection, parallel_jobs) + + if min_jobs is None or max_jobs is None: + raise ValueError( + "When dynamic_max_jobs is set to True, min_jobs and max_jobs must be provided." 
+ ) + + if isinstance(connection, Connection): + c = connection + connection = lambda: c # noqa: E731 + assert callable(connection) + + # Create a new NamedTuple to store the dynamic backend properties + class _DynamicBackend(NamedTuple): + get_connection: Callable[[], Connection] + + @property + def parallel_jobs(self) -> int: + current_time = datetime.now() + + # Limiting working hours + start_worktime_hour = 8 + end_worktime_hour = 20 + + if ( + current_time.hour >= start_worktime_hour + and current_time.hour < end_worktime_hour + ): + return min_jobs + return max_jobs + + self.backends[name] = _DynamicBackend(get_connection=connection) + def _normalize_stac(self): default_collection_path = self._output_dir / "stac/collection.json" - if self.stac is not None: + if self._catalogue_cache.exists(): _log.info( - f"Reloading the STAC collection from the provided path: {self.stac}." + "Loading the STAC collection from the persisted binary file: %s.", + self._catalogue_cache, + ) + with open(self._catalogue_cache, "rb") as file: + root_collection = pickle.load(file) + elif self.stac is not None: + _log.info( + "Reloading the STAC collection from the provided path: %s.", self.stac ) root_collection = pystac.read_file(str(self.stac)) elif default_collection_path.exists(): _log.info( - f"Reload the STAC collection from the default path: {default_collection_path}." + "Reload the STAC collection from the default path: %s.", + default_collection_path, ) self.stac = default_collection_path root_collection = pystac.read_file(str(self.stac)) @@ -150,16 +250,30 @@ def _resume_postjob_actions(self, df: pd.DataFrame): job = connection.job(row.id) if row.status == "postprocessing": _log.info( - f"Resuming postprocessing of job {row.id}, queueing on_job_finished..." + "Resuming postprocessing of job %s, queueing on_job_finished...", + row.id, + ) + future = self._executor.submit(self.on_job_done, job, row, _stac_lock) + future.add_done_callback( + partial( + done_callback, + df=df, + idx=idx, + ) ) - future = self._executor.submit(self.on_job_done, job, row) - future.add_done_callback(partial(done_callback, df=df, idx=idx)) else: _log.info( - f"Resuming postprocessing of job {row.id}, queueing on_job_error..." + "Resuming postprocessing of job %s, queueing on_job_error...", + row.id, ) future = self._executor.submit(self.on_job_error, job, row) - future.add_done_callback(partial(done_callback, df=df, idx=idx)) + future.add_done_callback( + partial( + done_callback, + df=df, + idx=idx, + ) + ) self._futures.append(future) def _restart_failed_jobs(self, df: pd.DataFrame): @@ -167,7 +281,9 @@ def _restart_failed_jobs(self, df: pd.DataFrame): failed_tasks = df[df.status.isin(["error", "start_failed"])] not_started_tasks = df[df.status == "not_started"] _log.info( - f"Resetting {len(failed_tasks)} failed jobs to 'not_started'. {len(not_started_tasks)} jobs are already 'not_started'." + "Resetting %s failed jobs to 'not_started'. %s jobs are already 'not_started'.", + len(failed_tasks), + len(not_started_tasks), ) for idx, _ in failed_tasks.iterrows(): df.loc[idx, "status"] = "not_started" @@ -203,38 +319,53 @@ def _update_statuses(self, df: pd.DataFrame): job_metadata["status"] == "finished" ): _log.info( - f"Job {job.job_id} finished successfully, queueing on_job_done..." 
+ "Job %s finished successfully, queueing on_job_done...", job.job_id ) job_status = "postprocessing" - future = self._executor.submit(self.on_job_done, job, row) + future = self._executor.submit(self.on_job_done, job, row, _stac_lock) # Future will setup the status to finished when the job is done - future.add_done_callback(partial(done_callback, df=df, idx=idx)) - self._futures.append(future) - df.loc[idx, "costs"] = job_metadata["costs"] - df.loc[idx, "memory"] = ( - job_metadata["usage"] - .get("max_executor_memory", {}) - .get("value", None) - ) - df.loc[idx, "cpu"] = ( - job_metadata["usage"].get("cpu", {}).get("value", None) - ) - df.loc[idx, "duration"] = ( - job_metadata["usage"].get("duration", {}).get("value", None) + future.add_done_callback( + partial( + done_callback, + df=df, + idx=idx, + ) ) + self._futures.append(future) + if "costs" in job_metadata: + df.loc[idx, "costs"] = job_metadata["costs"] + df.loc[idx, "memory"] = ( + job_metadata["usage"] + .get("max_executor_memory", {}) + .get("value", None) + ) + + else: + _log.warning( + "Costs not found in job %s metadata. Costs will be set to 'None'.", + job.job_id, + ) # Case in which it failed if (df.loc[idx, "status"] != "error") and ( job_metadata["status"] == "error" ): _log.info( - f"Job {job.job_id} finished with error, queueing on_job_error..." + "Job %s finished with error, queueing on_job_error...", + job.job_id, ) job_status = "postprocessing-error" future = self._executor.submit(self.on_job_error, job, row) # Future will setup the status to error when the job is done - future.add_done_callback(partial(done_callback, df=df, idx=idx)) + future.add_done_callback( + partial( + done_callback, + df=df, + idx=idx, + ) + ) self._futures.append(future) + if "costs" in job_metadata: df.loc[idx, "costs"] = job_metadata["costs"] df.loc[idx, "status"] = job_status @@ -242,6 +373,7 @@ def _update_statuses(self, df: pd.DataFrame): # Clear the futures that are done and raise their potential exceptions if they occurred. self._clear_queued_actions() + @retry_on_exception(max_retries=2, delay=30) def on_job_error(self, job: BatchJob, row: pd.Series): """Method called when a job finishes with an error. @@ -252,7 +384,14 @@ def on_job_error(self, job: BatchJob, row: pd.Series): row: pd.Series The row in the dataframe that contains the job relative information. """ - logs = job.logs() + try: + logs = job.logs() + except Exception as e: # pylint: disable=broad-exception-caught + _log.exception( + "Error getting logs in `on_job_error` for job %s:\n%s", job.job_id, e + ) + logs = [] + error_logs = [log for log in logs if log.level.lower() == "error"] job_metadata = job.describe_job() @@ -271,15 +410,19 @@ def on_job_error(self, job: BatchJob, row: pd.Series): f"Couldn't find any error logs. Please check the error manually on job ID: {job.job_id}." ) - def on_job_done(self, job: BatchJob, row: pd.Series): + @retry_on_exception(max_retries=2, delay=30) + def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): """Method called when a job finishes successfully. It will first download the results of the job and then call the `post_job_action` method. """ + job_products = {} for idx, asset in enumerate(job.get_results().get_assets()): try: _log.debug( - f"Generating output path for asset {asset.name} from job {job.job_id}..." 
+ "Generating output path for asset %s from job %s...", + asset.name, + job.job_id, ) output_path = self._output_path_gen(self._output_dir, idx, row) # Make the output path @@ -288,11 +431,17 @@ def on_job_done(self, job: BatchJob, row: pd.Series): # Add to the list of downloaded products job_products[f"{job.job_id}_{asset.name}"] = [output_path] _log.debug( - f"Downloaded {asset.name} from job {job.job_id} -> {output_path}" + "Downloaded %s from job %s -> %s", + asset.name, + job.job_id, + output_path, ) except Exception as e: _log.exception( - f"Error downloading asset {asset.name} from job {job.job_id}", e + "Error downloading asset %s from job %s:\n%s", + asset.name, + job.job_id, + e, ) raise e @@ -313,45 +462,63 @@ def on_job_done(self, job: BatchJob, row: pd.Series): asset.href = str( asset_path ) # Update the asset href to the output location set by the output_path_generator - # item.id = f"{job.job_id}_{item.id}" + # Add the item to the the current job items. job_items.append(item) - _log.info(f"Parsed item {item.id} from job {job.job_id}") + _log.info("Parsed item %s from job %s", item.id, job.job_id) except Exception as e: _log.exception( - f"Error failed to add item {item.id} from job {job.job_id} to STAC collection", + "Error failed to add item %s from job %s to STAC collection:\n%s", + item.id, + job.job_id, e, ) - raise e # _post_job_action returns an updated list of stac items. Post job action can therefore # update the stac items and access their products through the HREF. It is also the # reponsible of adding the appropriate metadata/assets to the items. if self._post_job_action is not None: - _log.debug(f"Calling post job action for job {job.job_id}...") - job_items = self._post_job_action(job_items, row, self._post_job_params) + _log.debug("Calling post job action for job %s...", job.job_id) + job_items = self._post_job_action(job_items, row) - _log.info(f"Adding {len(job_items)} items to the STAC collection...") + _log.info("Adding %s items to the STAC collection...", len(job_items)) - with _stac_lock: # Take the STAC lock to avoid concurrence issues - # Filters the job items to only keep the ones that are not already in the collection - existing_ids = [item.id for item in self._root_collection.get_all_items()] - job_items = [item for item in job_items if item.id not in existing_ids] - - self._root_collection.add_items(job_items) - _log.info(f"Added {len(job_items)} items to the STAC collection.") - - _log.info(f"Writing STAC collection for {job.job_id} to file...") + with lock: # Take the STAC lock to avoid concurrence issues try: - self._write_stac() + _log.info("Thread %s entered the STAC lock.", threading.get_ident()) + # Filters the job items to only keep the ones that are not already in the collection + existing_ids = [ + item.id for item in self._root_collection.get_all_items() + ] + job_items = [item for item in job_items if item.id not in existing_ids] + + # validated_items = [] + # # Validate the items + # for item in job_items: + # try: + # item.validate() + # validated_items.append(item) + # except Exception as e: + # _log.warning( + # "Couldn't validate item %s from job %s, ignoring:\n%s", + # item.id, + # job.job_id, + # e, + # ) + self._root_collection.add_items(job_items) + # self._root_collection.add_items(validated_items) + _log.info("Added %s items to the STAC collection.", len(job_items)) + + self._persist_stac() except Exception as e: _log.exception( - f"Error writing STAC collection for job {job.job_id} to file.", e + "Error adding items to the 
STAC collection for job %s:\n%s ", + job.job_id, + str(e), ) raise e - _log.info(f"Wrote STAC collection for {job.job_id} to file.") - _log.info(f"Job {job.job_id} and post job action finished successfully.") + _log.info("Job %s and post job action finished successfully.", job.job_id) def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: """Ensure we have the required columns and the expected type for the geometry column. @@ -377,7 +544,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: } df = df.assign(**new_columns) - _log.debug(f"Normalizing dataframe. Columns: {df.columns}") + _log.debug("Normalizing dataframe. Columns: %s", df.columns) return df @@ -412,7 +579,7 @@ def run_jobs( The file to track the results of the jobs. """ # Starts the thread pool to work on the on_job_done and on_job_error methods - _log.info(f"Starting ThreadPoolExecutor with {self._n_threads} workers.") + _log.info("Starting ThreadPoolExecutor with %s workers.", self._n_threads) with ThreadPoolExecutor(max_workers=self._n_threads) as executor: _log.info("Creating and running jobs.") self._executor = executor @@ -423,6 +590,11 @@ def run_jobs( self._wait_queued_actions() _log.info("Exiting ThreadPoolExecutor.") self._executor = None + _log.info( + "Finished running jobs, saving persisted STAC collection to final .json collection." + ) + self._write_stac() + _log.info("Saved STAC catalogue to JSON format, all tasks finished!") def _write_stac(self): """Writes the STAC collection to the output directory.""" @@ -439,6 +611,12 @@ def _write_stac(self): self._root_collection.normalize_hrefs(str(root_path)) self._root_collection.save(catalog_type=CatalogType.SELF_CONTAINED) + def _persist_stac(self): + """Persists the STAC collection by saving it into a binary file.""" + _log.info("Persisting STAC collection to temp file %s.", self._catalogue_cache) + with open(self._catalogue_cache, "wb") as file: + pickle.dump(self._root_collection, file) + def setup_stac( self, constellation: Optional[str] = None, diff --git a/src/openeo_gfmap/manager/job_splitters.py b/src/openeo_gfmap/manager/job_splitters.py index 6a8381c..9a795fb 100644 --- a/src/openeo_gfmap/manager/job_splitters.py +++ b/src/openeo_gfmap/manager/job_splitters.py @@ -2,6 +2,7 @@ form of a GeoDataFrames. 
""" +from functools import lru_cache from pathlib import Path from typing import List @@ -12,16 +13,17 @@ from openeo_gfmap.manager import _log +@lru_cache(maxsize=1) def load_s2_grid() -> gpd.GeoDataFrame: """Returns a geo data frame from the S2 grid.""" # Builds the path where the geodataframe should be - gdf_path = Path.home() / ".openeo-gfmap" / "s2grid_bounds.geojson" + gdf_path = Path.home() / ".openeo-gfmap" / "s2grid_bounds_v2.geojson" if not gdf_path.exists(): _log.info("S2 grid not found, downloading it from artifactory.") # Downloads the file from the artifactory URL gdf_path.parent.mkdir(exist_ok=True) response = requests.get( - "https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds.geojson", + "https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds_v2.geojson", timeout=180, # 3mins ) with open(gdf_path, "wb") as f: @@ -61,13 +63,15 @@ def split_job_s2grid( raise ValueError("The GeoDataFrame must contain a CRS") polygons = polygons.to_crs(epsg=4326) - if polygons.geometry.geom_type[0] != "Point": - polygons["geometry"] = polygons.geometry.centroid + polygons["geometry"] = polygons.geometry.centroid # Dataset containing all the S2 tiles, find the nearest S2 tile for each point s2_grid = load_s2_grid() s2_grid["geometry"] = s2_grid.geometry.centroid + # Filter tiles on CDSE availability + s2_grid = s2_grid[s2_grid.cdse_valid] + polygons = gpd.sjoin_nearest(polygons, s2_grid[["tile", "geometry"]]).drop( columns=["index_right"] ) diff --git a/src/openeo_gfmap/utils/catalogue.py b/src/openeo_gfmap/utils/catalogue.py index d6fd60e..2e42990 100644 --- a/src/openeo_gfmap/utils/catalogue.py +++ b/src/openeo_gfmap/utils/catalogue.py @@ -1,9 +1,12 @@ """Functionalities to interract with product catalogues.""" +from typing import Optional + import geojson import requests from pyproj.crs import CRS from rasterio.warp import transform_bounds +from requests import adapters from shapely import unary_union from shapely.geometry import box, shape @@ -15,6 +18,20 @@ TemporalContext, ) +request_sessions: Optional[requests.Session] = None + + +def _request_session() -> requests.Session: + global request_sessions + + if request_sessions is None: + request_sessions = requests.Session() + retries = adapters.Retry( + total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504] + ) + request_sessions.mount("https://", adapters.HTTPAdapter(max_retries=retries)) + return request_sessions + class UncoveredS1Exception(Exception): """Exception raised when there is no product available to fully cover spatially a given @@ -48,13 +65,14 @@ def _query_cdse_catalogue( url = ( f"https://catalogue.dataspace.copernicus.eu/resto/api/collections/" f"{collection}/search.json?box={minx},{miny},{maxx},{maxy}" - f"&sortParam=startDate&maxRecords=100" + f"&sortParam=startDate&maxRecords=1000&polarisation=VV%26VH" f"&dataset=ESA-DATASET&startDate={start_date}&completionDate={end_date}" ) for key, value in additional_parameters.items(): url += f"&{key}={value}" - response = requests.get(url) + session = _request_session() + response = session.get(url, timeout=60) if response.status_code != 200: raise Exception( diff --git a/tests/test_openeo_gfmap/test_s1_fetchers.py b/tests/test_openeo_gfmap/test_s1_fetchers.py index d805fb8..01e41e2 100644 --- a/tests/test_openeo_gfmap/test_s1_fetchers.py +++ b/tests/test_openeo_gfmap/test_s1_fetchers.py @@ -50,7 +50,7 @@ def sentinel1_grd( "elevation_model": "COPERNICUS_30", "coefficient": "gamma0-ellipsoid", "load_collection": { - 
"polarization": lambda polar: (polar == "VV") or (polar == "VH"), + "polarization": lambda polar: polar == "VV&VH", }, } @@ -156,7 +156,7 @@ def sentinel1_grd_point_based( "elevation_model": "COPERNICUS_30", "coefficient": "gamma0-ellipsoid", "load_collection": { - "polarization": lambda polar: (polar == "VV") or (polar == "VH"), + "polarization": lambda polar: polar == "VV&VH", }, } extractor = build_sentinel1_grd_extractor( From d1733042c9d38f632b781a80a72c5b78e23926a1 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 22 Aug 2024 10:24:31 +0200 Subject: [PATCH 02/11] Implemented PR changes @HansVRP --- src/openeo_gfmap/fetching/generic.py | 10 ++- src/openeo_gfmap/manager/job_manager.py | 106 +++++------------------- 2 files changed, 31 insertions(+), 85 deletions(-) diff --git a/src/openeo_gfmap/fetching/generic.py b/src/openeo_gfmap/fetching/generic.py index 197004a..54756bc 100644 --- a/src/openeo_gfmap/fetching/generic.py +++ b/src/openeo_gfmap/fetching/generic.py @@ -106,6 +106,12 @@ def generic_default_processor(cube: openeo.DataCube, **params): return generic_default_processor +def _unavailable_collection_error(collection_name: str, _: FetchType): + raise ValueError( + f"Collection {collection_name} is not available for the selected backend." + ) + + OTHER_BACKEND_MAP = { "AGERA5": { Backend.TERRASCOPE: { @@ -113,11 +119,11 @@ def generic_default_processor(cube: openeo.DataCube, **params): "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), }, Backend.CDSE: { - "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), + "fetch": _unavailable_collection_error, "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), }, Backend.CDSE_STAGING: { - "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), + "fetch": _unavailable_collection_error, "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), }, Backend.FED: { diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 3e73b90..119cb7b 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -3,16 +3,13 @@ import threading import time from concurrent.futures import ThreadPoolExecutor -from datetime import datetime -from enum import Enum from functools import partial from pathlib import Path from threading import Lock -from typing import Callable, NamedTuple, Optional, Union +from typing import Callable, Optional, Union import pandas as pd import pystac -from openeo import Connection from openeo.extra.job_management import MultiBackendJobManager from openeo.rest.job import BatchJob from pystac import CatalogType @@ -24,7 +21,19 @@ _stac_lock = Lock() -def retry_on_exception(max_retries, delay=30): +def retry_on_exception(max_retries: int, delay_s: float = 180.0): + """Decorator to retry a function if an exception occurs. + Used for post-job actions that can crash due to internal backend issues. Restarting the action + usually helps to solve the issue. + + Parameters + ---------- + max_retries: int + The maximum number of retries to attempt before finally raising the exception. + delay: int (default=180 seconds) + The delay in seconds to wait before retrying the decorated function. 
+ """ + def decorator(func): def wrapper(*args, **kwargs): latest_exception = None @@ -32,7 +41,9 @@ def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: - time.sleep(delay) + time.sleep( + delay_s + ) # Waits before retrying, while allowing other futures to run. latest_exception = e raise latest_exception @@ -42,12 +53,7 @@ def wrapper(*args, **kwargs): def done_callback(future, df, idx): - """Sets the status of the job to the given status when the future is done. - - If an exception occurred, then tries to retry the post-job action once if - it wasn't done already. If this is the second time the post-job action - fails, then the job is set to error. - """ + """Changes the status of the job when the post-job action future is done.""" current_status = df.loc[idx, "status"] exception = future.exception() if exception is None: @@ -70,13 +76,6 @@ def done_callback(future, df, idx): df.loc[idx, "status"] = "error" -class PostJobStatus(Enum): - """Indicates the workers if the job finished as sucessful or with an error.""" - - FINISHED = "finished" - ERROR = "error" - - class GFMAPJobManager(MultiBackendJobManager): """A job manager for the GFMAP backend.""" @@ -128,53 +127,6 @@ def __init__( self._max_jobs_worktime = max_jobs_worktime self._max_jobs = max_jobs - def add_backend( - self, - name: str, - connection, - parallel_jobs: Optional[int] = 2, - dynamic_max_jobs: bool = False, - min_jobs: Optional[int] = None, - max_jobs: Optional[int] = None, - ): - if not dynamic_max_jobs: - if parallel_jobs is None: - raise ValueError( - "When dynamic_max_jobs is set to False, parallel_jobs must be provided." - ) - return super().add_backend(name, connection, parallel_jobs) - - if min_jobs is None or max_jobs is None: - raise ValueError( - "When dynamic_max_jobs is set to True, min_jobs and max_jobs must be provided." - ) - - if isinstance(connection, Connection): - c = connection - connection = lambda: c # noqa: E731 - assert callable(connection) - - # Create a new NamedTuple to store the dynamic backend properties - class _DynamicBackend(NamedTuple): - get_connection: Callable[[], Connection] - - @property - def parallel_jobs(self) -> int: - current_time = datetime.now() - - # Limiting working hours - start_worktime_hour = 8 - end_worktime_hour = 20 - - if ( - current_time.hour >= start_worktime_hour - and current_time.hour < end_worktime_hour - ): - return min_jobs - return max_jobs - - self.backends[name] = _DynamicBackend(get_connection=connection) - def _normalize_stac(self): default_collection_path = self._output_dir / "stac/collection.json" if self._catalogue_cache.exists(): @@ -373,7 +325,7 @@ def _update_statuses(self, df: pd.DataFrame): # Clear the futures that are done and raise their potential exceptions if they occurred. self._clear_queued_actions() - @retry_on_exception(max_retries=2, delay=30) + @retry_on_exception(max_retries=2, delay_s=180) def on_job_error(self, job: BatchJob, row: pd.Series): """Method called when a job finishes with an error. @@ -410,8 +362,10 @@ def on_job_error(self, job: BatchJob, row: pd.Series): f"Couldn't find any error logs. Please check the error manually on job ID: {job.job_id}." ) - @retry_on_exception(max_retries=2, delay=30) - def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock): + @retry_on_exception(max_retries=2, delay_s=30) + def on_job_done( + self, job: BatchJob, row: pd.Series, lock: Lock + ): # pylint: disable=arguments-differ """Method called when a job finishes successfully. 
It will first download the results of the job and then call the `post_job_action` method.
         """
         job_products = {}
@@ -492,21 +446,7 @@ def on_job_done(self, job: BatchJob, row: pd.Series, lock: Lock):
                 ]
                 job_items = [item for item in job_items if item.id not in existing_ids]
 
-                # validated_items = []
-                # # Validate the items
-                # for item in job_items:
-                #     try:
-                #         item.validate()
-                #         validated_items.append(item)
-                #     except Exception as e:
-                #         _log.warning(
-                #             "Couldn't validate item %s from job %s, ignoring:\n%s",
-                #             item.id,
-                #             job.job_id,
-                #             e,
-                #         )
                 self._root_collection.add_items(job_items)
-                # self._root_collection.add_items(validated_items)
                 _log.info("Added %s items to the STAC collection.", len(job_items))
 
                 self._persist_stac()

From 9f06d62178aec5eb86238d806e1d7d8d7cd644a2 Mon Sep 17 00:00:00 2001
From: Darius Couchard
Date: Mon, 26 Aug 2024 09:41:34 +0200
Subject: [PATCH 03/11] Updated arguments & doc on catalogue check utilities, to make it clearer

---
 src/openeo_gfmap/utils/catalogue.py   | 27 +++++++++++++++++----------
 tests/test_openeo_gfmap/test_utils.py |  9 ++++++---
 2 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/src/openeo_gfmap/utils/catalogue.py b/src/openeo_gfmap/utils/catalogue.py
index 2e42990..cd5848e 100644
--- a/src/openeo_gfmap/utils/catalogue.py
+++ b/src/openeo_gfmap/utils/catalogue.py
@@ -65,8 +65,8 @@ def _query_cdse_catalogue(
     url = (
         f"https://catalogue.dataspace.copernicus.eu/resto/api/collections/"
         f"{collection}/search.json?box={minx},{miny},{maxx},{maxy}"
-        f"&sortParam=startDate&maxRecords=1000&polarisation=VV%26VH"
-        f"&dataset=ESA-DATASET&startDate={start_date}&completionDate={end_date}"
+        f"&sortParam=startDate&maxRecords=1000&dataset=ESA-DATASET"
+        f"&startDate={start_date}&completionDate={end_date}"
    )
     for key, value in additional_parameters.items():
         url += f"&{key}={value}"
@@ -125,19 +125,20 @@ def _check_cdse_catalogue(
     return len(grd_tiles) > 0
 
 
-def s1_area_per_orbitstate(
+def s1_area_per_orbitstate_vvvh(
     backend: BackendContext,
     spatial_extent: SpatialContext,
     temporal_extent: TemporalContext,
 ) -> dict:
     """Evaluates for both the ascending and descending state orbits the area of interesection
     between the given spatio-temporal context and the products available in the backend's
-    catalogue.
+    catalogue. Only works for the products with the VV&VH polarisation.
 
     Parameters
     ----------
     backend : BackendContext
-        The backend to be within, as each backend might use different catalogues.
+        The backend to be within, as each backend might use different catalogues. Only the CDSE,
+        CDSE_STAGING and FED backends are supported.
     spatial_extent : SpatialContext
         The spatial extent to be checked, it will check within its bounding box.
temporal_extent : TemporalContext @@ -177,7 +178,11 @@ def s1_area_per_orbitstate( if backend.backend in [Backend.CDSE, Backend.CDSE_STAGING, Backend.FED]: ascending_products = _parse_cdse_products( _query_cdse_catalogue( - "Sentinel1", bounds, temporal_extent, orbitDirection="ASCENDING" + "Sentinel1", + bounds, + temporal_extent, + orbitDirection="ASCENDING", + polarisation="VV&VH", ) ) descending_products = _parse_cdse_products( @@ -186,6 +191,7 @@ def s1_area_per_orbitstate( bounds, temporal_extent, orbitDirection="DESCENDING", + polarisation="VV&VH", ) ) else: @@ -222,18 +228,19 @@ def s1_area_per_orbitstate( } -def select_S1_orbitstate( +def select_S1_orbitstate_vvvh( backend: BackendContext, spatial_extent: SpatialContext, temporal_extent: TemporalContext, ) -> str: """Selects the orbit state that covers the most area of the given spatio-temporal context - for the Sentinel-1 collection. + for the Sentinel-1 collection. Only works for the product with the VV&VH polarisation. Parameters ---------- backend : BackendContext - The backend to be within, as each backend might use different catalogues. + The backend to be within, as each backend might use different catalogues. Only the CDSE, + CDSE_STAGING and FED backends are supported. spatial_extent : SpatialContext The spatial extent to be checked, it will check within its bounding box. temporal_extent : TemporalContext @@ -246,7 +253,7 @@ def select_S1_orbitstate( """ # Queries the products in the catalogues - areas = s1_area_per_orbitstate(backend, spatial_extent, temporal_extent) + areas = s1_area_per_orbitstate_vvvh(backend, spatial_extent, temporal_extent) ascending_overlap = areas["ASCENDING"]["full_overlap"] descending_overlap = areas["DESCENDING"]["full_overlap"] diff --git a/tests/test_openeo_gfmap/test_utils.py b/tests/test_openeo_gfmap/test_utils.py index ef2cc89..3c95bdf 100644 --- a/tests/test_openeo_gfmap/test_utils.py +++ b/tests/test_openeo_gfmap/test_utils.py @@ -6,7 +6,10 @@ from openeo_gfmap import Backend, BackendContext, BoundingBoxExtent, TemporalContext from openeo_gfmap.utils import update_nc_attributes -from openeo_gfmap.utils.catalogue import s1_area_per_orbitstate, select_S1_orbitstate +from openeo_gfmap.utils.catalogue import ( + s1_area_per_orbitstate_vvvh, + select_S1_orbitstate_vvvh, +) # Region of Paris, France SPATIAL_CONTEXT = BoundingBoxExtent( @@ -20,7 +23,7 @@ def test_query_cdse_catalogue(): backend_context = BackendContext(Backend.CDSE) - response = s1_area_per_orbitstate( + response = s1_area_per_orbitstate_vvvh( backend=backend_context, spatial_extent=SPATIAL_CONTEXT, temporal_extent=TEMPORAL_CONTEXT, @@ -41,7 +44,7 @@ def test_query_cdse_catalogue(): assert response["DESCENDING"]["full_overlap"] is True # Testing the decision maker, it should return DESCENDING - decision = select_S1_orbitstate( + decision = select_S1_orbitstate_vvvh( backend=backend_context, spatial_extent=SPATIAL_CONTEXT, temporal_extent=TEMPORAL_CONTEXT, From 534a9beea49ef2297c85ea983bf8cbad222c0210 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Mon, 26 Aug 2024 09:51:43 +0200 Subject: [PATCH 04/11] Fixed import error --- tests/test_openeo_gfmap/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_openeo_gfmap/test_utils.py b/tests/test_openeo_gfmap/test_utils.py index 85cebcb..95c241a 100644 --- a/tests/test_openeo_gfmap/test_utils.py +++ b/tests/test_openeo_gfmap/test_utils.py @@ -6,7 +6,7 @@ from netCDF4 import Dataset from openeo_gfmap import Backend, BackendContext, 
BoundingBoxExtent, TemporalContext
-from openeo_gfmap.utils import update_nc_attributes
+from openeo_gfmap.utils import split_collection_by_epsg, update_nc_attributes
 from openeo_gfmap.utils.catalogue import (
     s1_area_per_orbitstate_vvvh,
     select_s1_orbitstate_vvvh,
 )

From b2bd4b2cd7c1a3daf8a9b378add04da630e3e001 Mon Sep 17 00:00:00 2001
From: Darius Couchard
Date: Thu, 29 Aug 2024 10:38:56 +0200
Subject: [PATCH 05/11] Reworked generic fetcher/processor to accept all types of collections

---
 src/openeo_gfmap/fetching/generic.py | 128 ++++++++++-----------------
 1 file changed, 45 insertions(+), 83 deletions(-)

diff --git a/src/openeo_gfmap/fetching/generic.py b/src/openeo_gfmap/fetching/generic.py
index 54756bc..767de00 100644
--- a/src/openeo_gfmap/fetching/generic.py
+++ b/src/openeo_gfmap/fetching/generic.py
@@ -2,9 +2,10 @@
 """
 from functools import partial
-from typing import Callable
+from typing import Callable, Optional
 
 import openeo
+from openeo.rest import OpenEoApiError
 
 from openeo_gfmap.backend import Backend, BackendContext
 from openeo_gfmap.fetching import CollectionFetcher, FetchType, _log
@@ -28,15 +29,18 @@
     "vapour-pressure": "AGERA5-VAPOUR",
     "wind-speed": "AGERA5-WIND",
 }
+KNOWN_UNTEMPORAL_COLLECTIONS = ["COPERNICUS_30"]
 
 
-def _get_generic_fetcher(collection_name: str, fetch_type: FetchType) -> Callable:
+def _get_generic_fetcher(
+    collection_name: str, fetch_type: FetchType, backend: Backend
+) -> Callable:
+    band_mapping: Optional[dict] = None
+
     if collection_name == "COPERNICUS_30":
-        BASE_MAPPING = BASE_DEM_MAPPING
+        band_mapping = BASE_DEM_MAPPING
     elif collection_name == "AGERA5":
-        BASE_MAPPING = BASE_WEATHER_MAPPING
-    else:
-        raise Exception("Please choose a valid collection.")
+        band_mapping = BASE_WEATHER_MAPPING
 
     def generic_default_fetcher(
         connection: openeo.Connection,
@@ -45,23 +49,34 @@
         bands: list,
         **params,
     ) -> openeo.DataCube:
-        bands = convert_band_names(bands, BASE_MAPPING)
+        if band_mapping is not None:
+            bands = convert_band_names(bands, band_mapping)
 
-        if (collection_name == "COPERNICUS_30") and (temporal_extent is not None):
+        if (collection_name in KNOWN_UNTEMPORAL_COLLECTIONS) and (
+            temporal_extent is not None
+        ):
             _log.warning(
-                "User set-up non None temporal extent for DEM collection. Ignoring it."
+                "User set-up non None temporal extent for %s collection. Ignoring it.",
+                collection_name,
             )
             temporal_extent = None
 
-        cube = _load_collection(
-            connection,
-            bands,
-            collection_name,
-            spatial_extent,
-            temporal_extent,
-            fetch_type,
-            **params,
-        )
+        try:
+            cube = _load_collection(
+                connection,
+                bands,
+                collection_name,
+                spatial_extent,
+                temporal_extent,
+                fetch_type,
+                **params,
+            )
+        except OpenEoApiError as e:
+            if "CollectionNotFound" in str(e):
+                raise ValueError(
+                    f"Collection {collection_name} not found in the selected backend {backend.value}."
+                ) from e
+            raise e
 
         # # Apply if the collection is a GeoJSON Feature collection
         # if isinstance(spatial_extent, GeoJSON):
@@ -76,12 +91,11 @@ def _get_generic_processor(collection_name: str, fetch_type: FetchType) -> Calla
     """Builds the preprocessing function from the collection name as it stored in the target
     backend.
""" + band_mapping: Optional[dict] = None if collection_name == "COPERNICUS_30": - BASE_MAPPING = BASE_DEM_MAPPING + band_mapping = BASE_DEM_MAPPING elif collection_name == "AGERA5": - BASE_MAPPING = BASE_WEATHER_MAPPING - else: - raise Exception("Please choose a valid collection.") + band_mapping = BASE_WEATHER_MAPPING def generic_default_processor(cube: openeo.DataCube, **params): """Default collection preprocessing method for generic datasets. @@ -99,67 +113,14 @@ def generic_default_processor(cube: openeo.DataCube, **params): if collection_name == "COPERNICUS_30": cube = cube.min_time() - cube = rename_bands(cube, BASE_MAPPING) + if band_mapping is not None: + cube = rename_bands(cube, band_mapping) return cube return generic_default_processor -def _unavailable_collection_error(collection_name: str, _: FetchType): - raise ValueError( - f"Collection {collection_name} is not available for the selected backend." - ) - - -OTHER_BACKEND_MAP = { - "AGERA5": { - Backend.TERRASCOPE: { - "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), - "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), - }, - Backend.CDSE: { - "fetch": _unavailable_collection_error, - "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), - }, - Backend.CDSE_STAGING: { - "fetch": _unavailable_collection_error, - "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), - }, - Backend.FED: { - "fetch": partial(_get_generic_fetcher, collection_name="AGERA5"), - "preprocessor": partial(_get_generic_processor, collection_name="AGERA5"), - }, - }, - "COPERNICUS_30": { - Backend.TERRASCOPE: { - "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), - "preprocessor": partial( - _get_generic_processor, collection_name="COPERNICUS_30" - ), - }, - Backend.CDSE: { - "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), - "preprocessor": partial( - _get_generic_processor, collection_name="COPERNICUS_30" - ), - }, - Backend.CDSE_STAGING: { - "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), - "preprocessor": partial( - _get_generic_processor, collection_name="COPERNICUS_30" - ), - }, - Backend.FED: { - "fetch": partial(_get_generic_fetcher, collection_name="COPERNICUS_30"), - "preprocessor": partial( - _get_generic_processor, collection_name="COPERNICUS_30" - ), - }, - }, -} - - def build_generic_extractor( backend_context: BackendContext, bands: list, @@ -168,13 +129,14 @@ def build_generic_extractor( **params, ) -> CollectionFetcher: """Creates a generic extractor adapted to the given backend. 
Currently only tested with VITO backend""" - backend_functions = OTHER_BACKEND_MAP.get(collection_name).get( - backend_context.backend + fetcher = partial( + _get_generic_fetcher, + collection_name=collection_name, + fetch_type=fetch_type, + backend=backend_context.backend, ) - - fetcher, preprocessor = ( - backend_functions["fetch"](fetch_type=fetch_type), - backend_functions["preprocessor"](fetch_type=fetch_type), + preprocessor = partial( + _get_generic_processor, collection_name=collection_name, fetch_type=fetch_type ) return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params) From deedea718356aedf60569b022c1e2b1fd7ce129c Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 29 Aug 2024 11:44:23 +0200 Subject: [PATCH 06/11] Implemented request changes --- src/openeo_gfmap/manager/job_manager.py | 106 +++++++++++++++------- src/openeo_gfmap/manager/job_splitters.py | 5 + src/openeo_gfmap/utils/catalogue.py | 10 +- 3 files changed, 84 insertions(+), 37 deletions(-) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 119cb7b..189121c 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -91,10 +91,39 @@ def __init__( n_threads: int = 1, resume_postproc: bool = True, # If we need to check for post-job actions that crashed restart_failed: bool = False, # If we need to restart failed jobs - dynamic_max_jobs: bool = True, # If we need to dynamically change the maximum number of parallel jobs - max_jobs_worktime: bool = 10, # Maximum number of jobs to run in a given time - max_jobs: int = 20, # Maximum number of jobs to run at the same time ): + """ + Initializes the GFMAP job manager. + + Parameters + ---------- + output_dir: Path + The base output directory where the results/stac/logs of the jobs will be stored. + output_path_generator: Callable + User defined function that generates the output path for the job results. Expects as + inputs the output directory, the index of the job in the job dataframe + and the row of the job, and returns the final path where to save a job result asset. + collection_id: Optional[str] + The ID of the STAC collection that is being generated. Can be left empty if the STAC + catalogue is not being generated or if it is being resumed from an existing catalogue. + collection_description: Optional[str] + The description of the STAC collection that is being generated. + stac: Optional[Union[str, Path]] + The path to the STAC collection to be saved or resumed. + If None, the default path will be used. + post_job_action: Optional[Callable] + A user defined function that will be called after a job is finished. It will receive + the list of items generated by the job and the row of the job, and should return the + updated list of items. + poll_sleep: int + The time in seconds to wait between polling the backend for job status. + n_threads: int + The number of threads to execute `on_job_done` and `on_job_error` functions. + resume_postproc: bool + If set to true, all `on_job_done` and `on_job_error` functions that failed are resumed. + restart_failed: bool + If set to true, all jobs that failed within the OpenEO backend are restarted. 
+ """ self._output_dir = output_dir self._catalogue_cache = output_dir / "catalogue_cache.bin" @@ -119,15 +148,13 @@ def __init__( MultiBackendJobManager._normalize_df = self._normalize_df super().__init__(poll_sleep) - self._root_collection = self._normalize_stac() - - # Add a property that calculates the number of maximum concurrent jobs - # dinamically depending on the time - self._dynamic_max_jobs = dynamic_max_jobs - self._max_jobs_worktime = max_jobs_worktime - self._max_jobs = max_jobs + self._root_collection = self._initialize_stac() - def _normalize_stac(self): + def _load_stac(self) -> Optional[pystac.Collection]: + """ + Loads the STAC collection from the cache, the specified `stac` path or the default path. + If no STAC collection is found, returns None. + """ default_collection_path = self._output_dir / "stac/collection.json" if self._catalogue_cache.exists(): _log.info( @@ -135,32 +162,52 @@ def _normalize_stac(self): self._catalogue_cache, ) with open(self._catalogue_cache, "rb") as file: - root_collection = pickle.load(file) + return pickle.load(file) elif self.stac is not None: _log.info( "Reloading the STAC collection from the provided path: %s.", self.stac ) - root_collection = pystac.read_file(str(self.stac)) + return pystac.read_file(str(self.stac)) elif default_collection_path.exists(): _log.info( "Reload the STAC collection from the default path: %s.", default_collection_path, ) self.stac = default_collection_path - root_collection = pystac.read_file(str(self.stac)) - else: - _log.info("Starting a fresh STAC collection.") - assert ( - self.collection_id is not None - ), "A collection ID is required to generate a STAC collection." - root_collection = pystac.Collection( - id=self.collection_id, - description=self.collection_description, - extent=None, + return pystac.read_file(str(self.stac)) + + _log.info( + "No STAC collection found as cache, in the default path or in the provided path." + ) + return None + + def _create_stac(self) -> pystac.Collection: + """ + Creates and returns new STAC collection. The created stac collection will use the + `collection_id` and `collection_description` parameters set in the constructor. + """ + if self.collection_id is None: + raise ValueError( + "A collection ID is required to generate a STAC collection." ) - root_collection.license = constants.LICENSE - root_collection.add_link(constants.LICENSE_LINK) - root_collection.stac_extensions = constants.STAC_EXTENSIONS + collection = pystac.Collection( + id=self.collection_id, + description=self.collection_description, + extent=None, + ) + collection.license = constants.LICENSE + collection.add_link(constants.LICENSE_LINK) + collection.stac_extensions = constants.STAC_EXTENSIONS + return collection + + def _initialize_stac(self) -> pystac.Collection: + """ + Loads and returns if possible an existing stac collection, otherwise creates a new one. + """ + root_collection = self._load_stac() + if not root_collection: + _log.info("Starting a fresh STAC collection.") + root_collection = self._create_stac() return root_collection @@ -461,12 +508,7 @@ def on_job_done( _log.info("Job %s and post job action finished successfully.", job.job_id) def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: - """Ensure we have the required columns and the expected type for the geometry column. - - :param df: The dataframe to normalize. - :return: a new dataframe that is normalized. 
- """ - + """Ensure we have the required columns and the expected type for the geometry column.""" # check for some required columns. required_with_default = [ ("status", "not_started"), diff --git a/src/openeo_gfmap/manager/job_splitters.py b/src/openeo_gfmap/manager/job_splitters.py index 9a795fb..a7d722d 100644 --- a/src/openeo_gfmap/manager/job_splitters.py +++ b/src/openeo_gfmap/manager/job_splitters.py @@ -26,6 +26,11 @@ def load_s2_grid() -> gpd.GeoDataFrame: "https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds_v2.geojson", timeout=180, # 3mins ) + if response.status_code != 200: + raise ValueError( + "Failed to download the S2 grid from the artifactory. " + f"Status code: {response.status_code}" + ) with open(gdf_path, "wb") as f: f.write(response.content) return gpd.read_file(gdf_path) diff --git a/src/openeo_gfmap/utils/catalogue.py b/src/openeo_gfmap/utils/catalogue.py index f846cf2..b22e64a 100644 --- a/src/openeo_gfmap/utils/catalogue.py +++ b/src/openeo_gfmap/utils/catalogue.py @@ -138,9 +138,9 @@ def s1_area_per_orbitstate_vvvh( spatial_extent: SpatialContext, temporal_extent: TemporalContext, ) -> dict: - """Evaluates for both the ascending and descending state orbits the area of interesection - between the given spatio-temporal context and the products available in the backend's - catalogue. Only works for the products with the VV&VH polarisation. + """ + Evaluates for both the ascending and descending state orbits the area of interesection for the + available products with a VV&VH polarisation. Parameters ---------- @@ -241,8 +241,8 @@ def select_s1_orbitstate_vvvh( spatial_extent: SpatialContext, temporal_extent: TemporalContext, ) -> str: - """Selects the orbit state that covers the most area of the given spatio-temporal context - for the Sentinel-1 collection. Only works for the product with the VV&VH polarisation. + """Selects the orbit state that covers the most area of intersection for the + available products with a VV&VH polarisation. 
Parameters ---------- From e5fc575aec5d7ee31fd1662eb0d770610eb3679d Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 29 Aug 2024 11:46:48 +0200 Subject: [PATCH 07/11] Added validation in the STAC collection before persisting --- src/openeo_gfmap/manager/job_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 189121c..24ebada 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -595,6 +595,8 @@ def _write_stac(self): def _persist_stac(self): """Persists the STAC collection by saving it into a binary file.""" + _log.debug("Validating the STAC collection before persisting.") + self._root_collection.validate_all() _log.info("Persisting STAC collection to temp file %s.", self._catalogue_cache) with open(self._catalogue_cache, "wb") as file: pickle.dump(self._root_collection, file) From f114e333dea9f82b7dcdc6d341f68ab702714b7d Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 29 Aug 2024 16:15:37 +0200 Subject: [PATCH 08/11] Fix ampersand in URL encoding & generic fetcher collection name --- src/openeo_gfmap/fetching/generic.py | 1 - src/openeo_gfmap/utils/catalogue.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/openeo_gfmap/fetching/generic.py b/src/openeo_gfmap/fetching/generic.py index 767de00..9c4b439 100644 --- a/src/openeo_gfmap/fetching/generic.py +++ b/src/openeo_gfmap/fetching/generic.py @@ -131,7 +131,6 @@ def build_generic_extractor( """Creates a generic extractor adapted to the given backend. Currently only tested with VITO backend""" fetcher = partial( _get_generic_fetcher, - collection_name=collection_name, fetch_type=fetch_type, backend=backend_context.backend, ) diff --git a/src/openeo_gfmap/utils/catalogue.py b/src/openeo_gfmap/utils/catalogue.py index b22e64a..29e8973 100644 --- a/src/openeo_gfmap/utils/catalogue.py +++ b/src/openeo_gfmap/utils/catalogue.py @@ -190,7 +190,7 @@ def s1_area_per_orbitstate_vvvh( bounds, temporal_extent, orbitDirection="ASCENDING", - polarisation="VV&VH", + polarisation="VV%26VH", ) ) descending_products = _parse_cdse_products( @@ -199,7 +199,7 @@ def s1_area_per_orbitstate_vvvh( bounds, temporal_extent, orbitDirection="DESCENDING", - polarisation="VV&VH", + polarisation="VV%26VH", ) ) else: From 9b399d4df5f8b98e042f6f0a9878292e9c333289 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 29 Aug 2024 16:20:30 +0200 Subject: [PATCH 09/11] Fixed generic extractor? --- src/openeo_gfmap/fetching/generic.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/openeo_gfmap/fetching/generic.py b/src/openeo_gfmap/fetching/generic.py index 9c4b439..ee0b430 100644 --- a/src/openeo_gfmap/fetching/generic.py +++ b/src/openeo_gfmap/fetching/generic.py @@ -1,7 +1,6 @@ """ Generic extraction of features, supporting VITO backend. """ -from functools import partial from typing import Callable, Optional import openeo @@ -129,13 +128,7 @@ def build_generic_extractor( **params, ) -> CollectionFetcher: """Creates a generic extractor adapted to the given backend. 
Currently only tested with VITO backend""" - fetcher = partial( - _get_generic_fetcher, - fetch_type=fetch_type, - backend=backend_context.backend, - ) - preprocessor = partial( - _get_generic_processor, collection_name=collection_name, fetch_type=fetch_type - ) + fetcher = _get_generic_fetcher(collection_name, fetch_type, backend_context.backend) + preprocessor = _get_generic_processor(collection_name, fetch_type) return CollectionFetcher(backend_context, bands, fetcher, preprocessor, **params) From b9e65ba0fe460f83d2ed5694cea068725a5f1118 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Fri, 30 Aug 2024 10:10:03 +0200 Subject: [PATCH 10/11] Added possibility to disable all stac-releated features in the job manager --- src/openeo_gfmap/manager/job_manager.py | 65 +++++++++++-------- .../test_generic_fetchers.py | 0 2 files changed, 39 insertions(+), 26 deletions(-) create mode 100644 tests/test_openeo_gfmap/test_generic_fetchers.py diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 24ebada..b9dd859 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -91,6 +91,7 @@ def __init__( n_threads: int = 1, resume_postproc: bool = True, # If we need to check for post-job actions that crashed restart_failed: bool = False, # If we need to restart failed jobs + stac_enabled: bool = True, ): """ Initializes the GFMAP job manager. @@ -123,11 +124,15 @@ def __init__( If set to true, all `on_job_done` and `on_job_error` functions that failed are resumed. restart_failed: bool If set to true, all jobs that failed within the OpenEO backend are restarted. + stac_enabled: bool (default=True) + If the STAC generation is enabled or not. Disabling it will prevent the creation, + update and loading of the STAC collection. """ self._output_dir = output_dir self._catalogue_cache = output_dir / "catalogue_cache.bin" self.stac = stac + self.stac_enabled = stac_enabled self.collection_id = collection_id self.collection_description = collection_description @@ -148,7 +153,8 @@ def __init__( MultiBackendJobManager._normalize_df = self._normalize_df super().__init__(poll_sleep) - self._root_collection = self._initialize_stac() + if self.stac_enabled: + self._root_collection = self._initialize_stac() def _load_stac(self) -> Optional[pystac.Collection]: """ @@ -484,26 +490,9 @@ def on_job_done( _log.info("Adding %s items to the STAC collection...", len(job_items)) - with lock: # Take the STAC lock to avoid concurrence issues - try: - _log.info("Thread %s entered the STAC lock.", threading.get_ident()) - # Filters the job items to only keep the ones that are not already in the collection - existing_ids = [ - item.id for item in self._root_collection.get_all_items() - ] - job_items = [item for item in job_items if item.id not in existing_ids] - - self._root_collection.add_items(job_items) - _log.info("Added %s items to the STAC collection.", len(job_items)) - - self._persist_stac() - except Exception as e: - _log.exception( - "Error adding items to the STAC collection for job %s:\n%s ", - job.job_id, - str(e), - ) - raise e + if self.stac_enabled: + with lock: + self._update_stac(job.job_id, job_items) _log.info("Job %s and post job action finished successfully.", job.job_id) @@ -572,11 +561,13 @@ def run_jobs( self._wait_queued_actions() _log.info("Exiting ThreadPoolExecutor.") self._executor = None - _log.info( - "Finished running jobs, saving persisted STAC collection to final .json collection." 
-        )
-        self._write_stac()
-        _log.info("Saved STAC catalogue to JSON format, all tasks finished!")
+        _log.info("All jobs finished running.")
+        if self.stac_enabled:
+            _log.info("Saving persisted STAC collection to final .json collection.")
+            self._write_stac()
+            _log.info("Saved STAC catalogue to JSON format, all tasks finished!")
+        else:
+            _log.info("STAC was disabled, skipping generation of the catalogue.")
 
     def _write_stac(self):
         """Writes the STAC collection to the output directory."""
@@ -601,6 +592,28 @@ def _write_stac(self):
         with open(self._catalogue_cache, "wb") as file:
             pickle.dump(self._root_collection, file)
 
+    def _update_stac(self, job_id: str, job_items: list[pystac.Item]):
+        """Updates the STAC collection by adding the items generated by the job.
+        Does not add duplicates or override with the same item ID.
+        """
+        try:
+            _log.info("Thread %s entered the STAC lock.", threading.get_ident())
+            # Filters the job items to only keep the ones that are not already in the collection
+            existing_ids = [item.id for item in self._root_collection.get_all_items()]
+            job_items = [item for item in job_items if item.id not in existing_ids]
+
+            self._root_collection.add_items(job_items)
+            _log.info("Added %s items to the STAC collection.", len(job_items))
+
+            self._persist_stac()
+        except Exception as e:
+            _log.exception(
+                "Error adding items to the STAC collection for job %s:\n%s ",
+                job_id,
+                str(e),
+            )
+            raise e
+
     def setup_stac(
         self,
         constellation: Optional[str] = None,
diff --git a/tests/test_openeo_gfmap/test_generic_fetchers.py b/tests/test_openeo_gfmap/test_generic_fetchers.py
new file mode 100644
index 0000000..e69de29

From bec9976ed2565dd3fd5eafe25a9fca85364f65bb Mon Sep 17 00:00:00 2001
From: Darius Couchard
Date: Mon, 2 Sep 2024 13:37:09 +0200
Subject: [PATCH 11/11] Implemented PR change requests

---
 src/openeo_gfmap/fetching/generic.py    | 2 +-
 src/openeo_gfmap/manager/job_manager.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/openeo_gfmap/fetching/generic.py b/src/openeo_gfmap/fetching/generic.py
index ee0b430..46edb74 100644
--- a/src/openeo_gfmap/fetching/generic.py
+++ b/src/openeo_gfmap/fetching/generic.py
@@ -55,7 +55,7 @@ def generic_default_fetcher(
             temporal_extent is not None
         ):
             _log.warning(
-                "User set-up non None temporal extent for %s collection. Ignoring it.",
+                "Ignoring the temporal extent provided by the user as the collection %s is known to be untemporal.",
                 collection_name,
             )
             temporal_extent = None
diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py
index b9dd859..986bbf4 100644
--- a/src/openeo_gfmap/manager/job_manager.py
+++ b/src/openeo_gfmap/manager/job_manager.py
@@ -21,7 +21,7 @@
 _stac_lock = Lock()
 
 
-def retry_on_exception(max_retries: int, delay_s: float = 180.0):
+def retry_on_exception(max_retries: int, delay_s: int = 180):
     """Decorator to retry a function if an exception occurs.
     Used for post-job actions that can crash due to internal backend issues. Restarting the action
     usually helps to solve the issue.
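
For reference, a minimal usage sketch of the job manager as it looks after this series (stac_enabled and restart_failed from patches 06 and 10, the plain upstream add_backend after patch 02). This sketch is not part of the patch: the "tile" column assumed to be produced by split_job_s2grid, the example paths and collection names, and the exact run_jobs call are assumptions inferred from the hunks above, so treat it as illustrative only.

    # Illustrative sketch only -- not part of the patch series above.
    from pathlib import Path

    import geopandas as gpd
    import openeo

    from openeo_gfmap.manager.job_manager import GFMAPJobManager
    from openeo_gfmap.manager.job_splitters import split_job_s2grid


    def output_path(output_dir: Path, index: int, row) -> Path:
        # Group downloaded assets per S2 tile of the job row; the "tile" column
        # comes from the sjoin in split_job_s2grid (assumption).
        return Path(output_dir) / str(row.tile) / f"result_{index}.nc"


    manager = GFMAPJobManager(
        output_dir=Path("/data/extractions"),
        output_path_generator=output_path,
        collection_id="GFMAP-EXTRACTIONS",
        collection_description="Extractions produced by openeo-gfmap.",
        poll_sleep=60,
        n_threads=2,
        restart_failed=True,  # re-queue jobs that errored in a previous run
        stac_enabled=True,    # set to False to skip STAC generation entirely (patch 10)
    )

    # After patch 02 the upstream MultiBackendJobManager.add_backend() is used as-is.
    connection = openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()
    manager.add_backend("cdse", connection=connection, parallel_jobs=2)

    # Hypothetical input file of sample geometries, split per Sentinel-2 tile.
    job_df = gpd.read_file("samples.gpkg")
    split_dfs = split_job_s2grid(job_df)
    # manager.run_jobs(df, start_job, tracking_file)  # see run_jobs() in the hunks above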