diff --git a/src/depiction_targeted_preprocbatch/executor.py b/src/depiction_targeted_preprocbatch/executor.py index 5b0c42e..14e998b 100644 --- a/src/depiction_targeted_preprocbatch/executor.py +++ b/src/depiction_targeted_preprocbatch/executor.py @@ -54,6 +54,7 @@ def __init__( def run(self, n_jobs: int) -> None: """Runs all jobs, executing up to `n_jobs` in parallel.""" + self._set_workunit_processing() self._work_dir.mkdir(exist_ok=True, parents=True) batch_dataset = BatchDataset(dataset_id=self._workunit_config.input_dataset_id, client=self._client) pipeline_parameters = self._prepare_pipeline_parameters() @@ -114,6 +115,17 @@ def _prepare_pipeline_parameters(self) -> Path: yaml.dump(self._workunit_config.pipeline_parameters.model_dump(mode="json"), file) return result_file + def _set_workunit_processing(self) -> None: + """Sets the workunit to processing and deletes the default resource if it is available.""" + self._client.save( + "workunit", + { + "id": self._workunit_config.workunit_id, + "status": "processing", + }, + ) + JobExportResults.delete_default_resource(workunit_id=self._workunit_config.workunit_id, client=self._client) + app = cyclopts.App() diff --git a/src/depiction_targeted_preprocbatch/job_export_results.py b/src/depiction_targeted_preprocbatch/job_export_results.py index 16d85be..555bce8 100644 --- a/src/depiction_targeted_preprocbatch/job_export_results.py +++ b/src/depiction_targeted_preprocbatch/job_export_results.py @@ -4,7 +4,8 @@ from pathlib import Path from bfabric import Bfabric -from bfabric.entities import Storage +from bfabric.entities import Storage, Resource +from loguru import logger from depiction.persistence.file_checksums import FileChecksums from depiction_targeted_preproc.app.workunit_config import WorkunitConfig @@ -28,6 +29,10 @@ def __init__( self._output_storage = output_storage self._force_ssh_user = force_ssh_user + @property + def _workunit_id(self) -> int: + return self._workunit_config.workunit_id + @classmethod def export( cls, @@ -55,6 +60,10 @@ def export_results(self, sample_name: str, result_files: list[Path]) -> None: output_path_relative = self._copy_zip_to_storage(zip_file_path) self._register_zip_in_workunit(output_path_relative, zip_file_path) + def delete_local(self): + # TODO this functionality will be needed when processing large jobs + raise NotImplementedError + def _create_zip_file(self, result_files: list[Path], sample_name: str) -> Path: """Creates a ZIP file containing the results for one sample, and returns the zip file's path.""" self.output_dir.mkdir(exist_ok=True, parents=True) @@ -70,7 +79,7 @@ def _register_zip_in_workunit(self, output_path_relative: Path, zip_file_path: P "resource", { "name": zip_file_path.name, - "workunitid": self._workunit_config.workunit_id, + "workunitid": self._workunit_id, "storageid": self._output_storage.id, "relativepath": output_path_relative, "filechecksum": checksum, @@ -85,3 +94,21 @@ def _copy_zip_to_storage(self, zip_file_path: Path) -> Path: output_uri = f"{self._output_storage.scp_prefix}{output_path_relative}" scp(zip_file_path, output_uri, username=self._force_ssh_user) return output_path_relative + + @staticmethod + def delete_default_resource(workunit_id: int, client: Bfabric) -> bool: + """Deletes the default resource created by the wrapper creator if it exists. Returns true if the resource was + successfully deleted. + """ + resources = Resource.find_by( + {"name": "MSI_Targeted_PreprocBatch 0 - resource", "workunitid": workunit_id}, client=client + ) + if len(resources) == 1: + resource_id = list(resources.values())[0].id + logger.info(f"Deleting default resource with ID {resource_id}") + result = client.delete("resource", resource_id, check=False) + return result.is_success + elif len(resources) > 1: + raise ValueError("There should never be more than one default resource.") + else: + return False