diff --git a/cropclassification/calc_timeseries.py b/cropclassification/calc_timeseries.py index 123afbcb..24ab8315 100644 --- a/cropclassification/calc_timeseries.py +++ b/cropclassification/calc_timeseries.py @@ -172,6 +172,7 @@ def calc_timeseries_task(config_paths: List[Path], default_basedir: Path): f"As we are only testing, process only {len(input_image_paths)} test images" ) + nb_parallel = conf.general.getint("nb_parallel", -1) try: images_bands = [(path, ["VV", "VH"]) for path in input_image_paths] zonal_stats_bulk.zonal_stats( @@ -182,6 +183,7 @@ def calc_timeseries_task(config_paths: List[Path], default_basedir: Path): temp_dir=temp_dir, log_dir=log_dir, log_level=log_level, + nb_parallel=nb_parallel, ) except Exception as ex: logger.exception(ex) @@ -255,6 +257,7 @@ def calc_timeseries_task(config_paths: List[Path], default_basedir: Path): temp_dir=temp_dir, log_dir=log_dir, log_level=log_level, + nb_parallel=nb_parallel, ) except Exception as ex: logger.exception(ex) diff --git a/cropclassification/general.ini b/cropclassification/general.ini index a814db1a..a3c54ab4 100644 --- a/cropclassification/general.ini +++ b/cropclassification/general.ini @@ -52,6 +52,10 @@ geofile_ext = .gpkg # The log level to use log_level = INFO +# The number of parallel threads/processes to start to do local processing. If -1, the +# number of available CPUs is used. +nb_parallel = -1 + [marker] # markertype, must be overriden in child ini files markertype = MUST_OVERRIDE diff --git a/cropclassification/preprocess/_timeseries_calc_openeo.py b/cropclassification/preprocess/_timeseries_calc_openeo.py index 7eb5e172..94ab6ef5 100644 --- a/cropclassification/preprocess/_timeseries_calc_openeo.py +++ b/cropclassification/preprocess/_timeseries_calc_openeo.py @@ -26,6 +26,7 @@ def calculate_periodic_timeseries( sensordata_to_get: List[ImageProfile], dest_image_data_dir: Path, dest_data_dir: Path, + nb_parallel: int, ): """ Calculate timeseries data for the input parcels. 
@@ -61,6 +62,7 @@ def calculate_periodic_timeseries( output_dir=dest_data_dir, stats=["count", "mean", "median", "std", "min", "max"], engine="pyqgis", + nb_parallel=nb_parallel, ) """ diff --git a/cropclassification/preprocess/timeseries.py b/cropclassification/preprocess/timeseries.py index 93909385..a921c397 100644 --- a/cropclassification/preprocess/timeseries.py +++ b/cropclassification/preprocess/timeseries.py @@ -94,6 +94,7 @@ def calc_timeseries_data( sensordata_to_get=sensordata_to_get_info_openeo, dest_image_data_dir=conf.dirs.getpath("images_periodic_dir"), dest_data_dir=dest_data_dir, + nb_parallel=conf.general.getint("nb_parallel", -1), ) diff --git a/cropclassification/util/zonal_stats_bulk/__init__.py b/cropclassification/util/zonal_stats_bulk/__init__.py index 8e97d454..353620f0 100644 --- a/cropclassification/util/zonal_stats_bulk/__init__.py +++ b/cropclassification/util/zonal_stats_bulk/__init__.py @@ -13,6 +13,7 @@ def zonal_stats( cloud_filter_band: Optional[str] = None, calc_bands_parallel: bool = True, engine: str = "rasterstats", + nb_parallel: int = -1, force: bool = False, ): """ @@ -23,6 +24,8 @@ def zonal_stats( id_column (str): _description_ rasters_bands (List[Tuple[Path, List[str]]]): _description_ output_dir (Path): _description_ + nb_parallel (int, optional): the number of parallel processes to use. + Defaults to -1: use all available processors. force (bool, optional): _description_. Defaults to False. 
Raises: @@ -41,6 +44,7 @@ def zonal_stats( output_dir=output_dir, stats=stats, columns=[id_column], + nb_parallel=nb_parallel, force=force, ) elif engine == "rasterstats": @@ -54,6 +58,7 @@ def zonal_stats( stats=stats, cloud_filter_band=cloud_filter_band, calc_bands_parallel=calc_bands_parallel, + nb_parallel=nb_parallel, force=force, ) else: diff --git a/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_pyqgis.py b/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_pyqgis.py index cb32ea4b..1bc3db48 100644 --- a/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_pyqgis.py +++ b/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_pyqgis.py @@ -49,6 +49,7 @@ def zonal_stats( rasters_bands: List[Tuple[Path, List[str]]], output_dir: Path, stats: List[Statistic] = DEFAULT_STATS, + nb_parallel: int = -1, force: bool = False, ): """ @@ -60,6 +61,8 @@ def zonal_stats( images_bands (List[Tuple[Path, List[str]]]): _description_ stats (List[Statistic]) output_dir (Path): _description_ + nb_parallel (int, optional): the number of parallel processes to use. + Defaults to -1: use all available processors. force (bool, optional): _description_. Defaults to False. Raises: @@ -79,8 +82,8 @@ def zonal_stats( start_time = datetime.now() nb_todo = 0 nb_done_total = 0 - nb_parallel_max = multiprocessing.cpu_count() - nb_parallel = nb_parallel_max + if nb_parallel < 1: + nb_parallel = multiprocessing.cpu_count() # Loop over all images and bands to calculate zonal stats in parallel... 
calc_queue = {} diff --git a/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_rs.py b/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_rs.py index 25b730f8..93a6169f 100644 --- a/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_rs.py +++ b/cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_rs.py @@ -44,6 +44,7 @@ def zonal_stats( stats: Literal["count", "mean", "median", "std", "min", "max"], cloud_filter_band: Optional[str] = None, calc_bands_parallel: bool = True, + nb_parallel: int = -1, force: bool = False, ): """ @@ -57,6 +58,8 @@ def zonal_stats( temp_dir (Path): _description_ log_dir (Path): _description_ log_level (Union[str, int]): _description_ + nb_parallel (int, optional): the number of parallel processes to use. + Defaults to -1: use all available processors. force (bool, optional): _description_. Defaults to False. Raises: @@ -82,8 +85,8 @@ def zonal_stats( log_level = logger.level # Create process pool for parallelisation... - nb_parallel_max = multiprocessing.cpu_count() - nb_parallel = nb_parallel_max + if nb_parallel < 1: + nb_parallel = multiprocessing.cpu_count() # Loop over all images to start the data preparation for each of them in # parallel... @@ -107,7 +110,7 @@ def zonal_stats( if ( image_idx < len(rasters_bands) and len(_filter_on_status(image_dict, "IMAGE_PREPARE_CALC_BUSY")) - < nb_parallel_max + < nb_parallel ): # Not too many busy preparing, so get next image_path to start # prepare on @@ -184,7 +187,7 @@ def zonal_stats( temp_dir=temp_dir, log_dir=log_dir, log_level=log_level, - nb_parallel_max=nb_parallel_max, + nb_parallel=nb_parallel, ) image_dict[image_path_str] = { "features_path": vector_path,