From c5f7eda2b4d867013abc3651941c838128edab80 Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 17 Sep 2024 13:01:58 +0200 Subject: [PATCH] consolidate run_workflow.py --- .../app_interface/run_chunk.py | 26 ++++++++++++- .../pipeline/run.py | 2 +- .../pipeline/run_workflow.py | 37 ------------------- 3 files changed, 26 insertions(+), 39 deletions(-) delete mode 100644 src/depiction_targeted_preproc/pipeline/run_workflow.py diff --git a/src/depiction_targeted_preproc/app_interface/run_chunk.py b/src/depiction_targeted_preproc/app_interface/run_chunk.py index aafd5f6..367e425 100644 --- a/src/depiction_targeted_preproc/app_interface/run_chunk.py +++ b/src/depiction_targeted_preproc/app_interface/run_chunk.py @@ -1,9 +1,12 @@ +import zipfile from pathlib import Path import cyclopts import yaml -from depiction_targeted_preproc.pipeline.run_workflow import run_workflow +from depiction_targeted_preproc.pipeline.prepare_params import Params +from depiction_targeted_preproc.pipeline_config.artifacts_mapping import get_result_files_new +from depiction_targeted_preproc.workflow.snakemake_invoke import SnakemakeInvoke app = cyclopts.App() @@ -23,5 +26,26 @@ def run_chunk(chunk_dir: Path): yaml.safe_dump(result, f) +def run_workflow(sample_dir: Path) -> Path: + # TODO to be refactored + params = Params.model_validate(yaml.safe_load((sample_dir / "params.yml").read_text())) + result_files = get_result_files_new(requested_artifacts=params.requested_artifacts, sample_dir=sample_dir) + + # invoke snakemake + # TODO note report file is deactivated because it's currently broken due to dependencies (jinja2) + SnakemakeInvoke(report_file=None).invoke(work_dir=sample_dir.parent, result_files=result_files) + + # zip the results + sample_name = sample_dir.name + output_dir = sample_dir / "outputs" + output_dir.mkdir(exist_ok=True) + zip_file_path = output_dir / f"{sample_name}.zip" + with zipfile.ZipFile(zip_file_path, "w") as zip_file: + for result_file in result_files: + zip_entry_path = result_file.relative_to(sample_dir.parent) + zip_file.write(result_file, arcname=zip_entry_path) + return zip_file_path + + if __name__ == "__main__": app() diff --git a/src/depiction_targeted_preproc/pipeline/run.py b/src/depiction_targeted_preproc/pipeline/run.py index bcf2ba5..6ec7bd9 100644 --- a/src/depiction_targeted_preproc/pipeline/run.py +++ b/src/depiction_targeted_preproc/pipeline/run.py @@ -6,7 +6,7 @@ from depiction_targeted_preproc.pipeline.prepare_inputs import prepare_inputs from depiction_targeted_preproc.pipeline.prepare_params import prepare_params -from depiction_targeted_preproc.pipeline.run_workflow import run_workflow +from depiction_targeted_preproc.app_interface.run_chunk import run_workflow from depiction_targeted_preproc.pipeline.store_outputs import store_outputs app = cyclopts.App() diff --git a/src/depiction_targeted_preproc/pipeline/run_workflow.py b/src/depiction_targeted_preproc/pipeline/run_workflow.py deleted file mode 100644 index 45ab414..0000000 --- a/src/depiction_targeted_preproc/pipeline/run_workflow.py +++ /dev/null @@ -1,37 +0,0 @@ -import zipfile -from pathlib import Path - -import cyclopts -import yaml - -from depiction_targeted_preproc.pipeline.prepare_params import Params -from depiction_targeted_preproc.pipeline_config.artifacts_mapping import get_result_files_new -from depiction_targeted_preproc.workflow.snakemake_invoke import SnakemakeInvoke - -app = cyclopts.App() - - -@app.default -def run_workflow(sample_dir: Path) -> Path: - # TODO to be refactored - params = Params.model_validate(yaml.safe_load((sample_dir / "params.yml").read_text())) - result_files = get_result_files_new(requested_artifacts=params.requested_artifacts, sample_dir=sample_dir) - - # invoke snakemake - # TODO note report file is deactivated because it's currently broken due to dependencies (jinja2) - SnakemakeInvoke(report_file=None).invoke(work_dir=sample_dir.parent, result_files=result_files) - - # zip the results - sample_name = sample_dir.name - output_dir = sample_dir / "outputs" - output_dir.mkdir(exist_ok=True) - zip_file_path = output_dir / f"{sample_name}.zip" - with zipfile.ZipFile(zip_file_path, "w") as zip_file: - for result_file in result_files: - zip_entry_path = result_file.relative_to(sample_dir.parent) - zip_file.write(result_file, arcname=zip_entry_path) - return zip_file_path - - -if __name__ == "__main__": - app()