diff --git a/darwin/future/core/items/__init__.py b/darwin/future/core/items/__init__.py index a56cb784d..8d0bf339d 100644 --- a/darwin/future/core/items/__init__.py +++ b/darwin/future/core/items/__init__.py @@ -1,2 +1,2 @@ -from darwin.future.core.items.get import get_item_ids, get_item_ids_stage +from darwin.future.core.items.get import get_item, get_item_ids, get_item_ids_stage from darwin.future.core.items.move_items import move_items_to_stage diff --git a/darwin/future/meta/objects/stage.py b/darwin/future/meta/objects/stage.py index d0aa53219..a72e2a5cb 100644 --- a/darwin/future/meta/objects/stage.py +++ b/darwin/future/meta/objects/stage.py @@ -1,9 +1,10 @@ from __future__ import annotations +import time from typing import List from uuid import UUID -from darwin.future.core.items import move_items_to_stage +from darwin.future.core.items import get_item, move_items_to_stage from darwin.future.core.types.query import QueryFilter from darwin.future.data_objects.workflow import WFEdgeCore, WFStageCore from darwin.future.meta.objects.base import MetaBase @@ -59,7 +60,11 @@ def item_ids(self) -> ItemIDQuery: ], ) - def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage: + def move_attached_files_to_stage(self, new_stage_id: UUID, wait: bool = True) -> Stage: + """ + Args: + wait (bool, optional): Waits for Item 'processing_status' to complete. Defaults to True. + """ assert self.meta_params["team_slug"] is not None and isinstance( self.meta_params["team_slug"], str ) @@ -75,6 +80,18 @@ def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage: self.meta_params["dataset_id"], ) ids = [x.id for x in self.item_ids.collect_all()] + + if wait: + while True: + for _id in ids: + if get_item(self.client, slug, _id).processing_status != "complete": + # wait for 0.5 seconds before checking again + time.sleep(0.5) + break + else: + # All items are complete, break the while loop + break + move_items_to_stage(self.client, slug, w_id, d_id, new_stage_id, ids) return self diff --git a/darwin/future/meta/objects/workflow.py b/darwin/future/meta/objects/workflow.py index 7e931e1da..6966e6509 100644 --- a/darwin/future/meta/objects/workflow.py +++ b/darwin/future/meta/objects/workflow.py @@ -8,7 +8,7 @@ from darwin.datatypes import PathLike from darwin.future.data_objects.workflow import WFDatasetCore, WFTypeCore, WorkflowCore from darwin.future.meta.objects.base import MetaBase -from darwin.future.meta.queries.stage import StageQuery +from darwin.future.meta.queries.stage import Stage, StageQuery class Workflow(MetaBase[WorkflowCore]): @@ -55,6 +55,12 @@ def stages(self) -> StageQuery: meta_params["dataset_name"] = self.datasets[0].name return StageQuery(self.client, meta_params=meta_params) + def _get_dataset_stage(self) -> Stage | None: + # stages are not in right order - finding the dataset stage + for stage in self.stages: + if stage.type == "dataset": + return stage + @property def datasets(self) -> List[WFDatasetCore]: if self._element.dataset is None: @@ -69,15 +75,19 @@ def id(self) -> UUID: def name(self) -> str: return self._element.name - def push_from_dataset_stage(self) -> Workflow: + def push_from_dataset_stage(self, wait: bool = True) -> Workflow: assert self._element.dataset is not None stages = self.stages - ds_stage = stages[0] assert len(stages) > 1 + + ds_stage = self._get_dataset_stage() + assert ds_stage + assert ds_stage._element.type == WFTypeCore.DATASET next_stage = ds_stage._element.edges[0].target_stage_id assert next_stage is not None - ds_stage.move_attached_files_to_stage(next_stage) + + ds_stage.move_attached_files_to_stage(next_stage, wait) return self def upload_files( @@ -91,6 +101,7 @@ def upload_files( preserve_folders: bool = False, verbose: bool = False, auto_push: bool = True, + wait: bool = True, ) -> Workflow: assert self._element.dataset is not None upload_data( @@ -105,7 +116,7 @@ def upload_files( verbose, ) if auto_push: - self.push_from_dataset_stage() + self.push_from_dataset_stage(wait=wait) return self def __str__(self) -> str: