diff --git a/src/depiction_targeted_preproc/pipeline/prepare_inputs.py b/src/depiction_targeted_preproc/pipeline/prepare_inputs.py
index 752435f..0bad7a9 100644
--- a/src/depiction_targeted_preproc/pipeline/prepare_inputs.py
+++ b/src/depiction_targeted_preproc/pipeline/prepare_inputs.py
@@ -64,19 +64,9 @@ def write_inputs_spec(dataset_id: int, imzml_resource_id: int, client: Bfabric,
 def prepare_inputs(
     client: Bfabric,
     sample_dir: Path,
-    workunit_id: int,
+    dataset_id: int,
+    imzml_resource_id: int,
     ssh_user: str | None,
 ) -> None:
-    dataset_id, imzml_resource_id = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
     write_inputs_spec(dataset_id=dataset_id, imzml_resource_id=imzml_resource_id, client=client, sample_dir=sample_dir)
     prepare_folder(inputs_yaml=sample_dir / "inputs.yml", target_folder=sample_dir, client=client, ssh_user=ssh_user)
-
-
-def _get_resource_flow_ids(client: Bfabric, workunit_id: int) -> tuple[int, int]:
-    workunit = Workunit.find(id=workunit_id, client=client)
-    dataset_id = workunit.parameter_values["mass_list_id"]
-    imzml_resources = [r for r in workunit.input_resources if r["name"].endswith(".imzML")]
-    if len(imzml_resources) != 1:
-        raise ValueError(f"Expected exactly one .imzML resource, found {len(imzml_resources)}")
-    imzml_resource_id = imzml_resources[0].id
-    return dataset_id, imzml_resource_id
diff --git a/src/depiction_targeted_preproc/pipeline/run.py b/src/depiction_targeted_preproc/pipeline/run.py
index 3c3ebf2..f4eef12 100644
--- a/src/depiction_targeted_preproc/pipeline/run.py
+++ b/src/depiction_targeted_preproc/pipeline/run.py
@@ -1,18 +1,66 @@
+import zipfile
 from pathlib import Path
 
 import cyclopts
+import yaml
 from bfabric import Bfabric
+from bfabric.entities import Workunit
 from depiction_targeted_preproc.pipeline.prepare_inputs import prepare_inputs
-from depiction_targeted_preproc.pipeline.prepare_params import prepare_params
+from depiction_targeted_preproc.pipeline.prepare_params import prepare_params, Params
+from depiction_targeted_preproc.pipeline.store_outputs import store_outputs
+from depiction_targeted_preproc.pipeline_config.artifacts_mapping import get_result_files_new
+from depiction_targeted_preproc.workflow.snakemake_invoke import SnakemakeInvoke
 
 app = cyclopts.App()
 
 
+def _get_resource_flow_ids(client: Bfabric, workunit_id: int) -> tuple[int, int, str]:
+    workunit = Workunit.find(id=workunit_id, client=client)
+    dataset_id = workunit.parameter_values["mass_list_id"]
+    imzml_resources = [r for r in workunit.input_resources if r["name"].endswith(".imzML")]
+    if len(imzml_resources) != 1:
+        raise ValueError(f"Expected exactly one .imzML resource, found {len(imzml_resources)}")
+    imzml_resource_id = imzml_resources[0].id
+    sample_name = Path(imzml_resources[0]["name"]).stem
+    return dataset_id, imzml_resource_id, sample_name
+
+
+def run_workflow(sample_dir: Path) -> Path:
+    # TODO to be refactored
+    params = Params.model_validate(yaml.safe_load((sample_dir / "params.yml").read_text()))
+    result_files = get_result_files_new(requested_artifacts=params.requested_artifacts, sample_dir=sample_dir)
+
+    # invoke snakemake
+    SnakemakeInvoke().invoke(work_dir=sample_dir.parent, result_files=result_files)
+
+    # zip the results
+    sample_name = sample_dir.name
+    output_dir = sample_dir.parent / "output"
+    output_dir.mkdir(exist_ok=True)
+    zip_file_path = output_dir / f"{sample_name}.zip"
+    with zipfile.ZipFile(zip_file_path, "w") as zip_file:
+        for result_file in result_files:
+            zip_entry_path = result_file.relative_to(sample_dir.parent)
+            zip_file.write(result_file, arcname=zip_entry_path)
+    return zip_file_path
+
+
 @app.default()
-def run(workunit_id: int, sample_dir: Path, ssh_user: str | None = None) -> None:
+def run(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
     client = Bfabric.from_config()
+    dataset_id, imzml_resource_id, sample_name = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
+    sample_dir = work_dir / sample_name
+
     prepare_params(client=client, sample_dir=sample_dir, workunit_id=workunit_id)
-    prepare_inputs(client=client, sample_dir=sample_dir, workunit_id=workunit_id, ssh_user=ssh_user)
+    prepare_inputs(
+        client=client,
+        sample_dir=sample_dir,
+        dataset_id=dataset_id,
+        imzml_resource_id=imzml_resource_id,
+        ssh_user=ssh_user,
+    )
+    zip_file_path = run_workflow(sample_dir=sample_dir)
+    store_outputs(client=client, zip_file_path=zip_file_path, workunit_id=workunit_id)
 
 
 if __name__ == "__main__":