From 9e7c596b8e086797086629cfd9ea23b783f29bca Mon Sep 17 00:00:00 2001
From: Leonardo Schwarz
Date: Tue, 17 Sep 2024 13:04:48 +0200
Subject: [PATCH] consolidate run.py and run_batch.py

---
 .../pipeline/run.py       | 89 -------------------
 .../pipeline/run_batch.py | 60 -------------
 2 files changed, 149 deletions(-)
 delete mode 100644 src/depiction_targeted_preproc/pipeline/run.py
 delete mode 100644 src/depiction_targeted_preproc/pipeline/run_batch.py

diff --git a/src/depiction_targeted_preproc/pipeline/run.py b/src/depiction_targeted_preproc/pipeline/run.py
deleted file mode 100644
index 6ec7bd9..0000000
--- a/src/depiction_targeted_preproc/pipeline/run.py
+++ /dev/null
@@ -1,89 +0,0 @@
-from pathlib import Path
-
-import cyclopts
-from bfabric import Bfabric
-from bfabric.entities import Workunit, Resource
-
-from depiction_targeted_preproc.pipeline.prepare_inputs import prepare_inputs
-from depiction_targeted_preproc.pipeline.prepare_params import prepare_params
-from depiction_targeted_preproc.app_interface.run_chunk import run_workflow
-from depiction_targeted_preproc.pipeline.store_outputs import store_outputs
-
-app = cyclopts.App()
-
-
-def _get_resource_flow_ids(client: Bfabric, workunit_id: int) -> tuple[int, int, str]:
-    workunit = Workunit.find(id=workunit_id, client=client)
-    dataset_id = workunit.parameter_values["mass_list_id"]
-    imzml_resources = [r for r in workunit.input_resources if r["name"].endswith(".imzML")]
-    if len(imzml_resources) != 1:
-        raise ValueError(f"Expected exactly one .imzML resource, found {len(imzml_resources)}")
-    imzml_resource_id = imzml_resources[0].id
-    sample_name = Path(imzml_resources[0]["name"]).stem
-    return dataset_id, imzml_resource_id, sample_name
-
-
-def run_one_job(
-    client: Bfabric,
-    work_dir: Path,
-    sample_name: str,
-    dataset_id: int,
-    workunit_id: int,
-    imzml_resource_id: int,
-    ssh_user: str | None,
-    read_only: bool,
-    prepare_only: bool = False,
-    override_params: bool = False,
-) -> None:
-    sample_dir = work_dir / sample_name
-    prepare_params(client=client, sample_dir=sample_dir, workunit_id=workunit_id, override=override_params)
-    prepare_inputs(
-        client=client,
-        sample_dir=sample_dir,
-        dataset_id=dataset_id,
-        imzml_resource_id=imzml_resource_id,
-        ssh_user=ssh_user,
-    )
-    if prepare_only:
-        return
-    zip_file_path = run_workflow(sample_dir=sample_dir)
-    if not read_only:
-        store_outputs(client=client, zip_file_path=zip_file_path, workunit_id=workunit_id, ssh_user=ssh_user)
-
-
-@app.default()
-def run_resource_flow(workunit_id: int, work_dir: Path, ssh_user: str | None = None, read_only: bool = False) -> None:
-    client = Bfabric.from_config()
-    if not read_only:
-        set_workunit_processing(client=client, workunit_id=workunit_id)
-    dataset_id, imzml_resource_id, sample_name = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
-    run_one_job(
-        client=client,
-        work_dir=work_dir,
-        sample_name=sample_name,
-        dataset_id=dataset_id,
-        workunit_id=workunit_id,
-        imzml_resource_id=imzml_resource_id,
-        ssh_user=ssh_user,
-        read_only=read_only,
-    )
-    if not read_only:
-        set_workunit_available(client=client, workunit_id=workunit_id)
-
-
-def set_workunit_processing(client: Bfabric, workunit_id: int) -> None:
-    """Sets the workunit to processing and deletes the default resource if it is available."""
-    client.save("workunit", {"id": workunit_id, "status": "processing"})
-    # TODO the default resource should be deleted in the future, but right now we simply set it to 0 and available
-    # (it will not be possible before the updated wrapper creator)
-    resources = Resource.find_by({"name": "% - resource", "workunitid": workunit_id}, client=client, max_results=1)
-    if resources:
-        client.save("resource", {"id": list(resources.values())[0].id, "status": "available"})
-
-
-def set_workunit_available(client: Bfabric, workunit_id: int) -> None:
-    client.save("workunit", {"id": workunit_id, "status": "available"})
-
-
-if __name__ == "__main__":
-    app()
diff --git a/src/depiction_targeted_preproc/pipeline/run_batch.py b/src/depiction_targeted_preproc/pipeline/run_batch.py
deleted file mode 100644
index 14d4d79..0000000
--- a/src/depiction_targeted_preproc/pipeline/run_batch.py
+++ /dev/null
@@ -1,60 +0,0 @@
-from pathlib import Path
-
-import cyclopts
-from bfabric import Bfabric
-from bfabric.entities import Workunit
-
-from depiction_targeted_preproc.pipeline.run import set_workunit_processing, set_workunit_available, run_one_job
-from depiction_targeted_preprocbatch.batch_dataset import BatchDataset
-
-app = cyclopts.App()
-
-
-@app.default()
-def run_batch(workunit_id: int, work_dir: Path, ssh_user: str | None = None, read_only: bool = False) -> None:
-    client = Bfabric.from_config()
-    if not read_only:
-        set_workunit_processing(client=client, workunit_id=workunit_id)
-
-    workunit = Workunit.find(id=workunit_id, client=client)
-    batch_dataset = BatchDataset(dataset_id=workunit.input_dataset.id, client=client)
-
-    # TODO there is currently a serious bug which prevents the parallelization here, but this would be the place to
-    # implement it
-    for job in batch_dataset.jobs:
-        run_one_job(
-            client=client,
-            work_dir=work_dir,
-            sample_name=str(Path(job.imzml["name"]).stem),
-            dataset_id=job.panel.id,
-            workunit_id=workunit_id,
-            imzml_resource_id=job.imzml.id,
-            ssh_user=ssh_user,
-            read_only=read_only,
-        )
-
-    if not read_only:
-        set_workunit_available(client=client, workunit_id=workunit_id)
-
-
-@app.command
-def prepare(workunit_id: int, work_dir: Path, ssh_user: str | None = None):
-    client = Bfabric.from_config()
-    workunit = Workunit.find(id=workunit_id, client=client)
-    batch_dataset = BatchDataset(dataset_id=workunit.input_dataset.id, client=client)
-    for job in batch_dataset.jobs:
-        run_one_job(
-            client=client,
-            work_dir=work_dir,
-            sample_name=str(Path(job.imzml["name"]).stem),
-            dataset_id=job.panel.id,
-            workunit_id=workunit_id,
-            imzml_resource_id=job.imzml.id,
-            ssh_user=ssh_user,
-            read_only=True,
-            prepare_only=True,
-        )
-
-
-if __name__ == "__main__":
-    app()
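
Note: the subject line says the two scripts were consolidated, but the patch itself contains only deletions, so the consolidated module is not visible here. For orientation, below is a minimal sketch of what a single cyclopts app covering both flows could look like, assembled only from calls that appear in the deleted files. The module name run_consolidated.py, the helper import path, and the command names are assumptions, not something this patch establishes; the prepare command from run_batch.py would carry over the same way.

# run_consolidated.py -- illustrative sketch only, not part of this patch.
from pathlib import Path

import cyclopts
from bfabric import Bfabric
from bfabric.entities import Workunit

from depiction_targeted_preprocbatch.batch_dataset import BatchDataset

# NOTE: assumed import -- the patch does not show where these helpers moved;
# the signatures are taken verbatim from the deleted run.py.
from depiction_targeted_preproc.pipeline.run_one import (  # hypothetical module
    run_one_job,
    set_workunit_processing,
    set_workunit_available,
    _get_resource_flow_ids,
)

app = cyclopts.App()


@app.command
def run_resource_flow(workunit_id: int, work_dir: Path, ssh_user: str | None = None, read_only: bool = False) -> None:
    """Single-sample flow, as in the deleted run.py (previously the default command)."""
    client = Bfabric.from_config()
    if not read_only:
        set_workunit_processing(client=client, workunit_id=workunit_id)
    dataset_id, imzml_resource_id, sample_name = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
    run_one_job(
        client=client,
        work_dir=work_dir,
        sample_name=sample_name,
        dataset_id=dataset_id,
        workunit_id=workunit_id,
        imzml_resource_id=imzml_resource_id,
        ssh_user=ssh_user,
        read_only=read_only,
    )
    if not read_only:
        set_workunit_available(client=client, workunit_id=workunit_id)


@app.command
def run_batch(workunit_id: int, work_dir: Path, ssh_user: str | None = None, read_only: bool = False) -> None:
    """Batch flow, as in the deleted run_batch.py: one run_one_job call per dataset row."""
    client = Bfabric.from_config()
    if not read_only:
        set_workunit_processing(client=client, workunit_id=workunit_id)
    workunit = Workunit.find(id=workunit_id, client=client)
    batch_dataset = BatchDataset(dataset_id=workunit.input_dataset.id, client=client)
    for job in batch_dataset.jobs:
        run_one_job(
            client=client,
            work_dir=work_dir,
            sample_name=str(Path(job.imzml["name"]).stem),
            dataset_id=job.panel.id,
            workunit_id=workunit_id,
            imzml_resource_id=job.imzml.id,
            ssh_user=ssh_user,
            read_only=read_only,
        )
    if not read_only:
        set_workunit_available(client=client, workunit_id=workunit_id)


if __name__ == "__main__":
    app()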
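
Both flows would then be reachable through one executable. Invocation would look roughly like the following (the workunit id and paths are placeholders; cyclopts converts the snake_case function names to kebab-case subcommands by default, though exact option spelling depends on the cyclopts version):

    python -m depiction_targeted_preproc.pipeline.run_consolidated run-batch 3000 /scratch/work --ssh-user alice
    python -m depiction_targeted_preproc.pipeline.run_consolidated run-resource-flow 3000 /scratch/work --read-only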