Skip to content

Commit

Permalink
delete the default resource
Browse files Browse the repository at this point in the history
  • Loading branch information
leoschwarz committed Aug 12, 2024
1 parent e0fb4b1 commit 1fe1859
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
12 changes: 12 additions & 0 deletions src/depiction_targeted_preprocbatch/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(

def run(self, n_jobs: int) -> None:
"""Runs all jobs, executing up to `n_jobs` in parallel."""
self._set_workunit_processing()
self._work_dir.mkdir(exist_ok=True, parents=True)
batch_dataset = BatchDataset(dataset_id=self._workunit_config.input_dataset_id, client=self._client)
pipeline_parameters = self._prepare_pipeline_parameters()
Expand Down Expand Up @@ -114,6 +115,17 @@ def _prepare_pipeline_parameters(self) -> Path:
yaml.dump(self._workunit_config.pipeline_parameters.model_dump(mode="json"), file)
return result_file

def _set_workunit_processing(self) -> None:
"""Sets the workunit to processing and deletes the default resource if it is available."""
self._client.save(
"workunit",
{
"id": self._workunit_config.workunit_id,
"status": "processing",
},
)
JobExportResults.delete_default_resource(workunit_id=self._workunit_config.workunit_id, client=self._client)


app = cyclopts.App()

Expand Down
31 changes: 29 additions & 2 deletions src/depiction_targeted_preprocbatch/job_export_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from pathlib import Path

from bfabric import Bfabric
from bfabric.entities import Storage
from bfabric.entities import Storage, Resource
from loguru import logger

from depiction.persistence.file_checksums import FileChecksums
from depiction_targeted_preproc.app.workunit_config import WorkunitConfig
Expand All @@ -28,6 +29,10 @@ def __init__(
self._output_storage = output_storage
self._force_ssh_user = force_ssh_user

@property
def _workunit_id(self) -> int:
return self._workunit_config.workunit_id

@classmethod
def export(
cls,
Expand Down Expand Up @@ -55,6 +60,10 @@ def export_results(self, sample_name: str, result_files: list[Path]) -> None:
output_path_relative = self._copy_zip_to_storage(zip_file_path)
self._register_zip_in_workunit(output_path_relative, zip_file_path)

def delete_local(self):
# TODO this functionality will be needed when processing large jobs
raise NotImplementedError

def _create_zip_file(self, result_files: list[Path], sample_name: str) -> Path:
"""Creates a ZIP file containing the results for one sample, and returns the zip file's path."""
self.output_dir.mkdir(exist_ok=True, parents=True)
Expand All @@ -70,7 +79,7 @@ def _register_zip_in_workunit(self, output_path_relative: Path, zip_file_path: P
"resource",
{
"name": zip_file_path.name,
"workunitid": self._workunit_config.workunit_id,
"workunitid": self._workunit_id,
"storageid": self._output_storage.id,
"relativepath": output_path_relative,
"filechecksum": checksum,
Expand All @@ -85,3 +94,21 @@ def _copy_zip_to_storage(self, zip_file_path: Path) -> Path:
output_uri = f"{self._output_storage.scp_prefix}{output_path_relative}"
scp(zip_file_path, output_uri, username=self._force_ssh_user)
return output_path_relative

@staticmethod
def delete_default_resource(workunit_id: int, client: Bfabric) -> bool:
"""Deletes the default resource created by the wrapper creator if it exists. Returns true if the resource was
successfully deleted.
"""
resources = Resource.find_by(
{"name": "MSI_Targeted_PreprocBatch 0 - resource", "workunitid": workunit_id}, client=client
)
if len(resources) == 1:
resource_id = list(resources.values())[0].id
logger.info(f"Deleting default resource with ID {resource_id}")
result = client.delete("resource", resource_id, check=False)
return result.is_success
elif len(resources) > 1:
raise ValueError("There should never be more than one default resource.")
else:
return False

0 comments on commit 1fe1859

Please sign in to comment.