diff --git a/src/depiction_targeted_preproc/pipeline/run.py b/src/depiction_targeted_preproc/pipeline/run.py
index 8375047..331ae5d 100644
--- a/src/depiction_targeted_preproc/pipeline/run.py
+++ b/src/depiction_targeted_preproc/pipeline/run.py
@@ -71,7 +71,7 @@ def run_one_job(
 @app.default()
 def run_resource_flow(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
     client = Bfabric.from_config()
-    _set_workunit_processing(client=client, workunit_id=workunit_id)
+    set_workunit_processing(client=client, workunit_id=workunit_id)
     dataset_id, imzml_resource_id, sample_name = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
     run_one_job(
         client=client,
@@ -82,15 +82,10 @@ def run_resource_flow(workunit_id: int, work_dir: Path, ssh_user: str | None = N
         imzml_resource_id=imzml_resource_id,
         ssh_user=ssh_user,
     )
-    _set_workunit_available(client=client, workunit_id=workunit_id)
+    set_workunit_available(client=client, workunit_id=workunit_id)


-def run_batch(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
-    # TODO maybe it will make sense to still implement this is in a dedicated file
-    pass
-
-
-def _set_workunit_processing(client: Bfabric, workunit_id: int) -> None:
+def set_workunit_processing(client: Bfabric, workunit_id: int) -> None:
     """Sets the workunit to processing and deletes the default resource if it is available."""
     client.save("workunit", {"id": workunit_id, "status": "processing"})
     # TODO the default resource should be deleted in the future, but right now we simply set it to 0 and available
@@ -100,7 +95,7 @@ def _set_workunit_processing(client: Bfabric, workunit_id: int) -> None:
         client.save("resource", {"id": list(resources.values())[0].id, "status": "available"})


-def _set_workunit_available(client: Bfabric, workunit_id: int) -> None:
+def set_workunit_available(client: Bfabric, workunit_id: int) -> None:
     client.save("workunit", {"id": workunit_id, "status": "available"})


diff --git a/src/depiction_targeted_preproc/pipeline/run_batch.py b/src/depiction_targeted_preproc/pipeline/run_batch.py
new file mode 100644
index 0000000..8c5fd40
--- /dev/null
+++ b/src/depiction_targeted_preproc/pipeline/run_batch.py
@@ -0,0 +1,35 @@
+from pathlib import Path
+
+import cyclopts
+from bfabric import Bfabric
+from bfabric.entities import Workunit
+from depiction_targeted_preproc.pipeline.run import set_workunit_processing, set_workunit_available, run_one_job
+from depiction_targeted_preprocbatch.batch_dataset import BatchDataset
+
+app = cyclopts.App()
+
+
+@app.default()
+def run_batch(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
+    client = Bfabric.from_config()
+    set_workunit_processing(client=client, workunit_id=workunit_id)
+
+    workunit = Workunit.find(id=workunit_id, client=client)
+    batch_dataset = BatchDataset(dataset_id=workunit.input_dataset.id, client=client)
+
+    for job in batch_dataset.jobs:
+        run_one_job(
+            client=client,
+            work_dir=work_dir,
+            sample_name=str(Path(job.imzml["name"]).stem),
+            dataset_id=job.panel.id,
+            workunit_id=workunit_id,
+            imzml_resource_id=job.imzml.id,
+            ssh_user=ssh_user,
+        )
+
+    set_workunit_available(client=client, workunit_id=workunit_id)
+
+
+if __name__ == "__main__":
+    app()
diff --git a/src/depiction_targeted_preprocbatch/executor.py b/src/depiction_targeted_preprocbatch/executor.py
index fa32086..5fd7421 100644
--- a/src/depiction_targeted_preprocbatch/executor.py
+++ b/src/depiction_targeted_preprocbatch/executor.py
@@ -1,22 +1,11 @@
 from __future__ import annotations

-import multiprocessing
 from dataclasses import dataclass
 from pathlib import Path

-import cyclopts
-from bfabric import Bfabric
-from bfabric.entities import Storage, Resource
+from bfabric.entities import Resource
 from depiction_targeted_preproc.app.workunit_config import WorkunitConfig
-from depiction_targeted_preproc.pipeline_config.artifacts_mapping import (
-    get_result_files,
-)
 from depiction_targeted_preproc.pipeline_config.model import PipelineParameters
-from depiction_targeted_preproc.workflow.snakemake_invoke import SnakemakeInvoke
-from depiction_targeted_preprocbatch.batch_dataset import BatchDataset
-from depiction_targeted_preprocbatch.job_export_results import JobExportResults
-from depiction_targeted_preprocbatch.job_prepare_inputs import JobPrepareInputs
-from loguru import logger


 @dataclass
@@ -38,126 +27,3 @@ def from_bfabric(cls, imzml_resource: Resource, workunit_config: WorkunitConfig,
             sample_name=Path(imzml_resource["name"]).stem,
             ssh_user=ssh_user,
         )
-
-
-class Executor:
-    """Executes the pipeline for multiple files, supporting parallel, but isolated, execution of multiple jobs.
-    Results will be gradually published once they become available.
-    """
-
-    def __init__(
-        self,
-        proc_dir: Path,
-        output_dir: Path,
-        workunit_config: WorkunitConfig,
-        client: Bfabric,
-        force_ssh_user: str | None = None,
-    ) -> None:
-        self._client = client
-        self._workunit_config = workunit_config
-        self._force_ssh_user = force_ssh_user
-        self.proc_dir = proc_dir
-        self.output_dir = output_dir
-
-    def run(self, n_jobs: int) -> None:
-        """Runs all jobs, executing up to `n_jobs` in parallel."""
-        self._set_workunit_processing()
-        batch_dataset = BatchDataset(dataset_id=self._workunit_config.input_dataset_id, client=self._client)
-
-        jobs = [
-            BatchJob.from_bfabric(
-                imzml_resource=job.imzml, workunit_config=self._workunit_config, ssh_user=self._force_ssh_user
-            )
-            for job in batch_dataset.jobs
-        ]
-        # TODO parallelization is currently broken
-        if n_jobs != 1:
-            logger.error("Parallelization is currently broken and will be disabled.")
-        for job in jobs:
-            self.run_job(job)
-        # parallel = joblib.Parallel(n_jobs=n_jobs, verbose=10)
-        # parallel(joblib.delayed(self.run_job)(job) for job in jobs)
-        self._set_workunit_available()
-
-    def run_job(self, job: BatchJob) -> None:
-        """Runs a single job."""
-        workflow_dir = self.proc_dir / job.sample_name
-        sample_dir = workflow_dir / job.sample_name
-        sample_dir.mkdir(parents=True, exist_ok=True)
-
-        # stage input
-        logger.debug(f"Preparing inputs for {job}")
-        JobPrepareInputs.prepare(job=job, sample_dir=sample_dir, client=self._client)
-
-        # invoke the pipeline
-        logger.debug(f"Running pipeline for {job}")
-        result_files = self._determine_result_files(job=job, workflow_dir=workflow_dir)
-        SnakemakeInvoke().invoke(work_dir=workflow_dir, result_files=result_files)
-
-        # export the results
-        logger.debug(f"Exporting results for {job}")
-        # TODO do not hardcode id
-        output_storage = Storage.find(id=2, client=self._client)
-        JobExportResults.export(
-            client=self._client,
-            work_dir=workflow_dir,
-            workunit_config=self._workunit_config,
-            sample_name=sample_dir.name,
-            result_files=result_files,
-            output_storage=output_storage,
-            force_ssh_user=self._force_ssh_user,
-        )
-
-    def _determine_result_files(self, job: BatchJob, workflow_dir: Path) -> list[Path]:
-        """Returns the requested result files based on the pipeline parameters for a particular job."""
-        return get_result_files(params=job.pipeline_parameters, work_dir=workflow_dir, sample_name=job.sample_name)
-
-    def _set_workunit_processing(self) -> None:
-        """Sets the workunit to processing and deletes the default resource if it is available."""
-        self._client.save(
-            "workunit",
-            {
-                "id": self._workunit_config.workunit_id,
-                "status": "processing",
-            },
-        )
-        JobExportResults.delete_default_resource(workunit_id=self._workunit_config.workunit_id, client=self._client)
-
-    def _set_workunit_available(self) -> None:
-        # TODO not clear if it needs to be addressed here or in the shell script
-        self._client.save(
-            "workunit",
-            {
-                "id": self._workunit_config.workunit_id,
-                "status": "available",
-            },
-        )
-
-
-app = cyclopts.App()
-
-
-@app.default
-def process_app(
-    proc_dir: Path,
-    output_dir: Path,
-    workunit_yaml: Path,
-    n_jobs: int = 32,
-    force_ssh_user: str | None = None,
-) -> None:
-    """Runs the executor."""
-    workunit_config = WorkunitConfig.from_yaml(workunit_yaml)
-    client = Bfabric.from_config()
-    executor = Executor(
-        proc_dir=proc_dir,
-        output_dir=output_dir,
-        workunit_config=workunit_config,
-        client=client,
-        force_ssh_user=force_ssh_user,
-    )
-    executor.run(n_jobs=n_jobs)
-
-
-if __name__ == "__main__":
-    multiprocessing.freeze_support()
-    app()
diff --git a/src/depiction_targeted_preprocbatch/job_export_results.py b/src/depiction_targeted_preprocbatch/job_export_results.py
deleted file mode 100644
index 7d68f0a..0000000
--- a/src/depiction_targeted_preprocbatch/job_export_results.py
+++ /dev/null
@@ -1,125 +0,0 @@
-from __future__ import annotations
-
-import zipfile
-from pathlib import Path
-
-import yaml
-from bfabric import Bfabric
-from bfabric.entities import Storage, Resource
-from bfabric.experimental.app_interface.output_registration import register_outputs
-from depiction_targeted_preproc.app.workunit_config import WorkunitConfig
-from loguru import logger
-
-
-class JobExportResults:
-    """Exports the results of the job to the output storage and registers it in B-Fabric."""
-
-    def __init__(
-        self,
-        client: Bfabric,
-        work_dir: Path,
-        workunit_config: WorkunitConfig,
-        output_storage: Storage,
-        sample_name: str,
-        force_ssh_user: str | None = None,
-    ) -> None:
-        self._client = client
-        self._workunit_config = workunit_config
-        self.output_dir = work_dir / "output"
-        self._output_storage = output_storage
-        self._sample_name = sample_name
-        self._force_ssh_user = force_ssh_user
-
-    @property
-    def _workunit_id(self) -> int:
-        return self._workunit_config.workunit_id
-
-    @classmethod
-    def export(
-        cls,
-        client: Bfabric,
-        work_dir: Path,
-        workunit_config: WorkunitConfig,
-        sample_name: str,
-        result_files: list[Path],
-        output_storage: Storage,
-        force_ssh_user: str | None,
-    ) -> None:
-        """Exports the results of one job."""
-        instance = cls(
-            client=client,
-            work_dir=work_dir,
-            workunit_config=workunit_config,
-            output_storage=output_storage,
-            sample_name=sample_name,
-            force_ssh_user=force_ssh_user,
-        )
-        instance.export_results(result_files)
-
-    def export_results(self, result_files: list[Path]) -> None:
-        """Exports the results of one job."""
-        self._create_zip_file(result_files)
-        self._register_result()
-
-    def delete_local(self):
-        # TODO this functionality will be needed when processing large jobs
-        raise NotImplementedError
-
-    @property
-    def _zip_file_path(self) -> Path:
-        return self.output_dir / f"{self._sample_name}.zip"
-
-    def _create_zip_file(self, result_files: list[Path]) -> None:
-        """Creates a ZIP file containing the results for one sample."""
-        self.output_dir.mkdir(exist_ok=True, parents=True)
-        with zipfile.ZipFile(self._zip_file_path, "w") as zip_file:
-            for result_file in result_files:
-                zip_entry_path = result_file.relative_to(self.output_dir.parent)
-                zip_file.write(result_file, arcname=zip_entry_path)
-
-    @property
-    def _outputs_spec(self) -> dict[str, list[dict[str, str | int]]]:
-        return {
-            "outputs": [
-                {
-                    "type": "bfabric_copy_resource",
-                    "local_path": str(self._zip_file_path),
-                    "store_entry_path": self._zip_file_path.name,
-                    "workunit_id": self._workunit_id,
-                    "storage_id": self._output_storage.id,
-                }
-            ]
-        }
-
-    def _register_result(self) -> None:
-        outputs_yaml = self.output_dir / f"{self._sample_name}_outputs_spec.yml"
-        with outputs_yaml.open("w") as file:
-            yaml.safe_dump(self._outputs_spec, file)
-        register_outputs(
-            outputs_yaml=outputs_yaml,
-            client=self._client,
-            ssh_user=self._force_ssh_user,
-        )
-
-    @staticmethod
-    def delete_default_resource(workunit_id: int, client: Bfabric) -> bool:
-        """Deletes the default resource created by the wrapper creator if it exists. Returns true if the resource was
-        successfully deleted.
-        """
-        logger.warning(
-            "Currently, the wrapper creator has a limitation that makes it impossible to remove "
-            "this resource. This will be addressed in the future."
-        )
-        if False:
-            resources = Resource.find_by(
-                {"name": "MSI_Targeted_PreprocBatch 0 - resource", "workunitid": workunit_id}, client=client
-            )
-            if len(resources) == 1:
-                resource_id = list(resources.values())[0].id
-                logger.info(f"Deleting default resource with ID {resource_id}")
-                result = client.delete("resource", resource_id, check=False)
-                return result.is_success
-            elif len(resources) > 1:
-                raise ValueError("There should never be more than one default resource.")
-        else:
-            return False
diff --git a/src/depiction_targeted_preprocbatch/job_prepare_inputs.py b/src/depiction_targeted_preprocbatch/job_prepare_inputs.py
deleted file mode 100644
index 8313e2f..0000000
--- a/src/depiction_targeted_preprocbatch/job_prepare_inputs.py
+++ /dev/null
@@ -1,107 +0,0 @@
-from __future__ import annotations
-
-from functools import cached_property
-from pathlib import Path
-from typing import TYPE_CHECKING
-
-import yaml
-from bfabric import Bfabric
-from bfabric.entities import Resource
-from bfabric.experimental.app_interface.input_preparation import prepare_folder
-from depiction_targeted_preproc.pipeline.setup_old import copy_standardized_table
-from loguru import logger
-
-if TYPE_CHECKING:
-    from depiction_targeted_preprocbatch.executor import BatchJob
-
-
-class JobPrepareInputs:
-    """Prepares the inputs for a particular job.
-    :param job: The job to prepare the inputs for.
-    :param sample_dir: The directory where the inputs should be staged.
-    """
-
-    def __init__(self, job: BatchJob, sample_dir: Path, client: Bfabric) -> None:
-        self._job = job
-        self._sample_dir = sample_dir
-        self._client = client
-        self._dataset_id = job.dataset_id
-        self._imzml_resource_id = job.imzml_resource_id
-        self._ssh_user = job.ssh_user
-
-    @classmethod
-    def prepare(cls, job: BatchJob, sample_dir: Path, client: Bfabric) -> None:
-        """Prepares the inputs for a particular job.
-        :param job: The job to prepare the inputs for.
-        :param sample_dir: The directory where the inputs should be staged.
-        :param client: The Bfabric client to use.
-        """
-        instance = cls(job=job, sample_dir=sample_dir, client=client)
-        instance.stage_all()
-
-    def stage_all(self) -> None:
-        """Stages all required input files for a particular job."""
-        self.stage_bfabric_inputs()
-        self._standardize_input_table()
-        self.stage_pipeline_parameters()
-
-    def _standardize_input_table(self):
-        input_path = self._sample_dir / "mass_list.unstandardized.raw.csv"
-        output_path = self._sample_dir / "mass_list.raw.csv"
-        copy_standardized_table(input_path, output_path)
-
-    @property
-    def _inputs_spec(self) -> dict[str, list[dict[str, str | int | bool]]]:
-        return {
-            "inputs": [
-                {
-                    "type": "bfabric_dataset",
-                    "id": self._dataset_id,
-                    "filename": "mass_list.unstandardized.raw.csv",
-                    "separator": ",",
-                },
-                {
-                    "type": "bfabric_resource",
-                    "id": self._imzml_resource_id,
-                    "filename": "raw.imzML",
-                    "check_checksum": True,
-                },
-                {
-                    "type": "bfabric_resource",
-                    "id": self._ibd_resource_id,
-                    "filename": "raw.ibd",
-                    "check_checksum": True,
-                },
-            ]
-        }
-
-    def stage_bfabric_inputs(self) -> None:
-        inputs_yaml = self._sample_dir / "inputs_spec.yml"
-        with inputs_yaml.open("w") as file:
-            yaml.safe_dump(self._inputs_spec, file)
-        prepare_folder(
-            inputs_yaml=inputs_yaml, target_folder=self._sample_dir, client=self._client, ssh_user=self._ssh_user
-        )
-
-    @cached_property
-    def _ibd_resource_id(self) -> int:
-        imzml_resource = Resource.find(id=self._imzml_resource_id, client=self._client)
-        if imzml_resource["name"].endswith(".imzML"):
-            expected_name = imzml_resource["name"][:-6] + ".ibd"
-            results = self._client.read(
-                "resource",
-                {"name": expected_name, "containerid": imzml_resource["container"]["id"]},
-                max_results=1,
-                return_id_only=True,
-            )
-            return results[0]["id"]
-        else:
-            # TODO this will have to be refactored later
-            raise NotImplementedError("Only .imzML files are supported for now")
-
-    def stage_pipeline_parameters(self) -> None:
-        """Copies the `pipeline_params.yml` file to the particular sample's directory."""
-        output_path = self._sample_dir / "pipeline_params.yml"
-        logger.debug(f"Staging pipeline parameters to {self._sample_dir}")
-        with output_path.open("w") as file:
-            yaml.dump(self._job.pipeline_parameters.model_dump(mode="json"), file)
diff --git a/src/depiction_targeted_preprocbatch/scp_util.py b/src/depiction_targeted_preprocbatch/scp_util.py
deleted file mode 100644
index 1338934..0000000
--- a/src/depiction_targeted_preprocbatch/scp_util.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from __future__ import annotations
-
-import subprocess
-from pathlib import Path
-
-from loguru import logger
-
-
-def _is_remote(path: str | Path) -> bool:
-    return ":" in str(path)
-
-
-def scp(source: str | Path, target: str | Path, *, username: str | None = None) -> None:
-    """Performs scp source target.
-    Make sure that either the source or target specifies a host, otherwise you should just use shutil.copyfile.
-    """
-    source_remote = _is_remote(source)
-    target_remote = _is_remote(target)
-    if source_remote == target_remote:
-        msg = f"Either source or target should be remote, but not both {source_remote=} == {target_remote=}"
-        raise ValueError(msg)
-    if username and source_remote:
-        source = f"{username}@{source}"
-    elif username and target_remote:
-        target = f"{username}@{target}"
-
-    logger.info(f"scp {source} {target}")
-    subprocess.run(["scp", source, target], check=True)