Skip to content

Commit

Permalink
Add single reconstruction option (#89)
Browse files Browse the repository at this point in the history
This allows more specific processing path definition for single
reconstructions, along with custom multiprocessing pool definition.

Depends on #88
  • Loading branch information
thomasmfish authored Nov 6, 2024
2 parents 1fba847 + 084eb61 commit 15adb23
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 32 deletions.
9 changes: 7 additions & 2 deletions src/sim_recon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@


if __name__ == "__main__":
from .main import sim_reconstruct, sim_psf_to_otf
from .main import sim_reconstruct, sim_psf_to_otf, sim_reconstruct_single

__all__ = ["__version__", "sim_reconstruct", "sim_psf_to_otf"]
__all__ = [
"__version__",
"sim_reconstruct",
"sim_reconstruct_single",
"sim_psf_to_otf",
]
else:
__all__ = ["__version__"]
70 changes: 67 additions & 3 deletions src/sim_recon/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
)
from .settings import ConfigManager
from .otfs import convert_psfs_to_otfs
from .recon import run_reconstructions
from .recon import run_reconstructions, run_single_reconstruction


if TYPE_CHECKING:
from typing import Any
from os import PathLike
from pathlib import Path
from multiprocessing.pool import Pool
from .recon import OutputFileTypes


Expand Down Expand Up @@ -101,9 +102,9 @@ def sim_reconstruct(
overwrite: bool = False,
cleanup: bool = True,
stitch_channels: bool = True,
parallel_process: bool = False,
allow_missing_channels: bool = False,
output_file_type: OutputFileTypes = "dv",
parallel_process: bool = False,
**recon_kwargs: Any,
) -> None:
"""
Expand All @@ -118,7 +119,7 @@ def sim_reconstruct(
output_directory : str | PathLike[str] | None, optional
Directory to save reconstructions in (reconstructions will be saved with the data files if not specified), by default None
processing_directory : str | PathLike[str] | None, optional
The directory in which the temporary files will be stored for processing (otherwise the output directory will be used), by default None
The directory in which subdirectories of temporary files will be stored for processing (otherwise the output directory will be used), by default None
otf_overrides : dict[int, Path] | None, optional
A dictionary with emission wavelengths in nm as keys and paths to OTF files as values (these override configured OTFs), by default None
overwrite : bool, optional
Expand All @@ -144,8 +145,71 @@ def sim_reconstruct(
overwrite=overwrite,
cleanup=cleanup,
stitch_channels=stitch_channels,
allow_partial=allow_missing_channels,
output_file_type=output_file_type,
parallel_process=parallel_process,
**recon_kwargs,
)


def sim_reconstruct_single(
    sim_data_path: str | PathLike[str],
    config_path: str | PathLike[str] | None = None,
    output_directory: str | PathLike[str] | None = None,
    processing_directory: str | PathLike[str] | None = None,
    otf_overrides: dict[int, Path] | None = None,
    overwrite: bool = False,
    cleanup: bool = True,
    stitch_channels: bool = True,
    allow_missing_channels: bool = False,
    output_file_type: OutputFileTypes = "dv",
    multiprocessing_pool: Pool | None = None,
    parallel_process: bool = False,
    **recon_kwargs: Any,
) -> None:
    """
    Top level function for reconstructing a single SIM data file

    Parameters
    ----------
    sim_data_path : str | PathLike[str]
        Path to the SIM data file (DV expected)
    config_path : str | PathLike[str] | None, optional
        Path of the top level config file, by default None
    output_directory : str | PathLike[str] | None, optional
        Directory to save reconstructions in (reconstructions will be saved with the data files if not specified), by default None
    processing_directory : str | PathLike[str] | None, optional
        The directory in which the temporary files will be stored for processing (otherwise the output directory will be used), by default None
    otf_overrides : dict[int, Path] | None, optional
        A dictionary with emission wavelengths in nm as keys and paths to OTF files as values (these override configured OTFs), by default None
    overwrite : bool, optional
        Overwrite files if they already exist, by default False
    cleanup : bool, optional
        Clean up temporary directory and files after reconstruction, by default True
    stitch_channels : bool, optional
        Stitch channels back together after processing (otherwise output will be a separate DV per channel), by default True
    allow_missing_channels: bool, optional
        Attempt reconstruction of other channels in a multi-channel file if one or more are not configured, by default False
    output_file_type: Literal["dv", "tiff"], optional
        File type that output images will be saved as, by default "dv"
    multiprocessing_pool : Pool | None, optional
        Multiprocessing pool to run cudasirecon in (`maxtasksperchild=1` is recommended to avoid crashes), by default None
    parallel_process : bool, optional
        Run reconstructions in 2 processes concurrently (ignored if multiprocessing_pool is supplied), by default False
    """
    conf = load_configs(config_path, otf_overrides=otf_overrides)
    logger.info("Starting reconstruction of %s", sim_data_path)
    run_single_reconstruction(
        conf,
        sim_data_path,
        output_directory=output_directory,
        processing_directory=processing_directory,
        overwrite=overwrite,
        cleanup=cleanup,
        stitch_channels=stitch_channels,
        # NOTE(review): keyword must match `run_single_reconstruction`'s
        # signature (`allow_missing_channels`); passing `allow_partial` would
        # fall through into **recon_kwargs and silently leave the option at
        # its default
        allow_missing_channels=allow_missing_channels,
        output_file_type=output_file_type,
        multiprocessing_pool=multiprocessing_pool,
        parallel_process=parallel_process,
        **recon_kwargs,
    )
125 changes: 98 additions & 27 deletions src/sim_recon/recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
if TYPE_CHECKING:
from typing import Any, Literal, TypeAlias
from os import PathLike
from multiprocessing.pool import AsyncResult
from multiprocessing.pool import AsyncResult, Pool
from numpy.typing import NDArray

OutputFileTypes: TypeAlias = Literal["dv", "tiff"]
Expand Down Expand Up @@ -364,37 +364,39 @@ def _get_incomplete_channels(
return incomplete_wavelengths


def run_reconstructions(
def run_single_reconstruction(
conf: ConfigManager,
*sim_data_paths: str | PathLike[str],
sim_data_path: str | PathLike[str],
*,
output_directory: str | PathLike[str] | None,
processing_directory: str | PathLike[str] | None = None,
overwrite: bool = False,
cleanup: bool = False,
stitch_channels: bool = True,
parallel_process: bool = False,
allow_missing_channels: bool = False,
output_file_type: OutputFileTypes = "dv",
multiprocessing_pool: Pool | None = None,
parallel_process: bool = False,
**config_kwargs: Any,
) -> None:

logging_redirect = get_logging_redirect()
progress_wrapper = get_progress_wrapper()

# `maxtasksperchild=1` is necessary to ensure the child process is cleaned
# up between tasks, as the cudasirecon process doesn't fully release memory
# afterwards
with (
multiprocessing.Pool(
processes=1 + int(parallel_process), # 2 processes max
if multiprocessing_pool is None:
# `maxtasksperchild=1` is necessary to ensure the child process is cleaned
# up between tasks, as the cudasirecon process doesn't fully release memory
# afterwards
pool = multiprocessing.Pool(
processes=2 if parallel_process else 1, # 2 processes max
maxtasksperchild=1,
) as pool,
logging_redirect(),
delete_directory_if_empty(processing_directory),
):
for sim_data_path in progress_wrapper(
sim_data_paths, desc="SIM data files", unit="file"
):
)
else:
pool = multiprocessing_pool

try:

logging_redirect = get_logging_redirect()
progress_wrapper = get_progress_wrapper()

with logging_redirect(), delete_directory_if_empty(processing_directory):
output_paths: tuple[Path, ...] | None = None
try:
sim_data_path = Path(sim_data_path)
Expand All @@ -407,14 +409,23 @@ def run_reconstructions(
raise PySimReconFileNotFoundError(
f"Image file {sim_data_path} does not exist"
)

named_temp_dir_kwargs: dict[str, Any]
if processing_directory is None:
named_temp_dir_kwargs = {
"name": sim_data_path.stem,
"parents": False,
"directory": file_output_directory,
}
else:
processing_directory = Path(processing_directory)
named_temp_dir_kwargs = {
"name": processing_directory.name,
"parents": True,
"directory": processing_directory.parent,
}
with NamedTemporaryDirectory(
name=sim_data_path.stem,
parents=True,
directory=(
file_output_directory
if processing_directory is None
else processing_directory
),
**named_temp_dir_kwargs,
delete=cleanup,
) as proc_dir:
proc_dir = Path(proc_dir)
Expand Down Expand Up @@ -525,6 +536,66 @@ def run_reconstructions(
logger.error(
"Unexpected error occurred for %s", sim_data_path, exc_info=True
)
finally:
if multiprocessing_pool is None:
# Only close pools that were created by this function
pool.close()


def run_reconstructions(
    conf: ConfigManager,
    *sim_data_paths: str | PathLike[str],
    output_directory: str | PathLike[str] | None,
    processing_directory: str | PathLike[str] | None = None,
    overwrite: bool = False,
    cleanup: bool = False,
    stitch_channels: bool = True,
    allow_missing_channels: bool = False,
    output_file_type: OutputFileTypes = "dv",
    parallel_process: bool = False,
    **config_kwargs: Any,
) -> None:
    """
    Reconstruct multiple SIM data files, sharing a single multiprocessing
    pool across all of them.

    Each file is handed to `run_single_reconstruction`; when a shared
    `processing_directory` is given, every file gets its own subdirectory
    (named after the file's stem) so temporary files don't collide.
    """
    redirect_logs = get_logging_redirect()
    wrap_progress = get_progress_wrapper()

    worker_count = 2 if parallel_process else 1  # 2 processes max
    # `maxtasksperchild=1` is necessary to ensure the child process is cleaned
    # up between tasks, as the cudasirecon process doesn't fully release memory
    # afterwards
    with (
        multiprocessing.Pool(
            processes=worker_count,
            maxtasksperchild=1,
        ) as shared_pool,
        redirect_logs(),
        delete_directory_if_empty(processing_directory),
    ):
        for data_path in wrap_progress(
            sim_data_paths, desc="SIM data files", unit="file"
        ):
            data_path = Path(data_path)

            # For multiple reconstructions sharing the same processing
            # directory, use a per-file subdirectory keyed on the data
            # path's stem
            per_file_proc_dir = (
                None
                if processing_directory is None
                else Path(processing_directory) / data_path.stem
            )

            run_single_reconstruction(
                conf,
                data_path,
                output_directory=output_directory,
                processing_directory=per_file_proc_dir,
                overwrite=overwrite,
                cleanup=cleanup,
                stitch_channels=stitch_channels,
                allow_missing_channels=allow_missing_channels,
                output_file_type=output_file_type,
                multiprocessing_pool=shared_pool,
                parallel_process=parallel_process,
                **config_kwargs,
            )


def _prepare_config_kwargs(
Expand Down

0 comments on commit 15adb23

Please sign in to comment.