Skip to content

Commit

Permalink
update Stage meta object - implement retries when checking all-items-…
Browse files Browse the repository at this point in the history
…complete
  • Loading branch information
saurbhc committed Nov 9, 2023
1 parent 0434c58 commit 0cf0e3c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
6 changes: 6 additions & 0 deletions darwin/future/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,9 @@ class DatasetNotFound(DarwinException):
"""Raised when the dataset endpoint returns a malformed response."""

...


class MaxRetriesError(DarwinException):
"""Raised when a certain API call is re-tried for {x} number of times."""

...
29 changes: 21 additions & 8 deletions darwin/future/meta/objects/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from darwin.future.core.items import get_item, move_items_to_stage
from darwin.future.core.types.query import QueryFilter
from darwin.future.data_objects.workflow import WFEdgeCore, WFStageCore
from darwin.future.exceptions import MaxRetriesError
from darwin.future.meta.objects.base import MetaBase
from darwin.future.meta.queries.item_id import ItemIDQuery

Expand Down Expand Up @@ -83,16 +84,28 @@ def move_attached_files_to_stage(
)
ids = [x.id for x in self.item_ids.collect_all()]

def all_items_complete(item_ids: list[UUID]) -> bool:
for item_id in item_ids:
if get_item(self.client, slug, item_id).processing_status != "complete":
return False
return True

if wait:
while True:
for _id in ids:
if get_item(self.client, slug, _id).processing_status != "complete":
# wait for 0.5 seconds before checking again
time.sleep(0.5)
break
else:
# All items are complete, break the while loop
max_attempts = 5
wait_time = 0.5 # seconds

# Try to check if all items are complete for a maximum number of attempts
for attempt in range(1, max_attempts + 1):
complete = all_items_complete(item_ids=ids)
if complete:
break
else:
# The wait time increases with each attempt
time.sleep(wait_time * attempt)
else:
raise MaxRetriesError(
"Max retry attempts to check for Item(s) to complete processing"
)

move_items_to_stage(self.client, slug, w_id, d_id, new_stage_id, ids)
return self
Expand Down

0 comments on commit 0cf0e3c

Please sign in to comment.