retry on exception at evaluator
MaxiBoether committed Sep 20, 2024
1 parent 2fee48b commit 4e3564d
Showing 2 changed files with 59 additions and 15 deletions.
50 changes: 40 additions & 10 deletions modyn/supervisor/internal/grpc_handler.py
@@ -302,28 +302,46 @@ def wait_for_evaluation_completion(self, evaluation_id: int) -> bool:
                     self.init_evaluator()
                     raise e
 
+            # NOT within retry block
             if not res.valid:
-                exception_msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n"
-                logger.error(exception_msg)
-                raise RuntimeError(exception_msg)
+                # Should only happen when requesting invalid id, hence we throw
+                _msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n"
+                logger.error(_msg)
+                raise RuntimeError(_msg)
 
             if res.HasField("exception"):
-                exception_msg = f"Exception at evaluator occurred:\n{res.exception}\n\n"
-                logger.error(exception_msg)
+                logger.error(f"Exception at evaluator occurred:\n{res.exception}\n\n")
                 self.cleanup_evaluations([evaluation_id])
+                logger.error(f"Performed cleanup for evaluation {evaluation_id} that threw exception.")
                 has_exception = True
-                break
+                break  # Exit busy wait
 
             if not res.is_running:
-                break
+                break  # Exit busy wait
 
             sleep(1)
 
         return not has_exception
 
     def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalData]:
         assert self.evaluator is not None
         if not self.connected_to_evaluator:
             raise ConnectionError("Tried to wait for evaluation to finish, but there is no gRPC connection.")
 
         req = EvaluationResultRequest(evaluation_id=evaluation_id)
-        res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req)
+        for attempt in Retrying(
+            stop=stop_after_attempt(5),
+            wait=wait_random_exponential(multiplier=1, min=2, max=60),
+            reraise=True,
+        ):
+            with attempt:
+                try:
+                    res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req)
+                except grpc.RpcError as e:  # We catch and reraise to easily reconnect
+                    logger.error(e)
+                    logger.error(f"[Evaluation {evaluation_id}]: gRPC connection error, trying to reconnect.")
+                    self.init_evaluator()
+                    raise e
 
         if not res.valid:
             logger.error(f"Cannot get the evaluation result for evaluation {evaluation_id}")
 
@@ -334,7 +352,19 @@ def cleanup_evaluations(self, evaluation_ids: list[int]) -> None:
         assert self.evaluator is not None
 
         req = EvaluationCleanupRequest(evaluation_ids=set(evaluation_ids))
-        res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req)
+        for attempt in Retrying(
+            stop=stop_after_attempt(5),
+            wait=wait_random_exponential(multiplier=1, min=2, max=60),
+            reraise=True,
+        ):
+            with attempt:
+                try:
+                    res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req)
+                except grpc.RpcError as e:  # We catch and reraise to easily reconnect
+                    logger.error(e)
+                    logger.error(f"[Evaluations {evaluation_ids}]: gRPC connection error, trying to reconnect.")
+                    self.init_evaluator()
+                    raise e
 
         failed = set(evaluation_ids) - {int(i) for i in res.succeeded}
         if failed:
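All the call sites in this file now share one pattern: a tenacity `Retrying` controller with randomized exponential backoff wraps the gRPC call, and on `grpc.RpcError` the handler reconnects via `init_evaluator()` and re-raises so the next attempt runs against a fresh channel. A minimal, self-contained sketch of that pattern, assuming only the `tenacity` and `grpcio` packages (`make_stub` and `do_call` are illustrative stand-ins, not Modyn APIs):

import logging

import grpc
from tenacity import Retrying, stop_after_attempt, wait_random_exponential

logger = logging.getLogger(__name__)


def call_with_reconnect(make_stub, do_call):
    """Run do_call(stub) with up to 5 attempts, rebuilding the stub on gRPC errors."""
    stub = make_stub()
    for attempt in Retrying(
        stop=stop_after_attempt(5),
        wait=wait_random_exponential(multiplier=1, min=2, max=60),
        reraise=True,  # after the final attempt, raise the real error, not RetryError
    ):
        with attempt:
            try:
                return do_call(stub)
            except grpc.RpcError as e:
                logger.error("gRPC error, rebuilding channel and retrying: %s", e)
                stub = make_stub()  # plays the role of self.init_evaluator()
                raise  # hand the error back to tenacity for the next attempt

With `reraise=True`, a call that still fails after five attempts surfaces the original `grpc.RpcError` to the caller instead of tenacity's `RetryError`, matching how the handler lets the exception propagate once reconnecting has not helped.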
24 changes: 19 additions & 5 deletions modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
@@ -343,9 +343,11 @@ def _single_batched_evaluation(
         def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
             return EvaluationAbortedReason.DESCRIPTOR.values_by_number[eval_aborted_reason].name
 
+        started_evaluations = []
+
         for attempt in Retrying(
             stop=stop_after_attempt(10),
-            wait=wait_random_exponential(multiplier=1, min=2, max=120),
+            wait=wait_random_exponential(multiplier=2, min=2, max=180),
             reraise=True,
         ):
             with attempt:
 
@@ -377,9 +379,17 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
                     return failure_reasons
 
                 logger.info(f"Evaluation started for model {model_id_to_eval} on intervals {intervals}.")
-                self.grpc.wait_for_evaluation_completion(response.evaluation_id)
-                eval_data = self.grpc.get_evaluation_results(response.evaluation_id)
-                self.grpc.cleanup_evaluations([response.evaluation_id])
+                started_evaluations.append(response.evaluation_id)
+                if not self.grpc.wait_for_evaluation_completion(response.evaluation_id):
+                    raise RuntimeError("There was an exception during evaluation")  # Trigger retry
+
+                eval_data = self.grpc.get_evaluation_results(
+                    response.evaluation_id
+                )  # Will throw in case of invalid result
+
+                self.grpc.cleanup_evaluations(
+                    [response.evaluation_id]
+                )  # Early cleanup if succeeded since we have the data
 
                 eval_results: list[tuple[str | None, dict[str, Any]]] = []
 
@@ -397,6 +407,7 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
                 # the next loop that unwraps `EvaluationResultResponse`.
                 # ----------------------------------------------------- . ---------------------------------------------------- #
 
+                # Prepare eval_results structure
                 for interval_response in response.interval_responses:
                     if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED:
                         reason = get_failure_reason(interval_response.eval_aborted_reason)
 
@@ -409,6 +420,7 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
                     )
                 )
 
+                # Parse response into eval_results structure
                 for interval_result in eval_data:
                     interval_idx = interval_result.interval_index
                     assert (
 
@@ -438,9 +450,11 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
                     + str(eval_results)
                 )
 
+                # All checks succeeded
+                self.grpc.cleanup_evaluations(started_evaluations)  # Make sure to clean up everything we started.
                 return eval_results
 
-        raise RuntimeError("Unreachable code - satisfy mypy.")
+        raise RuntimeError("Unreachable code - just to satisfy mypy.")
 
 
 # ------------------------------------------------------------------------------------ #
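The executor-side change complements the handler retries: `wait_for_evaluation_completion` now returns `False` when the evaluator reports an exception, and `_single_batched_evaluation` converts that into a `RuntimeError` so the whole evaluation restarts (up to 10 attempts, with the randomized backoff window now growing with a factor-2 multiplier up to 180 s). Every started evaluation id is recorded so the final `cleanup_evaluations(started_evaluations)` can purge attempts superseded by a retry. A condensed sketch of that control flow, where `grpc_handler` and `start_evaluation` are hypothetical stand-ins for the real Modyn objects:

from tenacity import Retrying, stop_after_attempt, wait_random_exponential


def run_evaluation_with_retries(grpc_handler, start_evaluation):
    started_evaluations = []  # everything we started, for final cleanup
    for attempt in Retrying(
        stop=stop_after_attempt(10),
        wait=wait_random_exponential(multiplier=2, min=2, max=180),
        reraise=True,
    ):
        with attempt:
            response = start_evaluation()
            started_evaluations.append(response.evaluation_id)

            # False means the evaluator hit an exception; raising here hands
            # control back to tenacity, which starts a fresh attempt.
            if not grpc_handler.wait_for_evaluation_completion(response.evaluation_id):
                raise RuntimeError("There was an exception during evaluation")

            eval_data = grpc_handler.get_evaluation_results(response.evaluation_id)
            grpc_handler.cleanup_evaluations([response.evaluation_id])

            # Also purge evaluations left over from earlier failed attempts.
            grpc_handler.cleanup_evaluations(started_evaluations)
            return eval_data

    raise RuntimeError("Unreachable code - just to satisfy mypy.")

Cleaning up eagerly after a successful fetch keeps evaluator state small, while the final bulk cleanup ensures that evaluations abandoned by earlier failed attempts do not leak.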
