feature: More resilient evaluations (#635)
This PR addresses several issues:

1) When the evaluator crashes with an exception, we now also forward the
exception to the supervisor.

2) At the supervisor, we validate that we receive the expected number of
results.

3) At the evaluator, we fix the validation of the number of completed
evaluations.

4) We add retry logic to reconnect in case the gRPC connection to the
evaluator is lost (see the sketch below).

The missing evaluations we observed should now be caught at the first
point (we retry on exception at the evaluator); the other changes serve
as additional guard rails.
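
Point (4) wraps the gRPC calls in grpc_handler.py in a tenacity-style retry loop (see the diff for that file further below). A minimal, self-contained sketch of the pattern, assuming the tenacity package is available; make_rpc_call and reconnect are illustrative placeholders rather than Modyn APIs:

import logging

import grpc
from tenacity import Retrying, stop_after_attempt, wait_random_exponential

logger = logging.getLogger(__name__)


def call_with_reconnect(make_rpc_call, reconnect):
    # Retry the call up to 5 times with randomized exponential backoff,
    # re-establishing the gRPC channel whenever a call fails.
    for attempt in Retrying(
        stop=stop_after_attempt(5),
        wait=wait_random_exponential(multiplier=1, min=2, max=60),
        reraise=True,
    ):
        with attempt:
            try:
                return make_rpc_call()
            except grpc.RpcError as e:
                # Catch and reraise so tenacity retries after reconnecting.
                logger.error("gRPC connection error, trying to reconnect: %s", e)
                reconnect()
                raise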
MaxiBoether authored Oct 8, 2024
1 parent a15b3c6 commit 6436929
Showing 13 changed files with 341 additions and 133 deletions.
5 changes: 3 additions & 2 deletions .pylintrc
@@ -186,8 +186,9 @@ disable=raw-checker-failed,
too-many-arguments, # we can't determine a good limit here. reviews should spot bad cases of this.
duplicate-code, # Mostly imports and test setup.
cyclic-import, # We use these inside methods that require models from multiple apps. Tests will catch actual errors.
too-many-instance-attributes, # We always ignore this anyways
too-many-positional-arguments # We do not want to limit the number of positional arguments
too-many-instance-attributes, # We always ignore this anyways
too-many-positional-arguments, # We do not want to limit the number of positional arguments
too-many-locals # We always ignore this anyways
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
126 changes: 126 additions & 0 deletions analytics/tools/find_invalid_runs.ipynb
@@ -0,0 +1,126 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Motivation\n",
"\n",
"This notebook can be used to find pipeline runs where we have empty evaluation responses despite expecting some. Older versions of Modyn have lest robustness in the evaluation handling."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"from tqdm import tqdm\n",
"\n",
"from modyn.supervisor.internal.grpc.enums import PipelineStage\n",
"from modyn.supervisor.internal.pipeline_executor.models import MultiEvaluationInfo, PipelineLogs, SingleEvaluationInfo\n",
"\n",
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"log_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/debug/logs\")\n",
"logfiles = [logfile for logfile in log_dir.glob(\"**/pipeline.log\")]\n",
"logfiles"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def metrics_valid(logfile: Path):\n",
" logs = PipelineLogs.model_validate_json(logfile.read_text())\n",
" for eval_log in logs.supervisor_logs.stage_runs:\n",
" if eval_log.id == PipelineStage.EVALUATE_MULTI.name:\n",
" multiinfo = eval_log.info\n",
" assert isinstance(multiinfo, MultiEvaluationInfo)\n",
"\n",
" for info in multiinfo.interval_results:\n",
" assert isinstance(info, SingleEvaluationInfo)\n",
" res = info.results\n",
"\n",
" if len(res[\"metrics\"]) == 0:\n",
" if res[\"dataset_size\"] == 0:\n",
" print(\n",
" f\"Warning: Empty metrics but empty dataset in {logfile}: {info}\"\n",
" ) # Might want to remove this - not sure if needed.\n",
" else:\n",
" return False\n",
"\n",
" return True"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"invalid_pipelines = []\n",
"for logfile in tqdm(logfiles):\n",
" if not metrics_valid(logfile):\n",
" invalid_pipelines.append(logfile)\n",
"\n",
"invalid_pipelines\n",
"\n",
"# Typically, you'd want to delete those directories because they are invalid (see next cell)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Commented out for safety\n",
"\n",
"\"\"\"\n",
"import shutil\n",
"parent_dirs = {file_path.parent for file_path in invalid_pipelines}\n",
"\n",
"for directory in parent_dirs:\n",
" try:\n",
" shutil.rmtree(directory)\n",
" except Exception as e:\n",
" print(f\"Failed to delete {directory}: {e}\")\n",
"\"\"\""
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
24 changes: 15 additions & 9 deletions modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py
@@ -299,16 +299,16 @@ def get_evaluation_result(
logger.info(f"Received get evaluation result request for evaluation {evaluation_id}.")

if evaluation_id not in self._evaluation_dict:
logger.error(f"Evaluation with id {evaluation_id} has not been registered.")
logger.error(f"Evaluation {evaluation_id} has not been registered.")
return EvaluationResultResponse(valid=False)

self._drain_result_queue(evaluation_id) # Should already be drained, but just make sure

if self._evaluation_process_dict[evaluation_id].process_handler.is_alive():
logger.error(f"Evaluation with id {evaluation_id} is still running.")
logger.error(f"Evaluation {evaluation_id} is still running.")
return EvaluationResultResponse(valid=False)

logger.info("Returning results of all metrics.")
logger.info(f"[Evaluation {evaluation_id}] Returning results of all metrics.")
self._drain_result_queue(evaluation_id) # Should not do anything, but let's make sure

evaluation_data: list[EvaluationIntervalData] = []
@@ -317,12 +317,15 @@ def get_evaluation_result(
single_eval_data = EvaluationIntervalData(interval_index=interval_idx, evaluation_data=metric_result)
evaluation_data.append(single_eval_data)

if len(evaluation_data) < len(self._evaluation_dict[evaluation_id].not_failed_interval_ids):
expected_results = len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)
if len(evaluation_data) < expected_results:
logger.error(
f"Could not retrieve results for all intervals of evaluation {evaluation_id}. "
f"Expected {len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)}, "
f"but got {len(evaluation_data)}. Maybe an exception happened during evaluation."
f"Expected {expected_results} results, "
f"but got {len(evaluation_data)} results. Most likely, an exception happened during evaluation."
)
return EvaluationResultResponse(valid=False)

return EvaluationResultResponse(valid=True, evaluation_results=evaluation_data)

def cleanup_evaluations(
@@ -350,9 +353,12 @@ def cleanup_evaluations(
self._evaluation_process_dict.pop(evaluation_id)

for e_id in evaluation_ids:
self._evaluation_dict.pop(e_id)
self._evaluation_data_dict.pop(e_id)
self._evaluation_data_dict_locks.pop(e_id)
if e_id in self._evaluation_dict:
self._evaluation_dict.pop(e_id)
if e_id in self._evaluation_data_dict:
self._evaluation_data_dict.pop(e_id)
if e_id in self._evaluation_data_dict_locks:
self._evaluation_data_dict_locks.pop(e_id)

gc.collect()
return EvaluationCleanupResponse(succeeded=list(sorted(already_cleaned + not_yet_cleaned)))
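
Side note on the guarded cleanup above: dict.pop with a default value is a no-op when the key is missing, so the same tolerance for already-cleaned ids can be expressed without explicit membership checks. A minimal illustrative sketch, not the actual Modyn code:

def cleanup_evaluation_state(evaluation_ids, *state_dicts):
    # Drop per-evaluation entries from every state dict, tolerating ids
    # that were never registered or have already been cleaned up.
    for e_id in evaluation_ids:
        for state in state_dicts:
            state.pop(e_id, None)  # no-op if e_id is missing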
1 change: 0 additions & 1 deletion modyn/evaluator/internal/pytorch_evaluator.py
@@ -39,7 +39,6 @@ def __init__(
)

self._device = evaluation_info.device
self._device_type = "cuda" if "cuda" in self._device else "cpu"
self._amp = evaluation_info.amp

self._info("Initialized PyTorch evaluator.")
8 changes: 5 additions & 3 deletions modyn/selector/internal/grpc/selector_grpc_servicer.py
@@ -45,7 +45,7 @@ def __init__(self, selector_manager: SelectorManager, sample_batch_size: int):
self.selector_manager = selector_manager
self._sample_batch_size = sample_batch_size

def get_sample_keys_and_weights( # pylint: disable-next=unused-argument
def get_sample_keys_and_weights( # pylint: disable-next=unused-argument, too-many-locals
self, request: GetSamplesRequest, context: grpc.ServicerContext
) -> Iterable[SamplesResponse]:
pipeline_id, trigger_id, worker_id, partition_id = (
@@ -57,10 +57,11 @@ def get_sample_keys_and_weights( # pylint: disable-next=unused-argument
tid = threading.get_native_id()
pid = os.getpid()

logger.info(
_logmsg = (
f"[{pid}][{tid}][Pipeline {pipeline_id}]: Fetching samples for trigger id {trigger_id}"
+ f" and worker id {worker_id} and partition id {partition_id}"
)
logger.info(_logmsg)

samples = self.selector_manager.get_sample_keys_and_weights(pipeline_id, trigger_id, worker_id, partition_id)

@@ -89,10 +90,11 @@ def inform_data_and_trigger(self, request: DataInformRequest, context: grpc.Serv
pipeline_id, keys, timestamps, labels = request.pipeline_id, request.keys, request.timestamps, request.labels
tid = threading.get_native_id()
pid = os.getpid()
logger.info(
_lgmsg = (
f"[{pid}][{tid}][Pipeline {pipeline_id}]: Selector is informed of {len(keys)} new data points"
+ f"+ trigger at timestamp {timestamps[-1] if len(keys) > 0 else 'n/a'}"
)
logger.info(_lgmsg)

trigger_id, log = self.selector_manager.inform_data_and_trigger(pipeline_id, keys, timestamps, labels)
return TriggerResponse(trigger_id=trigger_id, log=JsonString(value=json.dumps(log)))
59 changes: 47 additions & 12 deletions modyn/supervisor/internal/grpc_handler.py
@@ -302,39 +302,74 @@ def wait_for_evaluation_completion(self, evaluation_id: int) -> bool:
self.init_evaluator()
raise e

# NOT within retry block
if not res.valid:
exception_msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n"
logger.error(exception_msg)
raise RuntimeError(exception_msg)
# Should only happen when requesting invalid id, hence we throw
_msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n"
logger.error(_msg)
raise RuntimeError(_msg)

if res.HasField("exception"):
exception_msg = f"Exception at evaluator occurred:\n{res.exception}\n\n"
logger.error(exception_msg)
logger.error(f"Exception at evaluator occurred:\n{res.exception}\n\n")
self.cleanup_evaluations([evaluation_id])
logger.error(f"Performed cleanup for evaluation {evaluation_id} that threw exception.")
has_exception = True
break
break # Exit busy wait

if not res.is_running:
break
logger.info(f"Evaluation {evaluation_id} has finished successfully.")
break # Exit busy wait

sleep(1)

return not has_exception

def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalData]:
assert self.evaluator is not None
if not self.connected_to_evaluator:
raise ConnectionError("Tried to get the evaluation results, but there is no gRPC connection.")

req = EvaluationResultRequest(evaluation_id=evaluation_id)
res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req)

for attempt in Retrying(
stop=stop_after_attempt(5),
wait=wait_random_exponential(multiplier=1, min=2, max=60),
reraise=True,
):
with attempt:
try:
res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req)
except grpc.RpcError as e: # We catch and reraise to easily reconnect
logger.error(e)
logger.error(f"[Evaluation {evaluation_id}]: gRPC connection error, trying to reconnect.")
self.init_evaluator()
raise e

if not res.valid:
logger.error(f"Cannot get the evaluation result for evaluation {evaluation_id}")
raise RuntimeError(f"Cannot get the evaluation result for evaluation {evaluation_id}")
_msg = f"Cannot get the evaluation result for evaluation {evaluation_id}"
logger.error(_msg)
raise RuntimeError(_msg)

logger.debug(f"Obtained evaluation results for evaluation {evaluation_id}")

return res.evaluation_results

def cleanup_evaluations(self, evaluation_ids: list[int]) -> None:
assert self.evaluator is not None

req = EvaluationCleanupRequest(evaluation_ids=set(evaluation_ids))
res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req)
for attempt in Retrying(
stop=stop_after_attempt(5),
wait=wait_random_exponential(multiplier=1, min=2, max=60),
reraise=True,
):
with attempt:
try:
res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req)
except grpc.RpcError as e: # We catch and reraise to easily reconnect
logger.error(e)
logger.error(f"[Evaluations {evaluation_ids}]: gRPC connection error, trying to reconnect.")
self.init_evaluator()
raise e

failed = set(evaluation_ids) - {int(i) for i in res.succeeded}
if failed:
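
For illustration, the call sites in this file could also be routed through the call_with_reconnect helper sketched after the PR description; handler is an assumed stand-in for the supervisor's GrpcHandler instance, not actual Modyn code:

req = EvaluationResultRequest(evaluation_id=evaluation_id)
res = call_with_reconnect(
    lambda: handler.evaluator.get_evaluation_result(req),
    handler.init_evaluator,  # re-creates the evaluator channel after connection loss
)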