Skip to content

Commit

Permalink
Add nb_parallel config to be able to choose load (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
theroggy authored Aug 21, 2023
1 parent 58a9806 commit 3fb94ff
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cropclassification/calc_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def calc_timeseries_task(config_paths: List[Path], default_basedir: Path):
f"As we are only testing, process only {len(input_image_paths)} test images"
)

nb_parallel = conf.general.getint("nb_parallel", -1)
try:
images_bands = [(path, ["VV", "VH"]) for path in input_image_paths]
zonal_stats_bulk.zonal_stats(
Expand All @@ -182,6 +183,7 @@ def calc_timeseries_task(config_paths: List[Path], default_basedir: Path):
temp_dir=temp_dir,
log_dir=log_dir,
log_level=log_level,
nb_parallel=nb_parallel,
)
except Exception as ex:
logger.exception(ex)
Expand Down Expand Up @@ -255,6 +257,7 @@ def calc_timeseries_task(config_paths: List[Path], default_basedir: Path):
temp_dir=temp_dir,
log_dir=log_dir,
log_level=log_level,
nb_parallel=nb_parallel,
)
except Exception as ex:
logger.exception(ex)
Expand Down
4 changes: 4 additions & 0 deletions cropclassification/general.ini
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ geofile_ext = .gpkg
# The log level to use
log_level = INFO

# The number of parallel threads/processes to start to do local processing. If -1, the
# number of available CPUs.
nb_parallel = -1

[marker]
# markertype, must be overriden in child ini files
markertype = MUST_OVERRIDE
Expand Down
2 changes: 2 additions & 0 deletions cropclassification/preprocess/_timeseries_calc_openeo.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def calculate_periodic_timeseries(
sensordata_to_get: List[ImageProfile],
dest_image_data_dir: Path,
dest_data_dir: Path,
nb_parallel: int,
):
"""
Calculate timeseries data for the input parcels.
Expand Down Expand Up @@ -61,6 +62,7 @@ def calculate_periodic_timeseries(
output_dir=dest_data_dir,
stats=["count", "mean", "median", "std", "min", "max"],
engine="pyqgis",
nb_parallel=nb_parallel,
)

"""
Expand Down
1 change: 1 addition & 0 deletions cropclassification/preprocess/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def calc_timeseries_data(
sensordata_to_get=sensordata_to_get_info_openeo,
dest_image_data_dir=conf.dirs.getpath("images_periodic_dir"),
dest_data_dir=dest_data_dir,
nb_parallel=conf.general.getint("nb_parallel", -1),
)


Expand Down
5 changes: 5 additions & 0 deletions cropclassification/util/zonal_stats_bulk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def zonal_stats(
cloud_filter_band: Optional[str] = None,
calc_bands_parallel: bool = True,
engine: str = "rasterstats",
nb_parallel: int = -1,
force: bool = False,
):
"""
Expand All @@ -23,6 +24,8 @@ def zonal_stats(
id_column (str): _description_
rasters_bands (List[Tuple[Path, List[str]]]): _description_
output_dir (Path): _description_
nb_parallel (int, optional): the number of parallel processes to use.
Defaults to -1: use all available processors.
force (bool, optional): _description_. Defaults to False.
Raises:
Expand All @@ -41,6 +44,7 @@ def zonal_stats(
output_dir=output_dir,
stats=stats,
columns=[id_column],
nb_parallel=nb_parallel,
force=force,
)
elif engine == "rasterstats":
Expand All @@ -54,6 +58,7 @@ def zonal_stats(
stats=stats,
cloud_filter_band=cloud_filter_band,
calc_bands_parallel=calc_bands_parallel,
nb_parallel=nb_parallel,
force=force,
)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def zonal_stats(
rasters_bands: List[Tuple[Path, List[str]]],
output_dir: Path,
stats: List[Statistic] = DEFAULT_STATS,
nb_parallel: int = -1,
force: bool = False,
):
"""
Expand All @@ -60,6 +61,8 @@ def zonal_stats(
images_bands (List[Tuple[Path, List[str]]]): _description_
stats (List[Statistic])
output_dir (Path): _description_
nb_parallel (int, optional): the number of parallel processes to use.
Defaults to -1: use all available processors.
force (bool, optional): _description_. Defaults to False.
Raises:
Expand All @@ -79,8 +82,8 @@ def zonal_stats(
start_time = datetime.now()
nb_todo = 0
nb_done_total = 0
nb_parallel_max = multiprocessing.cpu_count()
nb_parallel = nb_parallel_max
if nb_parallel < 1:
nb_parallel = multiprocessing.cpu_count()

# Loop over all images and bands to calculate zonal stats in parallel...
calc_queue = {}
Expand Down
11 changes: 7 additions & 4 deletions cropclassification/util/zonal_stats_bulk/_zonal_stats_bulk_rs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def zonal_stats(
stats: Literal["count", "mean", "median", "std", "min", "max"],
cloud_filter_band: Optional[str] = None,
calc_bands_parallel: bool = True,
nb_parallel: int = -1,
force: bool = False,
):
"""
Expand All @@ -57,6 +58,8 @@ def zonal_stats(
temp_dir (Path): _description_
log_dir (Path): _description_
log_level (Union[str, int]): _description_
nb_parallel (int, optional): the number of parallel processes to use.
Defaults to -1: use all available processors.
force (bool, optional): _description_. Defaults to False.
Raises:
Expand All @@ -82,8 +85,8 @@ def zonal_stats(
log_level = logger.level

# Create process pool for parallelisation...
nb_parallel_max = multiprocessing.cpu_count()
nb_parallel = nb_parallel_max
if nb_parallel < 1:
nb_parallel = multiprocessing.cpu_count()

# Loop over all images to start the data preparation for each of them in
# parallel...
Expand All @@ -107,7 +110,7 @@ def zonal_stats(
if (
image_idx < len(rasters_bands)
and len(_filter_on_status(image_dict, "IMAGE_PREPARE_CALC_BUSY"))
< nb_parallel_max
< nb_parallel
):
# Not too many busy preparing, so get next image_path to start
# prepare on
Expand Down Expand Up @@ -184,7 +187,7 @@ def zonal_stats(
temp_dir=temp_dir,
log_dir=log_dir,
log_level=log_level,
nb_parallel_max=nb_parallel_max,
nb_parallel=nb_parallel,
)
image_dict[image_path_str] = {
"features_path": vector_path,
Expand Down

0 comments on commit 3fb94ff

Please sign in to comment.