Skip to content

Commit

Permalink
rewrite check_all_items_complete logic in an attempt to lower len(i…
Browse files Browse the repository at this point in the history
…tem_ids) APIs calls + minor improvements
  • Loading branch information
saurbhc committed Nov 10, 2023
1 parent 0cf0e3c commit b9ee64a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
68 changes: 46 additions & 22 deletions darwin/future/meta/objects/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,47 @@ def item_ids(self) -> ItemIDQuery:
],
)

def check_all_items_complete(
self,
slug: str,
item_ids: list[UUID],
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> bool:
"""
Checks if all items are complete. If not, waits and tries again. Raises error if max attempts reached.
Args:
slug (str): Team slug
item_ids (list[UUID]): List of item ids
max_attempts (int, optional): Max number of attempts. Defaults to 5.
wait_time (float, optional): Wait time between attempts. Defaults to 0.5.
"""
completed = []
for attempt in range(1, wait_max_attempts + 1):
# check if all items are complete
for item_id in item_ids:
if get_item(self.client, slug, item_id).processing_status != "complete":
# if not complete, wait and try again with the in-complete items
time.sleep(wait_time * attempt)
item_ids = [i for i in item_ids if i not in completed]
break
completed.append(item_id)
else:
# if all items are complete, return.
return True
else:
# if max attempts reached, raise error
raise MaxRetriesError(
f"Max attempts reached. {len(completed)} items completed out of {len(item_ids)} items."
)

def move_attached_files_to_stage(
self, new_stage_id: UUID, wait: bool = True
self,
new_stage_id: UUID,
wait: bool = True,
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> Stage:
"""
Args:
Expand All @@ -84,28 +123,13 @@ def move_attached_files_to_stage(
)
ids = [x.id for x in self.item_ids.collect_all()]

def all_items_complete(item_ids: list[UUID]) -> bool:
for item_id in item_ids:
if get_item(self.client, slug, item_id).processing_status != "complete":
return False
return True

if wait:
max_attempts = 5
wait_time = 0.5 # seconds

# Try to check if all items are complete for a maximum number of attempts
for attempt in range(1, max_attempts + 1):
complete = all_items_complete(item_ids=ids)
if complete:
break
else:
# The wait time increases with each attempt
time.sleep(wait_time * attempt)
else:
raise MaxRetriesError(
"Max retry attempts to check for Item(s) to complete processing"
)
self.check_all_items_complete(
slug=slug,
item_ids=ids,
max_attempts=wait_max_attempts,
wait_time=wait_time,
)

move_items_to_stage(self.client, slug, w_id, d_id, new_stage_id, ids)
return self
Expand Down
14 changes: 11 additions & 3 deletions darwin/future/meta/objects/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ def id(self) -> UUID:
def name(self) -> str:
return self._element.name

def push_from_dataset_stage(self, wait: bool = True) -> Workflow:
def push_from_dataset_stage(
self, wait: bool = True, wait_max_attempts: int = 5, wait_time: float = 0.5
) -> Workflow:
assert self._element.dataset is not None
stages = self.stages
assert len(stages) > 1
Expand All @@ -87,7 +89,9 @@ def push_from_dataset_stage(self, wait: bool = True) -> Workflow:
assert ds_stage._element.type == WFTypeCore.DATASET
next_stage = ds_stage._element.edges[0].target_stage_id
assert next_stage is not None
ds_stage.move_attached_files_to_stage(next_stage, wait)
ds_stage.move_attached_files_to_stage(
next_stage, wait, wait_max_attempts, wait_time
)

return self

Expand All @@ -103,6 +107,8 @@ def upload_files(
verbose: bool = False,
auto_push: bool = True,
wait: bool = True,
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> Workflow:
assert self._element.dataset is not None
upload_data(
Expand All @@ -117,7 +123,9 @@ def upload_files(
verbose,
)
if auto_push:
self.push_from_dataset_stage(wait=wait)
self.push_from_dataset_stage(
wait=wait, wait_max_attempts=wait_max_attempts, wait_time=wait_time
)
return self

def __str__(self) -> str:
Expand Down

0 comments on commit b9ee64a

Please sign in to comment.