Skip to content

Commit

Permalink
update Meta Stage obj - add an optional wait arg to wait for item processing_status, update Meta Workflow obj - fix dataset stage
Browse files Browse the repository at this point in the history
  • Loading branch information
saurbhc committed Nov 7, 2023
1 parent e9b26b1 commit 9ad023e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
2 changes: 1 addition & 1 deletion darwin/future/core/items/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from darwin.future.core.items.get import get_item_ids, get_item_ids_stage
from darwin.future.core.items.get import get_item, get_item_ids, get_item_ids_stage
from darwin.future.core.items.move_items import move_items_to_stage
21 changes: 19 additions & 2 deletions darwin/future/meta/objects/stage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import time
from typing import List
from uuid import UUID

from darwin.future.core.items import move_items_to_stage
from darwin.future.core.items import get_item, move_items_to_stage
from darwin.future.core.types.query import QueryFilter
from darwin.future.data_objects.workflow import WFEdgeCore, WFStageCore
from darwin.future.meta.objects.base import MetaBase
Expand Down Expand Up @@ -59,7 +60,11 @@ def item_ids(self) -> ItemIDQuery:
],
)

def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
def move_attached_files_to_stage(self, new_stage_id: UUID, wait: bool = True) -> Stage:
"""
Args:
wait (bool, optional): Waits for Item 'processing_status' to complete. Defaults to True.
"""
assert self.meta_params["team_slug"] is not None and isinstance(
self.meta_params["team_slug"], str
)
Expand All @@ -75,6 +80,18 @@ def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
self.meta_params["dataset_id"],
)
ids = [x.id for x in self.item_ids.collect_all()]

if wait:
while True:
for _id in ids:
if get_item(self.client, slug, _id).processing_status != "complete":
# wait for 0.5 seconds before checking again
time.sleep(0.5)
break
else:
# All items are complete, break the while loop
break

move_items_to_stage(self.client, slug, w_id, d_id, new_stage_id, ids)
return self

Expand Down
21 changes: 16 additions & 5 deletions darwin/future/meta/objects/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from darwin.datatypes import PathLike
from darwin.future.data_objects.workflow import WFDatasetCore, WFTypeCore, WorkflowCore
from darwin.future.meta.objects.base import MetaBase
from darwin.future.meta.queries.stage import StageQuery
from darwin.future.meta.queries.stage import Stage, StageQuery


class Workflow(MetaBase[WorkflowCore]):
Expand Down Expand Up @@ -55,6 +55,12 @@ def stages(self) -> StageQuery:
meta_params["dataset_name"] = self.datasets[0].name
return StageQuery(self.client, meta_params=meta_params)

def _get_dataset_stage(self) -> Stage | None:
# stages are not in right order - finding the dataset stage
for stage in self.stages:
if stage.type == "dataset":
return stage

@property
def datasets(self) -> List[WFDatasetCore]:
if self._element.dataset is None:
Expand All @@ -69,15 +75,19 @@ def id(self) -> UUID:
def name(self) -> str:
return self._element.name

def push_from_dataset_stage(self) -> Workflow:
def push_from_dataset_stage(self, wait: bool = True) -> Workflow:
assert self._element.dataset is not None
stages = self.stages
ds_stage = stages[0]
assert len(stages) > 1

ds_stage = self._get_dataset_stage()
assert ds_stage

assert ds_stage._element.type == WFTypeCore.DATASET
next_stage = ds_stage._element.edges[0].target_stage_id
assert next_stage is not None
ds_stage.move_attached_files_to_stage(next_stage)

ds_stage.move_attached_files_to_stage(next_stage, wait)
return self

def upload_files(
Expand All @@ -91,6 +101,7 @@ def upload_files(
preserve_folders: bool = False,
verbose: bool = False,
auto_push: bool = True,
wait: bool = True,
) -> Workflow:
assert self._element.dataset is not None
upload_data(
Expand All @@ -105,7 +116,7 @@ def upload_files(
verbose,
)
if auto_push:
self.push_from_dataset_stage()
self.push_from_dataset_stage(wait=wait)
return self

def __str__(self) -> str:
Expand Down

0 comments on commit 9ad023e

Please sign in to comment.