[PY-398] Polling on Item Push #708

Merged · 10 commits · Nov 20, 2023
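In short, this change threads a wait flag from Workflow.upload_files down to the stage move, so pushed items are only advanced once every item's processing_status reports "complete". A minimal usage sketch, assuming you already hold the Workflow meta object modified in this diff (the file path and the way the handle is obtained are illustrative, not part of the PR):

    # `workflow` is an instance of darwin.future.meta.objects.workflow.Workflow
    workflow.upload_files(
        ["./images/example.jpg"],  # hypothetical local file
        auto_push=True,            # push items out of the dataset stage after upload
        wait=True,                 # new: poll until each item's processing_status is "complete"
    )

    # To push without polling:
    workflow.push_from_dataset_stage(wait=False)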
2 changes: 1 addition & 1 deletion darwin/future/core/items/__init__.py
@@ -1,2 +1,2 @@
-from darwin.future.core.items.get import get_item_ids, get_item_ids_stage
+from darwin.future.core.items.get import get_item, get_item_ids, get_item_ids_stage
 from darwin.future.core.items.move_items import move_items_to_stage
23 changes: 21 additions & 2 deletions darwin/future/meta/objects/stage.py
@@ -1,9 +1,10 @@
 from __future__ import annotations

+import time
 from typing import List
 from uuid import UUID

-from darwin.future.core.items import move_items_to_stage
+from darwin.future.core.items import get_item, move_items_to_stage
 from darwin.future.core.types.query import QueryFilter
 from darwin.future.data_objects.workflow import WFEdgeCore, WFStageCore
 from darwin.future.meta.objects.base import MetaBase
@@ -59,7 +60,13 @@ def item_ids(self) -> ItemIDQuery:
             ],
         )

-    def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
+    def move_attached_files_to_stage(
+        self, new_stage_id: UUID, wait: bool = True
+    ) -> Stage:
+        """
+        Args:
+            wait (bool, optional): Waits for Item 'processing_status' to complete. Defaults to True.
+        """
         assert self.meta_params["team_slug"] is not None and isinstance(
             self.meta_params["team_slug"], str
         )
@@ -75,6 +82,18 @@ def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
             self.meta_params["dataset_id"],
         )
         ids = [x.id for x in self.item_ids.collect_all()]

+        if wait:
+            while True:
+                for _id in ids:
+                    if get_item(self.client, slug, _id).processing_status != "complete":
+                        # wait for 0.5 seconds before checking again
+                        time.sleep(0.5)
+                        break
+                else:
+                    # All items are complete, break the while loop
+                    break
+
         move_items_to_stage(self.client, slug, w_id, d_id, new_stage_id, ids)
         return self

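The polling block added above relies on Python's for/else: the else clause runs only when the for loop finishes without hitting break, i.e. when every item reported processing_status == "complete" on that pass. A standalone sketch of the same pattern, with a placeholder helper name and is_ready callback that are not part of the PR:

    import time

    def wait_until_all_ready(item_ids, is_ready, poll_interval=0.5):
        # Block until is_ready(item_id) is True for every id.
        while True:
            for item_id in item_ids:
                if not is_ready(item_id):
                    time.sleep(poll_interval)  # back off briefly, then restart the pass
                    break                      # re-enter the while loop and check again
            else:
                # no break occurred: every item passed the check, stop polling
                return

Like the loop in the diff, this sketch has no timeout or retry limit, so it waits indefinitely if an item never finishes processing.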
21 changes: 16 additions & 5 deletions darwin/future/meta/objects/workflow.py
@@ -8,7 +8,7 @@
 from darwin.datatypes import PathLike
 from darwin.future.data_objects.workflow import WFDatasetCore, WFTypeCore, WorkflowCore
 from darwin.future.meta.objects.base import MetaBase
-from darwin.future.meta.queries.stage import StageQuery
+from darwin.future.meta.queries.stage import Stage, StageQuery


 class Workflow(MetaBase[WorkflowCore]):
@@ -55,6 +55,12 @@ def stages(self) -> StageQuery:
         meta_params["dataset_name"] = self.datasets[0].name
         return StageQuery(self.client, meta_params=meta_params)

+    def _get_dataset_stage(self) -> Stage | None:
+        # stages are not in right order - finding the dataset stage
+        for stage in self.stages:
+            if stage.type == "dataset":
+                return stage
+
     @property
     def datasets(self) -> List[WFDatasetCore]:
         if self._element.dataset is None:
@@ -69,15 +75,19 @@ def id(self) -> UUID:
     def name(self) -> str:
         return self._element.name

-    def push_from_dataset_stage(self) -> Workflow:
+    def push_from_dataset_stage(self, wait: bool = True) -> Workflow:
         assert self._element.dataset is not None
         stages = self.stages
-        ds_stage = stages[0]
         assert len(stages) > 1
+
+        ds_stage = self._get_dataset_stage()
+        assert ds_stage
+
         assert ds_stage._element.type == WFTypeCore.DATASET
         next_stage = ds_stage._element.edges[0].target_stage_id
         assert next_stage is not None
-        ds_stage.move_attached_files_to_stage(next_stage)
+
+        ds_stage.move_attached_files_to_stage(next_stage, wait)
         return self

     def upload_files(
@@ -91,6 +101,7 @@ def upload_files(
         preserve_folders: bool = False,
         verbose: bool = False,
         auto_push: bool = True,
+        wait: bool = True,
     ) -> Workflow:
         assert self._element.dataset is not None
         upload_data(
@@ -105,7 +116,7 @@
             verbose,
         )
         if auto_push:
-            self.push_from_dataset_stage()
+            self.push_from_dataset_stage(wait=wait)
         return self

     def __str__(self) -> str:
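push_from_dataset_stage no longer assumes stages[0] is the dataset stage; the new _get_dataset_stage helper scans self.stages for a stage whose type is "dataset" and implicitly returns None when none exists. For illustration only, the same lookup written with next(), assuming the workflow handle from the earlier sketch:

    # Equivalent to _get_dataset_stage(): first stage of type "dataset", else None.
    ds_stage = next((stage for stage in workflow.stages if stage.type == "dataset"), None)
    assert ds_stage is not None, "workflow has no dataset stage"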
51 changes: 50 additions & 1 deletion darwin/future/tests/meta/objects/test_stagemeta.py
@@ -96,7 +96,56 @@ def test_move_attached_files_to_stage(
             json={"success": UUIDs_str},
             status=200,
         )
-        stage_meta.move_attached_files_to_stage(stage_meta.id)
+        stage_meta.move_attached_files_to_stage(stage_meta.id, wait=False)
+
+
+def test_move_attached_files_to_stage_wait(
+    base_meta_client: Client, stage_meta: Stage, UUIDs_str: List[str], UUIDs: List[UUID]
+) -> None:
+    with responses.RequestsMock() as rsps:
+        rsps.add(
+            rsps.GET,
+            base_meta_client.config.api_endpoint
+            + "v2/teams/default-team/items/list_ids",
+            json={"item_ids": UUIDs_str},
+            match=[
+                query_param_matcher(
+                    {
+                        "page[offset]": "0",
+                        "page[size]": "500",
+                        "workflow_stage_ids": str(stage_meta.id),
+                        "dataset_ids": "1337",
+                    }
+                )
+            ],
+            status=200,
+        )
+        rsps.add(
+            rsps.POST,
+            base_meta_client.config.api_endpoint + "v2/teams/default-team/items/stage",
+            json={"success": UUIDs_str},
+            status=200,
+        )
+        for uuid in stage_meta.item_ids.collect_all():
+            rsps.add(
+                rsps.GET,
+                base_meta_client.config.api_endpoint
+                + f"v2/teams/default-team/items/{uuid}",
+                json={
+                    "archived": False,
+                    "dataset_id": 1337,
+                    "id": "00000000-0000-0000-0000-000000000000",
+                    "layout": None,
+                    "name": "test_0",
+                    "path": "test_path",
+                    "priority": 0,
+                    "processing_status": "complete",
+                    "slots": [],
+                    "tags": [],
+                },
+                status=200,
+            )
+        stage_meta.move_attached_files_to_stage(stage_meta.id, wait=True)


 def test_get_stage_id(stage_meta: Stage) -> None: