Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PY-398] Polling on Item Push #708

Merged
merged 10 commits into from
Nov 20, 2023
2 changes: 1 addition & 1 deletion darwin/future/core/items/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from darwin.future.core.items.get import get_item_ids, get_item_ids_stage
from darwin.future.core.items.get import get_item, get_item_ids, get_item_ids_stage
from darwin.future.core.items.move_items import move_items_to_stage
6 changes: 6 additions & 0 deletions darwin/future/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,9 @@ class DatasetNotFound(DarwinException):
"""Raised when the dataset endpoint returns a malformed response."""

...


class MaxRetriesError(DarwinException):
"""Raised when a certain API call is re-tried for {x} number of times."""

...
58 changes: 56 additions & 2 deletions darwin/future/meta/objects/stage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import time
from typing import List
from uuid import UUID

from darwin.future.core.items import move_items_to_stage
from darwin.future.core.items import get_item, move_items_to_stage
from darwin.future.core.types.query import QueryFilter
from darwin.future.data_objects.workflow import WFEdgeCore, WFStageCore
from darwin.future.exceptions import MaxRetriesError
from darwin.future.meta.objects.base import MetaBase
from darwin.future.meta.queries.item import ItemQuery
from darwin.future.meta.queries.item_id import ItemIDQuery
Expand Down Expand Up @@ -76,7 +78,50 @@ def item_ids(self) -> ItemIDQuery:
],
)

def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
def check_all_items_complete(
self,
slug: str,
item_ids: list[str],
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> bool:
"""
Checks if all items are complete. If not, waits and tries again. Raises error if max attempts reached.

Args:
slug (str): Team slug
item_ids (list[str]): List of item ids
max_attempts (int, optional): Max number of attempts. Defaults to 5.
wait_time (float, optional): Wait time between attempts. Defaults to 0.5.
"""
for attempt in range(1, wait_max_attempts + 1):
# check if all items are complete
for item_id in item_ids[:]:
if get_item(self.client, slug, item_id).processing_status != "complete":
break
item_ids.remove(item_id)
else:
# if all items are complete, return.
return True
# if not complete, wait
time.sleep(wait_time * attempt)
else:
# if max attempts reached, raise error
raise MaxRetriesError(
f"Max attempts reached. {len(item_ids)} items pending completion check."
)

def move_attached_files_to_stage(
self,
new_stage_id: UUID,
wait: bool = True,
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> Stage:
"""
Args:
wait (bool, optional): Waits for Item 'processing_status' to complete. Defaults to True.
"""
assert self.meta_params["team_slug"] is not None and isinstance(
self.meta_params["team_slug"], str
)
Expand All @@ -92,6 +137,15 @@ def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
self.meta_params["dataset_id"],
)
ids = [str(x.id) for x in self.item_ids.collect_all()]

if wait:
self.check_all_items_complete(
slug=team_slug,
item_ids=ids,
wait_max_attempts=wait_max_attempts,
wait_time=wait_time,
)

move_items_to_stage(
self.client,
team_slug,
Expand Down
30 changes: 25 additions & 5 deletions darwin/future/meta/objects/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from darwin.datatypes import PathLike
from darwin.future.core.types.query import QueryFilter
from darwin.future.data_objects.workflow import WFDatasetCore, WFTypeCore, WorkflowCore
from darwin.future.exceptions import MissingDataset
from darwin.future.meta.objects.base import MetaBase
from darwin.future.meta.queries.item import ItemQuery
from darwin.future.meta.queries.stage import StageQuery
from darwin.future.meta.queries.stage import Stage, StageQuery


class Workflow(MetaBase[WorkflowCore]):
Expand Down Expand Up @@ -65,6 +66,14 @@ def stages(self) -> StageQuery:
meta_params["dataset_name"] = self.datasets[0].name
return StageQuery(self.client, meta_params=meta_params)

def _get_dataset_stage(self) -> Stage:
# stages are not in right order - finding the dataset stage
for stage in self.stages:
if stage.type == "dataset":
return stage

raise MissingDataset("Workflow has no dataset stage")

@property
def datasets(self) -> List[WFDatasetCore]:
if self._element.dataset is None:
Expand All @@ -79,15 +88,21 @@ def id(self) -> UUID:
def name(self) -> str:
return self._element.name

def push_from_dataset_stage(self) -> Workflow:
def push_from_dataset_stage(
self, wait: bool = True, wait_max_attempts: int = 5, wait_time: float = 0.5
) -> Workflow:
assert self._element.dataset is not None
stages = self.stages
ds_stage = stages[0]
assert len(stages) > 1

ds_stage = self._get_dataset_stage()
assert ds_stage._element.type == WFTypeCore.DATASET
next_stage = ds_stage._element.edges[0].target_stage_id
assert next_stage is not None
ds_stage.move_attached_files_to_stage(next_stage)
ds_stage.move_attached_files_to_stage(
next_stage, wait, wait_max_attempts, wait_time
)

return self

def upload_files(
Expand All @@ -101,6 +116,9 @@ def upload_files(
preserve_folders: bool = False,
verbose: bool = False,
auto_push: bool = True,
wait: bool = True,
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> Workflow:
assert self._element.dataset is not None
upload_data(
Expand All @@ -115,7 +133,9 @@ def upload_files(
verbose,
)
if auto_push:
self.push_from_dataset_stage()
self.push_from_dataset_stage(
wait=wait, wait_max_attempts=wait_max_attempts, wait_time=wait_time
)
return self

def __str__(self) -> str:
Expand Down
53 changes: 52 additions & 1 deletion darwin/future/tests/meta/objects/test_stagemeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,58 @@ def test_move_attached_files_to_stage(
json={"success": UUIDs_str},
status=200,
)
stage_meta.move_attached_files_to_stage(stage_meta.id)
stage_meta.move_attached_files_to_stage(stage_meta.id, wait=False)


def test_move_attached_files_to_stage_wait(
base_meta_client: Client, stage_meta: Stage, UUIDs_str: List[str], UUIDs: List[UUID]
) -> None:
with responses.RequestsMock() as rsps:
rsps.add(
rsps.GET,
base_meta_client.config.api_endpoint
+ "v2/teams/default-team/items/list_ids",
json={"item_ids": UUIDs_str},
match=[
query_param_matcher(
{
"page[offset]": "0",
"page[size]": "500",
"workflow_stage_ids": str(stage_meta.id),
"dataset_ids": "1337",
}
)
],
status=200,
)
rsps.add(
rsps.POST,
base_meta_client.config.api_endpoint + "v2/teams/default-team/items/stage",
json={"success": UUIDs_str},
status=200,
)
for uuid in stage_meta.item_ids.collect_all():
rsps.add(
rsps.GET,
base_meta_client.config.api_endpoint
+ f"v2/teams/default-team/items/{uuid}",
json={
"archived": False,
"dataset_id": 1337,
"id": "00000000-0000-0000-0000-000000000000",
"layout": None,
"name": "test_0",
"path": "test_path",
"priority": 0,
"processing_status": "complete",
"slots": [],
"tags": [],
},
status=200,
)
stage_meta.move_attached_files_to_stage(
stage_meta.id, wait=True, wait_max_attempts=5, wait_time=0.5
)


def test_get_stage_id(stage_meta: Stage) -> None:
Expand Down
Loading