Skip to content

Commit

Permalink
initial run.py
Browse files Browse the repository at this point in the history
  • Loading branch information
leoschwarz committed Sep 4, 2024
1 parent 621c73e commit 8d07747
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 15 deletions.
14 changes: 2 additions & 12 deletions src/depiction_targeted_preproc/pipeline/prepare_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,9 @@ def write_inputs_spec(dataset_id: int, imzml_resource_id: int, client: Bfabric,
# Prepare the input files of one sample inside `sample_dir`.
#
# Calls `write_inputs_spec` with the dataset id and imzML resource id, then calls
# `prepare_folder` with `sample_dir / "inputs.yml"` as the inputs spec and `sample_dir`
# as the target folder (presumably the file written by `write_inputs_spec` - confirm).
# `ssh_user` is forwarded unchanged to `prepare_folder`; it may be None.
#
# NOTE(review): this text looks like a rendered diff with removed and added lines
# interleaved (the page header reports 2 additions and 12 deletions for this file).
# `workunit_id: int,` and the `_get_resource_flow_ids(...)` assignment appear to be the
# removed lines; `dataset_id` and `imzml_resource_id` appear to be the added parameters,
# now supplied by the caller - confirm against the actual file.
def prepare_inputs(
client: Bfabric,
sample_dir: Path,
# NOTE(review): likely the removed (old) parameter.
workunit_id: int,
# NOTE(review): likely the added (new) parameters.
dataset_id: int,
imzml_resource_id: int,
ssh_user: str | None,
) -> None:
# NOTE(review): likely the removed (old) lookup; if it were kept, it would overwrite the
# `dataset_id` and `imzml_resource_id` arguments passed in by the caller.
dataset_id, imzml_resource_id = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
write_inputs_spec(dataset_id=dataset_id, imzml_resource_id=imzml_resource_id, client=client, sample_dir=sample_dir)
prepare_folder(inputs_yaml=sample_dir / "inputs.yml", target_folder=sample_dir, client=client, ssh_user=ssh_user)


def _get_resource_flow_ids(client: Bfabric, workunit_id: int) -> tuple[int, int]:
    """Resolve the dataset id and the imzML resource id belonging to a workunit.

    The workunit is loaded with ``Workunit.find``. The dataset id is the value of
    its ``"mass_list_id"`` parameter, and the resource id is taken from the single
    input resource whose name ends with ``.imzML``.

    :param client: B-Fabric client used for the workunit lookup.
    :param workunit_id: id of the workunit to inspect.
    :return: ``(dataset_id, imzml_resource_id)``.
    :raises ValueError: if the workunit does not have exactly one input resource
        whose name ends with ``.imzML``.
    """
    wu = Workunit.find(id=workunit_id, client=client)
    # Read the parameter before looking at the resources, so a missing parameter
    # surfaces first.
    mass_list_dataset_id = wu.parameter_values["mass_list_id"]

    imzml_resources = [
        resource for resource in wu.input_resources if resource["name"].endswith(".imzML")
    ]
    if len(imzml_resources) == 1:
        (only_resource,) = imzml_resources
        return mass_list_dataset_id, only_resource.id

    raise ValueError(f"Expected exactly one .imzML resource, found {len(imzml_resources)}")
54 changes: 51 additions & 3 deletions src/depiction_targeted_preproc/pipeline/run.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,66 @@
import zipfile
from pathlib import Path

import cyclopts
import yaml
from bfabric import Bfabric
from bfabric.entities import Workunit
from depiction_targeted_preproc.pipeline.prepare_inputs import prepare_inputs
from depiction_targeted_preproc.pipeline.prepare_params import prepare_params
from depiction_targeted_preproc.pipeline.prepare_params import prepare_params, Params
from depiction_targeted_preproc.pipeline.store_outputs import store_outputs
from depiction_targeted_preproc.pipeline_config.artifacts_mapping import get_result_files_new
from depiction_targeted_preproc.workflow.snakemake_invoke import SnakemakeInvoke

# Command-line application object; `run` is registered on it as the default command
# through the `@app.default()` decorator.
app = cyclopts.App()


def _get_resource_flow_ids(client: Bfabric, workunit_id: int) -> tuple[int, int, str]:
    """Resolve dataset id, imzML resource id and sample name for a workunit.

    The workunit is loaded with ``Workunit.find``. The dataset id is the value of
    its ``"mass_list_id"`` parameter. The resource id and the sample name both come
    from the single input resource whose name ends with ``.imzML``; the sample name
    is that resource name without its final suffix.

    :param client: B-Fabric client used for the workunit lookup.
    :param workunit_id: id of the workunit to inspect.
    :return: ``(dataset_id, imzml_resource_id, sample_name)``.
    :raises ValueError: if the workunit does not have exactly one input resource
        whose name ends with ``.imzML``.
    """
    wu = Workunit.find(id=workunit_id, client=client)
    mass_list_dataset_id = wu.parameter_values["mass_list_id"]

    imzml_resources = [
        candidate for candidate in wu.input_resources if candidate["name"].endswith(".imzML")
    ]
    if len(imzml_resources) != 1:
        raise ValueError(f"Expected exactly one .imzML resource, found {len(imzml_resources)}")

    # Exactly one match is guaranteed here, so unpack it directly.
    (imzml_resource,) = imzml_resources
    return mass_list_dataset_id, imzml_resource.id, Path(imzml_resource["name"]).stem


def run_workflow(sample_dir: Path) -> Path:
    """Run the snakemake workflow for one sample and zip the requested result files.

    Reads ``params.yml`` from ``sample_dir``, validates it into ``Params`` and asks
    ``get_result_files_new`` for the result files of the requested artifacts.
    Snakemake is invoked with the parent of ``sample_dir`` as working directory.
    The result files are then written to ``<parent>/output/<sample_dir.name>.zip``,
    with entry names relative to that parent directory.

    :param sample_dir: directory of the sample; must contain ``params.yml``.
    :return: path of the zip file that was written.
    """
    # TODO to be refactored
    work_dir = sample_dir.parent

    raw_params = yaml.safe_load((sample_dir / "params.yml").read_text())
    params = Params.model_validate(raw_params)
    result_files = get_result_files_new(requested_artifacts=params.requested_artifacts, sample_dir=sample_dir)

    # Build the requested result files.
    SnakemakeInvoke().invoke(work_dir=work_dir, result_files=result_files)

    # Bundle the results; the "output" directory may already exist from an earlier run.
    output_dir = work_dir / "output"
    output_dir.mkdir(exist_ok=True)
    sample_name = sample_dir.name
    zip_file_path = output_dir / f"{sample_name}.zip"
    with zipfile.ZipFile(zip_file_path, "w") as archive:
        for produced_file in result_files:
            archive.write(produced_file, arcname=produced_file.relative_to(work_dir))
    return zip_file_path


# Default command of `app`: process one workunit end to end.
#
# Steps, in order: create the B-Fabric client from its config, resolve the dataset id,
# imzML resource id and sample name for the workunit, derive `sample_dir` as
# `work_dir / sample_name`, prepare params and inputs in that directory, run the
# workflow, and pass the resulting zip file to `store_outputs` for the workunit.
#
# NOTE(review): this text looks like a rendered diff with removed and added lines
# interleaved (the page header reports 3 deletions for this file). The first `def run`
# line (taking `sample_dir`) and the one-line `prepare_inputs(...)` call (taking
# `workunit_id`) appear to be the removed versions - confirm against the actual file.
@app.default()
# NOTE(review): likely the removed (old) signature.
def run(workunit_id: int, sample_dir: Path, ssh_user: str | None = None) -> None:
# NOTE(review): likely the added (new) signature; `sample_dir` is now derived below.
def run(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
client = Bfabric.from_config()
dataset_id, imzml_resource_id, sample_name = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
# One sub-directory per sample, named after the imzML resource (without its suffix).
sample_dir = work_dir / sample_name

prepare_params(client=client, sample_dir=sample_dir, workunit_id=workunit_id)
# NOTE(review): likely the removed (old) call.
prepare_inputs(client=client, sample_dir=sample_dir, workunit_id=workunit_id, ssh_user=ssh_user)
# NOTE(review): likely the added (new) call, passing the ids resolved above.
prepare_inputs(
client=client,
sample_dir=sample_dir,
dataset_id=dataset_id,
imzml_resource_id=imzml_resource_id,
ssh_user=ssh_user,
)
zip_file_path = run_workflow(sample_dir=sample_dir)
store_outputs(client=client, zip_file_path=zip_file_path, workunit_id=workunit_id)


if __name__ == "__main__":
Expand Down

0 comments on commit 8d07747

Please sign in to comment.