From 5f23bc8e8496a3a44c09c4172bd3470b73f96398 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20B=C3=B6ther?=
Date: Mon, 23 Sep 2024 22:41:41 +0800
Subject: [PATCH] Fix expected result count and improve evaluation logging

---
 .../internal/grpc/evaluator_grpc_servicer.py           | 11 +++++------
 modyn/supervisor/internal/grpc_handler.py              |  3 +++
 .../internal/pipeline_executor/evaluation_executor.py  |  4 +++-
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py
index 004bf9cae..4db4486bb 100644
--- a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py
+++ b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py
@@ -299,16 +299,16 @@ def get_evaluation_result(
         logger.info(f"Received get evaluation result request for evaluation {evaluation_id}.")
 
         if evaluation_id not in self._evaluation_dict:
-            logger.error(f"Evaluation with id {evaluation_id} has not been registered.")
+            logger.error(f"Evaluation {evaluation_id} has not been registered.")
             return EvaluationResultResponse(valid=False)
 
         self._drain_result_queue(evaluation_id)  # Should already be drained, but just make sure
 
         if self._evaluation_process_dict[evaluation_id].process_handler.is_alive():
-            logger.error(f"Evaluation with id {evaluation_id} is still running.")
+            logger.error(f"Evaluation {evaluation_id} is still running.")
             return EvaluationResultResponse(valid=False)
 
-        logger.info("Returning results of all metrics.")
+        logger.info(f"[Evaluation {evaluation_id}] Returning results of all metrics.")
         self._drain_result_queue(evaluation_id)  # Should not do anything, but let's make sure
 
         evaluation_data: list[EvaluationIntervalData] = []
@@ -317,12 +317,11 @@ def get_evaluation_result(
             single_eval_data = EvaluationIntervalData(interval_index=interval_idx, evaluation_data=metric_result)
             evaluation_data.append(single_eval_data)
 
-        num_metrics = len(self._evaluation_dict[evaluation_id].raw_metrics)
-        expected_results = len(self._evaluation_dict[evaluation_id].not_failed_interval_ids) * num_metrics
+        expected_results = len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)
         if len(evaluation_data) < expected_results:
             logger.error(
                 f"Could not retrieve results for all intervals of evaluation {evaluation_id}. "
-                f"Expected {len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)} * {num_metrics} = {expected_results} results, "
+                f"Expected {expected_results} results, "
                 f"but got {len(evaluation_data)} results. Most likely, an exception happened during evaluation."
             )
             return EvaluationResultResponse(valid=False)
diff --git a/modyn/supervisor/internal/grpc_handler.py b/modyn/supervisor/internal/grpc_handler.py
index a5544ba25..b2d98c1af 100644
--- a/modyn/supervisor/internal/grpc_handler.py
+++ b/modyn/supervisor/internal/grpc_handler.py
@@ -317,6 +317,7 @@ def wait_for_evaluation_completion(self, evaluation_id: int) -> bool:
                 break  # Exit busy wait
 
             if not res.is_running:
+                logger.info(f"Evaluation {evaluation_id} has finished successfully.")
                 break  # Exit busy wait
 
             sleep(1)
@@ -348,6 +349,8 @@ def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalD
             logger.error(_msg)
             raise RuntimeError(_msg)
 
+        logger.debug(f"Obtained evaluation results for evaluation {evaluation_id}")
+
         return res.evaluation_results
 
     def cleanup_evaluations(self, evaluation_ids: list[int]) -> None:
diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
index 81cf7b49d..cb87fac43 100644
--- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
+++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
@@ -378,7 +378,9 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
             # This is likely due to an invalid request in the first place.
             return failure_reasons
 
-        logger.info(f"Evaluation started for model {model_id_to_eval} on intervals {intervals}.")
+        logger.info(
+            f"Evaluation {response.evaluation_id} started for model {model_id_to_eval} on intervals {intervals}."
+        )
         started_evaluations.append(response.evaluation_id)
         if not self.grpc.wait_for_evaluation_completion(response.evaluation_id):
             raise RuntimeError("There was an exception during evaluation")  # Trigger retry