replace the code with new run_batch.py
leoschwarz committed Sep 4, 2024
1 parent d2d02d6 commit a5988b0
Showing 6 changed files with 40 additions and 404 deletions.
13 changes: 4 additions & 9 deletions src/depiction_targeted_preproc/pipeline/run.py
@@ -71,7 +71,7 @@ def run_one_job(
 @app.default()
 def run_resource_flow(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
     client = Bfabric.from_config()
-    _set_workunit_processing(client=client, workunit_id=workunit_id)
+    set_workunit_processing(client=client, workunit_id=workunit_id)
     dataset_id, imzml_resource_id, sample_name = _get_resource_flow_ids(client=client, workunit_id=workunit_id)
     run_one_job(
         client=client,
@@ -82,15 +82,10 @@ def run_resource_flow(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
         imzml_resource_id=imzml_resource_id,
         ssh_user=ssh_user,
     )
-    _set_workunit_available(client=client, workunit_id=workunit_id)
+    set_workunit_available(client=client, workunit_id=workunit_id)
 
 
-def run_batch(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
-    # TODO maybe it will make sense to still implement this is in a dedicated file
-    pass
-
-
-def _set_workunit_processing(client: Bfabric, workunit_id: int) -> None:
+def set_workunit_processing(client: Bfabric, workunit_id: int) -> None:
     """Sets the workunit to processing and deletes the default resource if it is available."""
     client.save("workunit", {"id": workunit_id, "status": "processing"})
     # TODO the default resource should be deleted in the future, but right now we simply set it to 0 and available
@@ -100,7 +95,7 @@ def _set_workunit_processing(client: Bfabric, workunit_id: int) -> None:
     client.save("resource", {"id": list(resources.values())[0].id, "status": "available"})
 
 
-def _set_workunit_available(client: Bfabric, workunit_id: int) -> None:
+def set_workunit_available(client: Bfabric, workunit_id: int) -> None:
     client.save("workunit", {"id": workunit_id, "status": "available"})
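
With the leading underscores dropped, set_workunit_processing and set_workunit_available become part of run.py's importable surface, which is what the new run_batch.py below relies on. A minimal sketch of the helpers in use, assuming only what the diff above shows (the workunit id 12345 is hypothetical):

    from bfabric import Bfabric
    from depiction_targeted_preproc.pipeline.run import set_workunit_available, set_workunit_processing

    client = Bfabric.from_config()
    # Mark the workunit as being processed (also resets the default resource).
    set_workunit_processing(client=client, workunit_id=12345)
    # ... run one or more jobs here ...
    # Mark the workunit as finished.
    set_workunit_available(client=client, workunit_id=12345)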
35 changes: 35 additions & 0 deletions src/depiction_targeted_preproc/pipeline/run_batch.py
@@ -0,0 +1,35 @@
+from pathlib import Path
+
+import cyclopts
+from bfabric import Bfabric
+from bfabric.entities import Workunit
+from depiction_targeted_preproc.pipeline.run import set_workunit_processing, set_workunit_available, run_one_job
+from depiction_targeted_preprocbatch.batch_dataset import BatchDataset
+
+app = cyclopts.App()
+
+
+@app.default()
+def run_batch(workunit_id: int, work_dir: Path, ssh_user: str | None = None) -> None:
+    client = Bfabric.from_config()
+    set_workunit_processing(client=client, workunit_id=workunit_id)
+
+    workunit = Workunit.find(id=workunit_id, client=client)
+    batch_dataset = BatchDataset(dataset_id=workunit.input_dataset.id, client=client)
+
+    for job in batch_dataset.jobs:
+        run_one_job(
+            client=client,
+            work_dir=work_dir,
+            sample_name=str(Path(job.imzml["name"]).stem),
+            dataset_id=job.panel.id,
+            workunit_id=workunit_id,
+            imzml_resource_id=job.imzml.id,
+            ssh_user=ssh_user,
+        )
+
+    set_workunit_available(client=client, workunit_id=workunit_id)
+
+
+if __name__ == "__main__":
+    app()
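
Because run_batch is a plain function as well as the cyclopts default command, it can also be driven from Python directly. A minimal usage sketch, with a hypothetical workunit id and work directory:

    from pathlib import Path

    from depiction_targeted_preproc.pipeline.run_batch import run_batch

    # Runs every job of the workunit's input dataset sequentially,
    # bracketed by the processing/available status transitions.
    run_batch(workunit_id=12345, work_dir=Path("/scratch/depiction"))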
136 changes: 1 addition & 135 deletions src/depiction_targeted_preprocbatch/executor.py
@@ -1,22 +1,11 @@
 from __future__ import annotations
 
-import multiprocessing
 from dataclasses import dataclass
 from pathlib import Path
 
-import cyclopts
 from bfabric import Bfabric
-from bfabric.entities import Storage, Resource
+from bfabric.entities import Resource
 from depiction_targeted_preproc.app.workunit_config import WorkunitConfig
-from depiction_targeted_preproc.pipeline_config.artifacts_mapping import (
-    get_result_files,
-)
-from depiction_targeted_preproc.pipeline_config.model import PipelineParameters
-from depiction_targeted_preproc.workflow.snakemake_invoke import SnakemakeInvoke
-from depiction_targeted_preprocbatch.batch_dataset import BatchDataset
-from depiction_targeted_preprocbatch.job_export_results import JobExportResults
-from depiction_targeted_preprocbatch.job_prepare_inputs import JobPrepareInputs
-from loguru import logger
 
 
 @dataclass
@@ -38,126 +27,3 @@ def from_bfabric(cls, imzml_resource: Resource, workunit_config: WorkunitConfig,
             sample_name=Path(imzml_resource["name"]).stem,
             ssh_user=ssh_user,
         )
-
-
-class Executor:
-    """Executes the pipeline for multiple files, supporting parallel, but isolated, execution of multiple jobs.
-    Results will be gradually published once they become available.
-    """
-
-    def __init__(
-        self,
-        proc_dir: Path,
-        output_dir: Path,
-        workunit_config: WorkunitConfig,
-        client: Bfabric,
-        force_ssh_user: str | None = None,
-    ) -> None:
-        self._client = client
-        self._workunit_config = workunit_config
-        self._force_ssh_user = force_ssh_user
-        self.proc_dir = proc_dir
-        self.output_dir = output_dir
-
-    def run(self, n_jobs: int) -> None:
-        """Runs all jobs, executing up to `n_jobs` in parallel."""
-        self._set_workunit_processing()
-        batch_dataset = BatchDataset(dataset_id=self._workunit_config.input_dataset_id, client=self._client)
-
-        jobs = [
-            BatchJob.from_bfabric(
-                imzml_resource=job.imzml, workunit_config=self._workunit_config, ssh_user=self._force_ssh_user
-            )
-            for job in batch_dataset.jobs
-        ]
-        # TODO parallelization is currently broken
-        if n_jobs != 1:
-            logger.error("Parallelization is currently broken and will be disabled.")
-        for job in jobs:
-            self.run_job(job)
-        # parallel = joblib.Parallel(n_jobs=n_jobs, verbose=10)
-        # parallel(joblib.delayed(self.run_job)(job) for job in jobs)
-        self._set_workunit_available()
-
-    def run_job(self, job: BatchJob) -> None:
-        """Runs a single job."""
-        workflow_dir = self.proc_dir / job.sample_name
-        sample_dir = workflow_dir / job.sample_name
-        sample_dir.mkdir(parents=True, exist_ok=True)
-
-        # stage input
-        logger.debug(f"Preparing inputs for {job}")
-        JobPrepareInputs.prepare(job=job, sample_dir=sample_dir, client=self._client)
-
-        # invoke the pipeline
-        logger.debug(f"Running pipeline for {job}")
-        result_files = self._determine_result_files(job=job, workflow_dir=workflow_dir)
-        SnakemakeInvoke().invoke(work_dir=workflow_dir, result_files=result_files)
-
-        # export the results
-        logger.debug(f"Exporting results for {job}")
-        # TODO do not hardcode id
-        output_storage = Storage.find(id=2, client=self._client)
-        JobExportResults.export(
-            client=self._client,
-            work_dir=workflow_dir,
-            workunit_config=self._workunit_config,
-            sample_name=sample_dir.name,
-            result_files=result_files,
-            output_storage=output_storage,
-            force_ssh_user=self._force_ssh_user,
-        )
-
-    def _determine_result_files(self, job: BatchJob, workflow_dir: Path) -> list[Path]:
-        """Returns the requested result files based on the pipeline parameters for a particular job."""
-        return get_result_files(params=job.pipeline_parameters, work_dir=workflow_dir, sample_name=job.sample_name)
-
-    def _set_workunit_processing(self) -> None:
-        """Sets the workunit to processing and deletes the default resource if it is available."""
-        self._client.save(
-            "workunit",
-            {
-                "id": self._workunit_config.workunit_id,
-                "status": "processing",
-            },
-        )
-        JobExportResults.delete_default_resource(workunit_id=self._workunit_config.workunit_id, client=self._client)
-
-    def _set_workunit_available(self) -> None:
-        # TODO not clear if it needs to be addressed here or in the shell script
-        self._client.save(
-            "workunit",
-            {
-                "id": self._workunit_config.workunit_id,
-                "status": "available",
-            },
-        )
-
-
-app = cyclopts.App()
-
-
-@app.default
-def process_app(
-    proc_dir: Path,
-    output_dir: Path,
-    workunit_yaml: Path,
-    n_jobs: int = 32,
-    force_ssh_user: str | None = None,
-) -> None:
-    """Runs the executor."""
-    workunit_config = WorkunitConfig.from_yaml(workunit_yaml)
-    client = Bfabric.from_config()
-    executor = Executor(
-        proc_dir=proc_dir,
-        output_dir=output_dir,
-        workunit_config=workunit_config,
-        client=client,
-        force_ssh_user=force_ssh_user,
-    )
-    executor.run(n_jobs=n_jobs)
-
-
-if __name__ == "__main__":
-    multiprocessing.freeze_support()
-    app()
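
After this commit, executor.py retains only the BatchJob dataclass and its from_bfabric factory. A hedged sketch of calling the factory the way the deleted Executor.run used to; the resource id and YAML path are hypothetical, and Resource.find is assumed to mirror the Workunit.find / Storage.find pattern seen above:

    from pathlib import Path

    from bfabric import Bfabric
    from bfabric.entities import Resource
    from depiction_targeted_preproc.app.workunit_config import WorkunitConfig
    from depiction_targeted_preprocbatch.executor import BatchJob

    client = Bfabric.from_config()
    # Hypothetical ids and paths, for illustration only.
    imzml_resource = Resource.find(id=456, client=client)
    workunit_config = WorkunitConfig.from_yaml(Path("workunit.yml"))
    job = BatchJob.from_bfabric(
        imzml_resource=imzml_resource,
        workunit_config=workunit_config,
        ssh_user=None,
    )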
125 changes: 0 additions & 125 deletions src/depiction_targeted_preprocbatch/job_export_results.py

This file was deleted.

[Diffs for the remaining two changed files (0 additions, 135 deletions) were not loaded.]
