Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
leoschwarz committed Aug 12, 2024
1 parent 0436813 commit 1b3ab04
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 34 deletions.
3 changes: 1 addition & 2 deletions src/depiction_targeted_preprocbatch/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ def run_job(self, job: BatchJob) -> None:
sample_dir.mkdir(parents=True, exist_ok=True)

# stage input
prepare_inputs = JobPrepareInputs(job=job, sample_dir=sample_dir)
prepare_inputs.prepare()
JobPrepareInputs.prepare(job=job, sample_dir=sample_dir)

# invoke the pipeline
result_files = self._determine_result_files(job_dir=job_dir)
Expand Down
2 changes: 2 additions & 0 deletions src/depiction_targeted_preprocbatch/job_export_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@


class JobExportResults:
"""Exports the results of the job to the output storage and registers it in B-Fabric."""

def __init__(self, client: Bfabric, work_dir: Path, workunit_config: WorkunitConfig) -> None:
self._client = client
self._workunit_config = workunit_config
Expand Down
62 changes: 30 additions & 32 deletions src/depiction_targeted_preprocbatch/job_prepare_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from pathlib import Path
from typing import TYPE_CHECKING

import polars as pl
from bfabric.entities import Storage
from loguru import logger

from depiction.persistence.file_checksums import FileChecksums
Expand All @@ -17,63 +15,63 @@


class JobPrepareInputs:
"""Prepares the inputs for a particular job."""
"""Prepares the inputs for a particular job.
:param job: The job to prepare the inputs for.
:param sample_dir: The directory where the inputs should be staged.
"""

def __init__(self, job: BatchJob, sample_dir: Path) -> None:
self._job = job
self._sample_dir = sample_dir

def prepare(self) -> None:
@classmethod
def prepare(cls, job: BatchJob, sample_dir: Path) -> None:
"""Prepares the inputs for a particular job.
:param job: The job to prepare the inputs for.
:param sample_dir: The directory where the inputs should be staged.
"""
instance = cls(job=job, sample_dir=sample_dir)
instance.stage_all()

def stage_all(self) -> None:
"""Stages all required input files for a particular job."""
self._stage_imzml(
relative_path=self._job.imzml_relative_path,
input_storage=self._job.imzml_storage,
sample_dir=self._sample_dir,
checksum=self._job.imzml_checksum,
)
self._stage_panel(
sample_dir=self._sample_dir,
panel_df=self._job.panel_df,
)
self._stage_pipeline_parameters(sample_dir=self._sample_dir)
self.stage_imzml()
self.stage_panel()
self.stage_pipeline_parameters()

def _stage_imzml(self, relative_path: Path, input_storage: Storage, sample_dir: Path, checksum: str) -> None:
def stage_imzml(self) -> None:
"""Copies the `raw.imzML` and `raw.ibd` files to the sample directory.
This method assumes the position will be on a remote server, and first needs to be copied with scp.
:param relative_path: Relative path of the imzML file (relative to storage roo-).
:param input_storage: Storage of the imzML file.
:param sample_dir: Directory to copy the files to.
:param checksum: Expected checksum of the imzML file.
"""
# Check for some not-yet supported functionality (TODO)
if relative_path.suffix != ".imzML":
if self._job.imzml_relative_path.suffix != ".imzML":
# TODO implement this later
raise NotImplementedError(
"Currently only .imzML files are supported, .imzML.zip will be supported in the future"
)

# determine the paths to copy from
input_paths = [relative_path, relative_path.with_suffix(".ibd")]
scp_uris = [f"{input_storage.scp_prefix}{path}" for path in input_paths]
input_paths = [self._job.imzml_relative_path, self._job.imzml_relative_path.with_suffix(".ibd")]
scp_uris = [f"{self._job.imzml_storage.scp_prefix}{path}" for path in input_paths]

# perform the copies
for scp_uri, result_name in zip(scp_uris, ["raw.imzML", "raw.ibd"]):
self._scp(scp_uri, str(sample_dir / result_name))
self._scp(scp_uri, str(self._sample_dir / result_name))

# check the checksum
actual_checksum = FileChecksums(file_path=sample_dir / "raw.imzML").checksum_md5
if actual_checksum != checksum:
raise ValueError(f"Checksum mismatch: expected {checksum}, got {actual_checksum}")
actual_checksum = FileChecksums(file_path=self._sample_dir / "raw.imzML").checksum_md5
if actual_checksum != self._job.imzml_checksum:
raise ValueError(f"Checksum mismatch: expected {self._job.imzml_checksum}, got {actual_checksum}")

def _stage_panel(self, sample_dir: Path, panel_df: pl.DataFrame) -> None:
def stage_panel(self) -> None:
"""Writes the marker panel to the sample directory."""
write_standardized_table(input_df=panel_df, output_csv=sample_dir / "mass_list.raw.csv")
write_standardized_table(input_df=self._job.panel_df, output_csv=self._sample_dir / "mass_list.raw.csv")

def _stage_pipeline_parameters(self, sample_dir: Path) -> None:
def stage_pipeline_parameters(self) -> None:
"""Copies the `pipeline_params.yml` file to the particular sample's directory."""
shutil.copyfile(
sample_dir.parents[1] / "pipeline_params.yml",
sample_dir / "pipeline_params.yml",
self._sample_dir.parents[1] / "pipeline_params.yml",
self._sample_dir / "pipeline_params.yml",
)

def _scp(self, source: str | Path, target: str | Path) -> None:
Expand Down

0 comments on commit 1b3ab04

Please sign in to comment.