From aa31618bf1cc113881c60fd9e53761ba010e4301 Mon Sep 17 00:00:00 2001 From: Robin Holzinger Date: Sun, 22 Sep 2024 11:42:04 +0200 Subject: [PATCH 1/5] hotfix: Fix Accuracy metric (#636) --- modyn/evaluator/internal/metrics/accuracy.py | 3 --- modyn/tests/evaluator/internal/metrics/test_accuracy.py | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/modyn/evaluator/internal/metrics/accuracy.py b/modyn/evaluator/internal/metrics/accuracy.py index 540b7e27a..5c881d8df 100644 --- a/modyn/evaluator/internal/metrics/accuracy.py +++ b/modyn/evaluator/internal/metrics/accuracy.py @@ -24,9 +24,6 @@ def _batch_evaluated_callback(self, y_true: torch.Tensor, y_pred: torch.Tensor, self.total_correct += labeled_correctly self.samples_seen += batch_size - self.total_correct += labeled_correctly - self.samples_seen += batch_size - def get_evaluation_result(self) -> float: if self.samples_seen == 0: self.warning("Did not see any samples.") diff --git a/modyn/tests/evaluator/internal/metrics/test_accuracy.py b/modyn/tests/evaluator/internal/metrics/test_accuracy.py index 379611c77..a197cb953 100644 --- a/modyn/tests/evaluator/internal/metrics/test_accuracy.py +++ b/modyn/tests/evaluator/internal/metrics/test_accuracy.py @@ -67,6 +67,7 @@ def test_accuracy() -> None: accuracy.evaluate_batch(y_true, y_pred, 6) assert accuracy.get_evaluation_result() == pytest.approx(1.0 / 3) + assert accuracy.samples_seen - accuracy.total_correct == 0 + 6 + 4 def test_accuracy_invalid() -> None: From a15b3c62b39786074874d1363c03b8676ec11bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= <2116466+MaxiBoether@users.noreply.github.com> Date: Mon, 23 Sep 2024 11:55:51 +0200 Subject: [PATCH 2/5] fix: ignore pylint `too-many-positional-argument` (#637) The pylint check `too-many-positional-arguments` limits the number of positional arguments. We do not want to limit that. --- .pylintrc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pylintrc b/.pylintrc index 11f3267d3..9469027d4 100644 --- a/.pylintrc +++ b/.pylintrc @@ -186,8 +186,8 @@ disable=raw-checker-failed, too-many-arguments, # we can't determine a good limit here. reviews should spot bad cases of this. duplicate-code, # Mostly imports and test setup. cyclic-import, # We use these inside methods that require models from multiple apps. Tests will catch actual errors. - too-many-instance-attributes # We always ignore this anyways - + too-many-instance-attributes, # We always ignore this anyways + too-many-positional-arguments # We do not want to limit the number of positional arguments # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option # multiple time (only on the command line, not in the configuration file where From 6436929d5daca7db89bf3c837befbb3f57c4ab40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= <2116466+MaxiBoether@users.noreply.github.com> Date: Tue, 8 Oct 2024 09:21:39 +0200 Subject: [PATCH 3/5] feature: More resilient evaluations (#635) This PR addresses several issues: 1) When the evaluator crashes with an exception, we actually also send this to the supervisor 2) We validate whether we receive the correct number of results at the supervisor 3) We fix the validation of the correct number of evaluations at the evaluator 4) Some minor additions of retrying to connect in case the connection gets lost Our observation of missing evaluations should now be caught at the first point (we actually retry on exception at the evaluator) but the others serve as a guarding mechanism. --- .pylintrc | 5 +- analytics/tools/find_invalid_runs.ipynb | 126 +++++++++++++ .../internal/grpc/evaluator_grpc_servicer.py | 24 ++- modyn/evaluator/internal/pytorch_evaluator.py | 1 - .../internal/grpc/selector_grpc_servicer.py | 8 +- modyn/supervisor/internal/grpc_handler.py | 59 ++++-- .../pipeline_executor/evaluation_executor.py | 168 +++++++++++------- .../internal/triggers/datadrifttrigger.py | 4 +- .../grpc/test_evaluator_grpc_servicer.py | 17 +- .../internal/test_pytorch_evaluator.py | 2 - .../test_evaluation_executor.py | 7 +- .../test_pipeline_executor.py | 17 +- .../supervisor/internal/test_grpc_handler.py | 36 ++-- 13 files changed, 341 insertions(+), 133 deletions(-) create mode 100644 analytics/tools/find_invalid_runs.ipynb diff --git a/.pylintrc b/.pylintrc index 9469027d4..c94ec0da8 100644 --- a/.pylintrc +++ b/.pylintrc @@ -186,8 +186,9 @@ disable=raw-checker-failed, too-many-arguments, # we can't determine a good limit here. reviews should spot bad cases of this. duplicate-code, # Mostly imports and test setup. cyclic-import, # We use these inside methods that require models from multiple apps. Tests will catch actual errors. - too-many-instance-attributes, # We always ignore this anyways - too-many-positional-arguments # We do not want to limit the number of positional arguments + too-many-instance-attributes, # We always ignore this anyways + too-many-positional-arguments, # We do not want to limit the number of positional arguments + too-many-locals # We always ignore this anyways # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option # multiple time (only on the command line, not in the configuration file where diff --git a/analytics/tools/find_invalid_runs.ipynb b/analytics/tools/find_invalid_runs.ipynb new file mode 100644 index 000000000..e76e226c6 --- /dev/null +++ b/analytics/tools/find_invalid_runs.ipynb @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Motivation\n", + "\n", + "This notebook can be used to find pipeline runs where we have empty evaluation responses despite expecting some. Older versions of Modyn have lest robustness in the evaluation handling." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "from tqdm import tqdm\n", + "\n", + "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", + "from modyn.supervisor.internal.pipeline_executor.models import MultiEvaluationInfo, PipelineLogs, SingleEvaluationInfo\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "log_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/debug/logs\")\n", + "logfiles = [logfile for logfile in log_dir.glob(\"**/pipeline.log\")]\n", + "logfiles" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def metrics_valid(logfile: Path):\n", + " logs = PipelineLogs.model_validate_json(logfile.read_text())\n", + " for eval_log in logs.supervisor_logs.stage_runs:\n", + " if eval_log.id == PipelineStage.EVALUATE_MULTI.name:\n", + " multiinfo = eval_log.info\n", + " assert isinstance(multiinfo, MultiEvaluationInfo)\n", + "\n", + " for info in multiinfo.interval_results:\n", + " assert isinstance(info, SingleEvaluationInfo)\n", + " res = info.results\n", + "\n", + " if len(res[\"metrics\"]) == 0:\n", + " if res[\"dataset_size\"] == 0:\n", + " print(\n", + " f\"Warning: Empty metrics but empty dataset in {logfile}: {info}\"\n", + " ) # Might want to remove this - not sure if needed.\n", + " else:\n", + " return False\n", + "\n", + " return True" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "invalid_pipelines = []\n", + "for logfile in tqdm(logfiles):\n", + " if not metrics_valid(logfile):\n", + " invalid_pipelines.append(logfile)\n", + "\n", + "invalid_pipelines\n", + "\n", + "# Typically, you'd want to delete those directories because they are invalid (see next cell)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Commented out for safety\n", + "\n", + "\"\"\"\n", + "import shutil\n", + "parent_dirs = {file_path.parent for file_path in invalid_pipelines}\n", + "\n", + "for directory in parent_dirs:\n", + " try:\n", + " shutil.rmtree(directory)\n", + " except Exception as e:\n", + " print(f\"Failed to delete {directory}: {e}\")\n", + "\"\"\"" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py index d98bce52a..d92d064b3 100644 --- a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py +++ b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py @@ -299,16 +299,16 @@ def get_evaluation_result( logger.info(f"Received get evaluation result request for evaluation {evaluation_id}.") if evaluation_id not in self._evaluation_dict: - logger.error(f"Evaluation with id {evaluation_id} has not been registered.") + logger.error(f"Evaluation {evaluation_id} has not been registered.") return EvaluationResultResponse(valid=False) self._drain_result_queue(evaluation_id) # Should already be drained, but just make sure if self._evaluation_process_dict[evaluation_id].process_handler.is_alive(): - logger.error(f"Evaluation with id {evaluation_id} is still running.") + logger.error(f"Evaluation {evaluation_id} is still running.") return EvaluationResultResponse(valid=False) - logger.info("Returning results of all metrics.") + logger.info(f"[Evaluation {evaluation_id}] Returning results of all metrics.") self._drain_result_queue(evaluation_id) # Should not do anything, but let's make sure evaluation_data: list[EvaluationIntervalData] = [] @@ -317,12 +317,15 @@ def get_evaluation_result( single_eval_data = EvaluationIntervalData(interval_index=interval_idx, evaluation_data=metric_result) evaluation_data.append(single_eval_data) - if len(evaluation_data) < len(self._evaluation_dict[evaluation_id].not_failed_interval_ids): + expected_results = len(self._evaluation_dict[evaluation_id].not_failed_interval_ids) + if len(evaluation_data) < expected_results: logger.error( f"Could not retrieve results for all intervals of evaluation {evaluation_id}. " - f"Expected {len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)}, " - f"but got {len(evaluation_data)}. Maybe an exception happened during evaluation." + f"Expected {expected_results} results, " + f"but got {len(evaluation_data)} results. Most likely, an exception happened during evaluation." ) + return EvaluationResultResponse(valid=False) + return EvaluationResultResponse(valid=True, evaluation_results=evaluation_data) def cleanup_evaluations( @@ -350,9 +353,12 @@ def cleanup_evaluations( self._evaluation_process_dict.pop(evaluation_id) for e_id in evaluation_ids: - self._evaluation_dict.pop(e_id) - self._evaluation_data_dict.pop(e_id) - self._evaluation_data_dict_locks.pop(e_id) + if e_id in self._evaluation_dict: + self._evaluation_dict.pop(e_id) + if e_id in self._evaluation_data_dict: + self._evaluation_data_dict.pop(e_id) + if e_id in self._evaluation_data_dict_locks: + self._evaluation_data_dict_locks.pop(e_id) gc.collect() return EvaluationCleanupResponse(succeeded=list(sorted(already_cleaned + not_yet_cleaned))) diff --git a/modyn/evaluator/internal/pytorch_evaluator.py b/modyn/evaluator/internal/pytorch_evaluator.py index 1abe14c80..b3ad49ce8 100644 --- a/modyn/evaluator/internal/pytorch_evaluator.py +++ b/modyn/evaluator/internal/pytorch_evaluator.py @@ -39,7 +39,6 @@ def __init__( ) self._device = evaluation_info.device - self._device_type = "cuda" if "cuda" in self._device else "cpu" self._amp = evaluation_info.amp self._info("Initialized PyTorch evaluator.") diff --git a/modyn/selector/internal/grpc/selector_grpc_servicer.py b/modyn/selector/internal/grpc/selector_grpc_servicer.py index d3012b9fe..7750d5fe8 100644 --- a/modyn/selector/internal/grpc/selector_grpc_servicer.py +++ b/modyn/selector/internal/grpc/selector_grpc_servicer.py @@ -45,7 +45,7 @@ def __init__(self, selector_manager: SelectorManager, sample_batch_size: int): self.selector_manager = selector_manager self._sample_batch_size = sample_batch_size - def get_sample_keys_and_weights( # pylint: disable-next=unused-argument + def get_sample_keys_and_weights( # pylint: disable-next=unused-argument, too-many-locals self, request: GetSamplesRequest, context: grpc.ServicerContext ) -> Iterable[SamplesResponse]: pipeline_id, trigger_id, worker_id, partition_id = ( @@ -57,10 +57,11 @@ def get_sample_keys_and_weights( # pylint: disable-next=unused-argument tid = threading.get_native_id() pid = os.getpid() - logger.info( + _logmsg = ( f"[{pid}][{tid}][Pipeline {pipeline_id}]: Fetching samples for trigger id {trigger_id}" + f" and worker id {worker_id} and partition id {partition_id}" ) + logger.info(_logmsg) samples = self.selector_manager.get_sample_keys_and_weights(pipeline_id, trigger_id, worker_id, partition_id) @@ -89,10 +90,11 @@ def inform_data_and_trigger(self, request: DataInformRequest, context: grpc.Serv pipeline_id, keys, timestamps, labels = request.pipeline_id, request.keys, request.timestamps, request.labels tid = threading.get_native_id() pid = os.getpid() - logger.info( + _lgmsg = ( f"[{pid}][{tid}][Pipeline {pipeline_id}]: Selector is informed of {len(keys)} new data points" + f"+ trigger at timestamp {timestamps[-1] if len(keys) > 0 else 'n/a'}" ) + logger.info(_lgmsg) trigger_id, log = self.selector_manager.inform_data_and_trigger(pipeline_id, keys, timestamps, labels) return TriggerResponse(trigger_id=trigger_id, log=JsonString(value=json.dumps(log))) diff --git a/modyn/supervisor/internal/grpc_handler.py b/modyn/supervisor/internal/grpc_handler.py index 9f6330220..b2d98c1af 100644 --- a/modyn/supervisor/internal/grpc_handler.py +++ b/modyn/supervisor/internal/grpc_handler.py @@ -302,39 +302,74 @@ def wait_for_evaluation_completion(self, evaluation_id: int) -> bool: self.init_evaluator() raise e + # NOT within retry block if not res.valid: - exception_msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n" - logger.error(exception_msg) - raise RuntimeError(exception_msg) + # Should only happen when requesting invalid id, hence we throw + _msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n" + logger.error(_msg) + raise RuntimeError(_msg) if res.HasField("exception"): - exception_msg = f"Exception at evaluator occurred:\n{res.exception}\n\n" - logger.error(exception_msg) + logger.error(f"Exception at evaluator occurred:\n{res.exception}\n\n") + self.cleanup_evaluations([evaluation_id]) + logger.error(f"Performed cleanup for evaluation {evaluation_id} that threw exception.") has_exception = True - break + break # Exit busy wait + if not res.is_running: - break + logger.info(f"Evaluation {evaluation_id} has finished successfully.") + break # Exit busy wait + sleep(1) + return not has_exception def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalData]: assert self.evaluator is not None if not self.connected_to_evaluator: raise ConnectionError("Tried to wait for evaluation to finish, but not there is no gRPC connection.") - req = EvaluationResultRequest(evaluation_id=evaluation_id) - res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req) + + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_random_exponential(multiplier=1, min=2, max=60), + reraise=True, + ): + with attempt: + try: + res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req) + except grpc.RpcError as e: # We catch and reraise to easily reconnect + logger.error(e) + logger.error(f"[Evaluation {evaluation_id}]: gRPC connection error, trying to reconnect.") + self.init_evaluator() + raise e if not res.valid: - logger.error(f"Cannot get the evaluation result for evaluation {evaluation_id}") - raise RuntimeError(f"Cannot get the evaluation result for evaluation {evaluation_id}") + _msg = f"Cannot get the evaluation result for evaluation {evaluation_id}" + logger.error(_msg) + raise RuntimeError(_msg) + + logger.debug(f"Obtained evaluation results for evaluation {evaluation_id}") + return res.evaluation_results def cleanup_evaluations(self, evaluation_ids: list[int]) -> None: assert self.evaluator is not None req = EvaluationCleanupRequest(evaluation_ids=set(evaluation_ids)) - res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req) + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_random_exponential(multiplier=1, min=2, max=60), + reraise=True, + ): + with attempt: + try: + res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req) + except grpc.RpcError as e: # We catch and reraise to easily reconnect + logger.error(e) + logger.error(f"[Evaluations {evaluation_ids}]: gRPC connection error, trying to reconnect.") + self.init_evaluator() + raise e failed = set(evaluation_ids) - {int(i) for i in res.succeeded} if failed: diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py index d4592999a..75333d46f 100644 --- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py @@ -339,80 +339,126 @@ def _single_batched_evaluation( self.pipeline.evaluation.device, intervals=cast(list[tuple[int | None, int | None]], intervals), ) + + def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: + return EvaluationAbortedReason.DESCRIPTOR.values_by_number[eval_aborted_reason].name + + started_evaluations = [] + for attempt in Retrying( - stop=stop_after_attempt(5), - wait=wait_random_exponential(multiplier=1, min=2, max=60), + stop=stop_after_attempt(10), + wait=wait_random_exponential(multiplier=2, min=2, max=180), reraise=True, ): with attempt: try: response: EvaluateModelResponse = self.grpc.evaluator.evaluate_model(request) - except grpc.RpcError as e: # We catch and reraise to reconnect + except grpc.RpcError as e: # We catch and reraise them to tenacity after reconnecting logger.error(e) logger.error("gRPC connection error, trying to reconnect...") self.grpc.init_evaluator() raise e - assert len(response.interval_responses) == len( - intervals - ), f"We expected {len(intervals)} intervals, but got {len(response.interval_responses)}." - - def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: - return EvaluationAbortedReason.DESCRIPTOR.values_by_number[eval_aborted_reason].name - - if not response.evaluation_started: - failure_reasons: list[tuple[str | None, dict]] = [] - # note: interval indexes correspond to the intervals in the request - for interval_idx, interval_response in enumerate(response.interval_responses): - if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: - reason = get_failure_reason(interval_response.eval_aborted_reason) - failure_reasons.append((reason, {})) - logger.error( - f"Evaluation for model {model_id_to_eval} on split {intervals[interval_idx]} " - f"not started with reason: {reason}." - ) - return failure_reasons - - logger.info(f"Evaluation started for model {model_id_to_eval} on intervals {intervals}.") - self.grpc.wait_for_evaluation_completion(response.evaluation_id) - eval_data = self.grpc.get_evaluation_results(response.evaluation_id) - self.grpc.cleanup_evaluations([response.evaluation_id]) - - eval_results: list[tuple[str | None, dict[str, Any]]] = [] - - # ---------------------------------------------- Result Builder ---------------------------------------------- # - # The `eval_results` list is a list of tuples. Each tuple contains a failure reason (if any) and a dictionary - # with the evaluation results. The order of the tuples corresponds to the order of the intervals. - # - # response.interval_responses contains the evaluation results for each interval in the same order as the - # intervals in the request. Failed evaluations are marked with a failure reason. - - # Metric results come from the `EvaluateModelResponse` and are stored in the `evaluation_data` field. This - # only contains the metrics for the intervals that were successfully evaluated. - # - # Therefore we first build a list of results with the same order as the intervals. The metrics will be filled in - # the next loop that unwraps `EvaluationResultResponse`. - # ----------------------------------------------------- . ---------------------------------------------------- # - - for interval_response in response.interval_responses: - if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: - reason = get_failure_reason(interval_response.eval_aborted_reason) - eval_results.append((reason, {})) - else: - eval_results.append( - ( - None, - {"dataset_size": interval_response.dataset_size, "metrics": []}, - ) + assert len(response.interval_responses) == len( + intervals + ), f"We expected {len(intervals)} intervals, but got {len(response.interval_responses)}." + + if not response.evaluation_started: + failure_reasons: list[tuple[str | None, dict]] = [] + # note: interval indexes correspond to the intervals in the request + for interval_idx, interval_response in enumerate(response.interval_responses): + if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: + reason = get_failure_reason(interval_response.eval_aborted_reason) + failure_reasons.append((reason, {})) + logger.error( + f"Evaluation for model {model_id_to_eval} on split {intervals[interval_idx]} " + f"not started with reason: {reason}." + ) + # No retrying here, if we were to retry it should be done at the evaluator + # This is likely due to an invalid request in the first place. + return failure_reasons + + logger.info( + f"Evaluation {response.evaluation_id} started for model {model_id_to_eval} on intervals {intervals}." ) + started_evaluations.append(response.evaluation_id) + if not self.grpc.wait_for_evaluation_completion(response.evaluation_id): + raise RuntimeError("There was an exception during evaluation") # Trigger retry + + eval_data = self.grpc.get_evaluation_results( + response.evaluation_id + ) # Will throw in case of invalid result + + self.grpc.cleanup_evaluations( + [response.evaluation_id] + ) # Early cleanup if succeeded since we have the data + + eval_results: list[tuple[str | None, dict[str, Any]]] = [] + + # ---------------------------------------------- Result Builder ---------------------------------------------- # + # The `eval_results` list is a list of tuples. Each tuple contains a failure reason (if any) and a dictionary + # with the evaluation results. The order of the tuples corresponds to the order of the intervals. + # + # response.interval_responses contains the evaluation results for each interval in the same order as the + # intervals in the request. Failed evaluations are marked with a failure reason. + + # Metric results come from the `EvaluateModelResponse` and are stored in the `evaluation_data` field. This + # only contains the metrics for the intervals that were successfully evaluated. + # + # Therefore we first build a list of results with the same order as the intervals. The metrics will be filled in + # the next loop that unwraps `EvaluationResultResponse`. + # ----------------------------------------------------- . ---------------------------------------------------- # + + # Prepare eval_results structure + for interval_response in response.interval_responses: + if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: + reason = get_failure_reason(interval_response.eval_aborted_reason) + eval_results.append((reason, {})) + else: + eval_results.append( + ( + None, + {"dataset_size": interval_response.dataset_size, "metrics": []}, + ) + ) + + # Parse response into eval_results structure + for interval_result in eval_data: + interval_idx = interval_result.interval_index + assert ( + eval_results[interval_idx][0] is None + ), "Evaluation failed, this interval idx should not be returned by evaluator." + eval_results[interval_idx][1]["metrics"] = [ + {"name": metric.metric, "result": metric.result} for metric in interval_result.evaluation_data + ] + + # Assert that all evaluated intervals have all metrics + # Will trigger a retry in case this is not successful + # Can happen, e.g., if the evaluator is overloaded + expected_num_metrics = len(request.metrics) + for result_id, (abort_reason, data_dict) in enumerate(eval_results): + if abort_reason is not None: + # If there was any reason to abort, we don't care + continue + + assert ( + data_dict["dataset_size"] > 0 + ), f"dataset size of 0, but no EMPTY_INTERVAL response: {eval_results}" + actual_num_metrics = len(data_dict["metrics"]) + assert actual_num_metrics == expected_num_metrics, ( + f"result {result_id}: actual_num_metrics = {actual_num_metrics}" + + f" != expected_num_metrics = {expected_num_metrics}" + + "\n" + + str(eval_results) + + "\n\n" + + str(eval_data) + ) + + # All checks succeeded + self.grpc.cleanup_evaluations(started_evaluations) # Make sure to clean up everything we started. + return eval_results - for interval_result in eval_data: - interval_idx = interval_result.interval_index - assert eval_results[interval_idx][0] is None, "Evaluation failed, no metrics should be present." - eval_results[interval_idx][1]["metrics"] = [ - {"name": metric.metric, "result": metric.result} for metric in interval_result.evaluation_data - ] - return eval_results + raise RuntimeError("Unreachable code - just to satisfy mypy.") # ------------------------------------------------------------------------------------ # diff --git a/modyn/supervisor/internal/triggers/datadrifttrigger.py b/modyn/supervisor/internal/triggers/datadrifttrigger.py index 6dbac8d1b..6454a9211 100644 --- a/modyn/supervisor/internal/triggers/datadrifttrigger.py +++ b/modyn/supervisor/internal/triggers/datadrifttrigger.py @@ -298,8 +298,8 @@ def _run_detection( drift_results[metric_name].is_drift = self.decision_policies[metric_name].evaluate_decision( metric_result.distance ) - - logger.info(f"[DataDriftDetector][Dataset {self.dataloader_info.dataset_id}]" + f"[Result] {drift_results}") + _logmsg = f"[DataDriftDetector][Dataset {self.dataloader_info.dataset_id}]" + f"[Result] {drift_results}" + logger.info(_logmsg) if is_warmup: return False, {} diff --git a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py index 1be7ea675..0dffb3a1d 100644 --- a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py +++ b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py @@ -485,15 +485,7 @@ def test__run_evaluation_retain_metrics_before_real_exception(test_connect_to_st get_result_req = EvaluationResultRequest(evaluation_id=evaluation_id) get_result_resp = evaluator.get_evaluation_result(get_result_req, None) - assert get_result_resp.valid - # evaluation on the last interval was not finished - assert len(get_result_resp.evaluation_results) == len(intervals) - 1 - assert get_result_resp.evaluation_results[0].interval_index == 0 - assert len(get_result_resp.evaluation_results[0].evaluation_data) == 2 - assert get_result_resp.evaluation_results[0].evaluation_data[0].metric == "Accuracy" - assert get_result_resp.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5) - assert get_result_resp.evaluation_results[0].evaluation_data[1].metric == "F1Score" - assert get_result_resp.evaluation_results[0].evaluation_data[1].result == pytest.approx(0.6) + assert not get_result_resp.valid @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @@ -535,12 +527,7 @@ def test_get_evaluation_result_incomplete_metric(test_is_alive, test_connect_to_ metric_result_queue = evaluation_process_info.metric_result_queue metric_result_queue.put((1, [("Accuracy", 0.5)])) response = evaluator.get_evaluation_result(EvaluationResultRequest(evaluation_id=3), None) - assert response.valid - assert len(response.evaluation_results) == 1 - assert response.evaluation_results[0].interval_index == 1 - assert len(response.evaluation_results[0].evaluation_data) == 1 - assert response.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5) - assert response.evaluation_results[0].evaluation_data[0].metric == "Accuracy" + assert not response.valid @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) diff --git a/modyn/tests/evaluator/internal/test_pytorch_evaluator.py b/modyn/tests/evaluator/internal/test_pytorch_evaluator.py index 211137d0f..11d60adfb 100644 --- a/modyn/tests/evaluator/internal/test_pytorch_evaluator.py +++ b/modyn/tests/evaluator/internal/test_pytorch_evaluator.py @@ -170,7 +170,6 @@ def test_evaluator_init(load_state_mock: MagicMock) -> None: ) ) assert evaluator._device == "cpu" - assert evaluator._device_type == "cpu" assert not evaluator._amp load_state_mock.assert_called_once_with(pathlib.Path("trained_model.modyn")) @@ -183,7 +182,6 @@ def test_no_transform_evaluator_init(load_state_mock: MagicMock): assert isinstance(evaluator._model.model, MockModel) assert not evaluator._label_transformer_function assert evaluator._device == "cpu" - assert evaluator._device_type == "cpu" assert not evaluator._amp load_state_mock.assert_called_once_with(pathlib.Path("trained_model.modyn")) diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py index 2e082d428..9f6a9dc9d 100644 --- a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py +++ b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py @@ -299,6 +299,11 @@ def test_single_batched_evaluation_mixed( eval_req2 = dummy_eval_request() eval_req.interval_start = 64 eval_req.interval_end = 14 + + request = MagicMock() + request.metrics = ["Accuracy"] + test_prepare_evaluation_request.return_value = request + results = evaluation_executor._single_batched_evaluation( [ (eval_req.interval_start, eval_req.interval_end), @@ -317,4 +322,4 @@ def test_single_batched_evaluation_mixed( ) test_wait_for_evaluation_completion.assert_called_once() test_get_evaluation_results.assert_called_once() - test_cleanup_evaluations.assert_called_once() + assert test_cleanup_evaluations.call_count == 2 diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py index d05a39d73..a4f870ee3 100644 --- a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py +++ b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py @@ -22,6 +22,7 @@ EvaluateModelResponse, EvaluationAbortedReason, EvaluationIntervalData, + SingleMetricResult, ) from modyn.supervisor.internal.eval.strategies.abstract import EvalInterval from modyn.supervisor.internal.eval.strategies.slicing import SlicingEvalStrategy @@ -826,7 +827,12 @@ def get_eval_intervals( ], ) ] - test_get_evaluation_results.return_value = [EvaluationIntervalData() for _ in range(3)] + test_get_evaluation_results.return_value = [ + EvaluationIntervalData( + interval_index=idx, evaluation_data=[SingleMetricResult(metric="Accuracy", result=0.5)] + ) + for idx in [0, 2] + ] else: intervals = [ @@ -851,7 +857,12 @@ def get_eval_intervals( evaluation_id=42, interval_responses=[success_interval for _ in range(len(intervals))], ) - test_get_evaluation_results.return_value = [EvaluationIntervalData() for _ in range(len(intervals))] + test_get_evaluation_results.return_value = [ + EvaluationIntervalData( + interval_index=idx, evaluation_data=[SingleMetricResult(metric="Accuracy", result=0.5)] + ) + for idx in range(len(intervals)) + ] pe.grpc.evaluator = evaluator_stub_mock @@ -869,7 +880,7 @@ def get_eval_intervals( assert evaluator_stub_mock.evaluate_model.call_count == 1 # batched if test_failure: - assert test_cleanup_evaluations.call_count == 1 + assert test_cleanup_evaluations.call_count == 2 assert test_wait_for_evaluation_completion.call_count == 1 stage_info = [ diff --git a/modyn/tests/supervisor/internal/test_grpc_handler.py b/modyn/tests/supervisor/internal/test_grpc_handler.py index 29dfadf61..6419e4e41 100644 --- a/modyn/tests/supervisor/internal/test_grpc_handler.py +++ b/modyn/tests/supervisor/internal/test_grpc_handler.py @@ -368,30 +368,22 @@ def test_wait_for_evaluation_completion(*args): assert handler.evaluator is not None with patch.object(handler.evaluator, "get_evaluation_status") as status_method: - status_method.side_effect = [ - EvaluationStatusResponse(valid=True, is_running=True), - EvaluationStatusResponse(valid=False), - EvaluationStatusResponse(valid=True, is_running=False), - ] - with pytest.raises(RuntimeError): - handler.wait_for_evaluation_completion(10) - assert status_method.call_count == 2 + with patch.object(handler.evaluator, "cleanup_evaluations") as _: + status_method.side_effect = [ + EvaluationStatusResponse(valid=True, is_running=True), + EvaluationStatusResponse(valid=True, is_running=False), + ] - status_method.reset_mock() - status_method.side_effect = [ - EvaluationStatusResponse(valid=True, exception="Some error"), - EvaluationStatusResponse(valid=True, is_running=False), - ] - assert not handler.wait_for_evaluation_completion(10) - assert status_method.call_count == 1 + assert handler.wait_for_evaluation_completion(10) + assert status_method.call_count == 2 - status_method.reset_mock() - status_method.side_effect = [ - EvaluationStatusResponse(valid=True, is_running=True), - EvaluationStatusResponse(valid=True, is_running=False), - ] - assert handler.wait_for_evaluation_completion(10) - assert status_method.call_count == 2 + status_method.reset_mock() + status_method.side_effect = [ + EvaluationStatusResponse(valid=True, exception="Some error"), + EvaluationStatusResponse(valid=True, is_running=False), + ] + assert not handler.wait_for_evaluation_completion(10) + assert status_method.call_count == 1 @patch("modyn.supervisor.internal.grpc_handler.grpc_connection_established", return_value=True) From b63795fedaddb4d1910565d38d7debd0c0492cf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= <2116466+MaxiBoether@users.noreply.github.com> Date: Wed, 16 Oct 2024 16:19:40 +0200 Subject: [PATCH 4/5] SIGMOD Revision Experiments (#610) --- .gitignore | 4 + .../pages/plots/cost_vs_eval_metric_agg.py | 8 +- analytics/app/pages/plots/eval_heatmap.py | 7 +- analytics/app/pages/plots/eval_over_time.py | 7 +- analytics/app/pages/plots/num_samples.py | 10 +- .../pages/plots/num_triggers_eval_metric.py | 6 +- .../sigmod/arxiv_scatter_triggering.ipynb | 526 ++++++++++++++++++ analytics/plotting/sigmod/yb_heatmap.ipynb | 34 +- .../sigmod/yb_scatter_selection.ipynb | 55 +- .../sigmod/yb_scatter_triggering.ipynb | 131 +++-- .../tools/aggregate_runs/core_aggregation.py | 2 +- .../triggering/arxiv_triggering_config.py | 136 +++++ .../sigmod/triggering/run_arxiv_triggering.py | 271 +++++++++ .../sigmod/triggering/run_yb_triggering.py | 161 +++++- .../triggering/yearbook_triggering_config.py | 112 ++-- .../schema/pipeline/evaluation/metrics.py | 8 + .../trigger/performance/performance.py | 2 +- .../metadata_database_connection.py | 2 +- 18 files changed, 1334 insertions(+), 148 deletions(-) create mode 100644 analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb create mode 100644 benchmark/sigmod/triggering/arxiv_triggering_config.py create mode 100644 benchmark/sigmod/triggering/run_arxiv_triggering.py diff --git a/.gitignore b/.gitignore index 5833ad5d8..92f26d4e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +# Images +*.svg +*.png + # Logging files *.log diff --git a/analytics/app/pages/plots/cost_vs_eval_metric_agg.py b/analytics/app/pages/plots/cost_vs_eval_metric_agg.py index 1abb78fc6..5850b34df 100644 --- a/analytics/app/pages/plots/cost_vs_eval_metric_agg.py +++ b/analytics/app/pages/plots/cost_vs_eval_metric_agg.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import get_args +from typing import Any, get_args import pandas as pd import plotly.express as px @@ -32,9 +32,9 @@ class _PageState: def gen_fig_scatter_num_triggers( page: str, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | Any | None, + dataset_id: str | Any | None, + metric: str | Any | None, agg_func_x: AGGREGATION_FUNCTION, agg_func_y: EVAL_AGGREGATION_FUNCTION, stages: list[str], diff --git a/analytics/app/pages/plots/eval_heatmap.py b/analytics/app/pages/plots/eval_heatmap.py index 7b9c63635..85676623d 100644 --- a/analytics/app/pages/plots/eval_heatmap.py +++ b/analytics/app/pages/plots/eval_heatmap.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Any import pandas as pd from dash import Input, Output, callback, dcc, html @@ -31,9 +32,9 @@ def gen_figure( page: str, multi_pipeline_mode: bool, patch_yearbook: bool, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | Any | None, + dataset_id: str | Any | None, + metric: str | Any | None, ) -> go.Figure: """Create the cost over time figure with barplot or histogram. Histogram has nice binning while barplot is precise. diff --git a/analytics/app/pages/plots/eval_over_time.py b/analytics/app/pages/plots/eval_over_time.py index a0d422b7a..dd57cec08 100644 --- a/analytics/app/pages/plots/eval_over_time.py +++ b/analytics/app/pages/plots/eval_over_time.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Any import pandas as pd import plotly.express as px @@ -29,9 +30,9 @@ def gen_figure( page: str, multi_pipeline_mode: bool, patch_yearbook: bool, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | Any | None, + dataset_id: str | Any | None, + metric: str | Any | None, ) -> go.Figure: """Create the evaluation over time figure with a line plot. diff --git a/analytics/app/pages/plots/num_samples.py b/analytics/app/pages/plots/num_samples.py index b33ad55be..ef8b46a5d 100644 --- a/analytics/app/pages/plots/num_samples.py +++ b/analytics/app/pages/plots/num_samples.py @@ -34,12 +34,12 @@ class _PageState: def gen_figure( page: str, multi_pipeline_mode: bool, - time_metric: str, - y_axis: YAxis, - use_scatter_size: bool, + time_metric: str | None, + y_axis: YAxis | None, + use_scatter_size: bool | None, patch_yearbook: bool, - dataset_id: str, - eval_handler: str, + dataset_id: str | None, + eval_handler: str | None, ) -> go.Figure: """Create the cost over time figure with barplot or histogram. Histogram has nice binning while barplot is precise. diff --git a/analytics/app/pages/plots/num_triggers_eval_metric.py b/analytics/app/pages/plots/num_triggers_eval_metric.py index ef685e2f1..4bf391b93 100644 --- a/analytics/app/pages/plots/num_triggers_eval_metric.py +++ b/analytics/app/pages/plots/num_triggers_eval_metric.py @@ -31,9 +31,9 @@ class _PageState: def gen_fig_scatter_num_triggers( page: str, multi_pipeline_mode: bool, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | None, + dataset_id: str | None, + metric: str | None, aggregate_metric: bool = True, time_weighted: bool = True, only_active_periods: bool = True, diff --git a/analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb b/analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb new file mode 100644 index 000000000..c4911df62 --- /dev/null +++ b/analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb @@ -0,0 +1,526 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from collections import defaultdict\n", + "from pathlib import Path\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "\n", + "from analytics.app.data.load import list_pipelines\n", + "from analytics.app.data.transform import (\n", + " df_aggregate_eval_metric,\n", + " dfs_models_and_evals,\n", + " logs_dataframe,\n", + ")\n", + "from analytics.plotting.common.common import init_plot\n", + "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# INPUTS\n", + "\n", + "pipelines_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/arxiv/revision/final_arxiv_revision_logs_agg\")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/dynamic_datasets_dsl/analytics/plotting/sigmod\")\n", + "assert pipelines_dir.exists()\n", + "assert output_dir.exists()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipelines = list_pipelines(pipelines_dir)\n", + "max_pipeline_id = max(pipelines.keys())\n", + "pipelines" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from analytics.app.data.load import load_pipeline_logs\n", + "\n", + "pipeline_logs = {p_id: load_pipeline_logs(p_id, pipelines_dir) for (p_id, (_, p_path)) in pipelines.items()}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# mode:\n", + "# single pipeline\n", + "pipeline_ids = (\n", + " [p_id for p_id, (p, _) in pipelines.items() if \"timetrigger\" in p and (\"_1y\" in p or \"_3y\" in p or \"_5y\" in p)]\n", + " + [p_id for p_id, (p, _) in pipelines.items() if \"amount\" in p and (\"30000\" in p or \"50000\" in p)]\n", + " + [\n", + " # drift\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if p\n", + " in {\n", + " \"kaggle_arxiv_mmdalibi_5000_0.005_1y\",\n", + " \"kaggle_arxiv_mmdalibi_5000_0.01_1y\",\n", + " \"kaggle_arxiv_mmdalibi_5000_0.02_1y\",\n", + " \"kaggle_arxiv_mmdalibi_dyn_5000_15_qt_0.05_1y\",\n", + " }\n", + " ]\n", + " + [\n", + " # perf\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if p\n", + " in {\n", + " \"kaggle_arxiv_perf_5000_0.7\",\n", + " \"kaggle_arxiv_perf_5000_0.75\",\n", + " }\n", + " ]\n", + ")\n", + "composite_model_variant = \"currently_active_model\" # currently_trained_model\n", + "patch_yearbook = False\n", + "dataset_id = \"arxiv_kaggle_test\"\n", + "eval_handler = \"exactmatrix\"\n", + "metric = \"Top-2-Accuracy\"\n", + "include_composite_model = False\n", + "\n", + "\n", + "def pipeline_name_mapper(name: str) -> str:\n", + " name = name.replace(\"kaggle_arxiv_\", \"\")\n", + "\n", + " if \"amounttrigger\" in name:\n", + " name = name.replace(\"amounttrigger_\", \"\")\n", + " name = name + \" samples\"\n", + " elif \"timetrigger\" in name:\n", + " name = name.replace(\"timetrigger_\", \"\")\n", + " name = name[:-1] + (\" years\" if not name.endswith(\"1y\") else \" year\")\n", + " elif \"perf\" in name:\n", + " name = name.replace(\"perf_\", \"\")\n", + " name = \"< \" + name.replace(\"5000_\", \"\")\n", + " elif \"dyn\" in name:\n", + " name = \"AutoDrift\"\n", + " else:\n", + " name = name.replace(\"mmdalibi_\", \"\")\n", + " name = name.replace(\"_\", \"/\")\n", + "\n", + " return name\n", + "\n", + "\n", + "pipelines = {p_id: (pipeline_name_mapper(pname), p_path) for p_id, (pname, p_path) in pipelines.items()}\n", + "\n", + "unified_pids = []\n", + "names = set()\n", + "for p_id, (pname, _) in pipelines.items():\n", + " if p_id in pipeline_ids and pname not in names:\n", + " unified_pids.append(p_id)\n", + " names.add(pname)\n", + "pipeline_ids = unified_pids\n", + "\n", + "\n", + "[(p_id, pname) for p_id, (pname, _) in pipelines.items() if p_id in pipeline_ids]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Wrangle data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "list_df_eval_single: list[pd.DataFrame] = []\n", + "list_df_all: list[pd.DataFrame] = []\n", + "\n", + "for pipeline_id in pipeline_ids:\n", + " df_all = logs_dataframe(pipeline_logs[pipeline_id], pipelines[pipeline_id][0])\n", + " list_df_all.append(df_all)\n", + "\n", + " _, _, df_eval_single = dfs_models_and_evals(\n", + " pipeline_logs[pipeline_id], df_all[\"sample_time\"].max(), pipelines[pipeline_id][0]\n", + " )\n", + " list_df_eval_single.append(df_eval_single)\n", + "\n", + "df_adjusted = pd.concat(list_df_eval_single)\n", + "df_adjusted\n", + "\n", + "df_all = pd.concat(list_df_all)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted = df_adjusted[\n", + " (df_adjusted[\"dataset_id\"] == dataset_id)\n", + " & (df_adjusted[\"eval_handler\"] == eval_handler)\n", + " & (df_adjusted[\"metric\"] == metric)\n", + "]\n", + "\n", + "# in percent (0-100)\n", + "df_adjusted[\"value\"] = df_adjusted[\"value\"] * 100\n", + "df_adjusted" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted = df_adjusted.sort_values(by=[\"interval_center\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Reduce to composite models\n", + "df_adjusted = df_adjusted[df_adjusted[composite_model_variant]]\n", + "df_adjusted[composite_model_variant].unique()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dump Data backup" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create Plot" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# reduce evaluation interval to interval where all policies have evaluations\n", + "min_active_eval_center_per_pipeline = (\n", + " df_adjusted[df_adjusted[composite_model_variant]].groupby(\"pipeline_ref\")[\"interval_center\"].min()\n", + ")\n", + "maximum_min = min_active_eval_center_per_pipeline.max()\n", + "print(maximum_min, min_active_eval_center_per_pipeline)\n", + "\n", + "df_adjusted = df_adjusted[df_adjusted[\"interval_center\"] >= maximum_min]\n", + "df_adjusted[\"interval_center\"].unique()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted[\"interval_center\"] = df_adjusted[\"interval_center\"].astype(str).str.split(\"-\").str[0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Aggregate metrics to a scalar value per pipeline\n", + "mean_accuracies = df_aggregate_eval_metric(\n", + " df_adjusted,\n", + " group_by=[\"pipeline_ref\", \"metric\"],\n", + " in_col=\"value\",\n", + " out_col=\"metric_value\",\n", + " aggregate_func=\"mean\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_triggers = df_all[df_all[\"id\"] == PipelineStage.HANDLE_SINGLE_TRIGGER.name]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_triggers = df_all[df_all[\"id\"] == PipelineStage.HANDLE_SINGLE_TRIGGER.name]\n", + "df_triggers = df_triggers[df_triggers[\"sample_time\"] > maximum_min]\n", + "df_triggers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Find number of trigger per pipeline that are after maximum_min\n", + "\n", + "# before the cutoff there was one trigger (equivalent to start of our reduced dataset): +1\n", + "num_triggers = df_triggers.groupby(\"pipeline_ref\").aggregate(count=(\"id\", \"count\")) + 1\n", + "num_triggers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "merged = num_triggers.merge(mean_accuracies, on=\"pipeline_ref\")\n", + "assert mean_accuracies.shape[0] == merged.shape[0]\n", + "merged" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_type(x: str):\n", + " if \"year\" in x:\n", + " return \"yearly\"\n", + " elif \"samples\" in x:\n", + " return \"amount\"\n", + " elif \"y\" in x:\n", + " return \"drift\"\n", + " elif \"<\" in x:\n", + " return \"perf\"\n", + " elif \"AutoDrift\" in x:\n", + " return \"drift\"\n", + " else:\n", + " return \"unknown\"\n", + "\n", + "\n", + "merged[\"type\"] = merged[\"pipeline_ref\"].apply(lambda x: create_type(x))\n", + "merged" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette = sns.color_palette(\"RdBu\", 10)\n", + "palette" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette2 = sns.color_palette(\"colorblind\", 10)\n", + "palette2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the heatmap\n", + "init_plot()\n", + "# sns.set_theme(style=\"ticks\")\n", + "# plt.rcParams['svg.fonttype'] = 'none'\n", + "sns.set_style(\"whitegrid\")\n", + "\n", + "FONTSIZE = 20\n", + "DOUBLE_FIG_WIDTH = 10\n", + "DOUBLE_FIG_HEIGHT = 3.5\n", + "DOUBLE_FIG_SIZE = (DOUBLE_FIG_WIDTH, 1.5 * DOUBLE_FIG_HEIGHT)\n", + "\n", + "fig = plt.figure(\n", + " edgecolor=\"black\",\n", + " frameon=True,\n", + " figsize=DOUBLE_FIG_SIZE,\n", + " dpi=300,\n", + ")\n", + "\n", + "markers = {\"drift\": \"X\", \"yearly\": \"o\", \"amount\": \"D\", \"perf\": \"*\"}\n", + "\n", + "ax = sns.scatterplot(\n", + " merged,\n", + " x=\"count\",\n", + " y=\"metric_value\",\n", + " hue=\"type\",\n", + " palette={\"drift\": palette[-2], \"yearly\": palette2[1], \"amount\": palette[1], \"perf\": palette2[4]},\n", + " s=200,\n", + " legend=True,\n", + " markers=markers,\n", + " style=\"type\", # required for markers\n", + " edgecolor=\"none\",\n", + " # annotations\n", + ")\n", + "ax.set(ylim=(55, 80))\n", + "ax.set(xlim=(-4, 170))\n", + "\n", + "for i in range(merged.shape[0]):\n", + " offsets = defaultdict(lambda: (+1.5, -0.25))\n", + " offsets.update(\n", + " {\n", + " # x, y\n", + " \"3 years\": (+2, -1),\n", + " \"1 year\": (-3.5, +1),\n", + " \"5 years\": (+2, -0.15),\n", + " \"50000 samples\": (+3, -1),\n", + " \"30000 samples\": (-3, +1),\n", + " \"5000/0.005/1y\": (-9, -2),\n", + " \"5000/0.01/1y\": (+2.5, -0.5),\n", + " \"5000/0.02/1y\": (+2.5, -0.5),\n", + " \"< 0.75\": (-18, -0.5),\n", + " \"< 0.7\": (-3.5, +1),\n", + " \"AutoDrift\": (+1.5, -1.5),\n", + " }\n", + " )\n", + " plt.rc(\"text\", usetex=True)\n", + "\n", + " def fix_s(ref: str) -> str:\n", + " if ref[0] != \"<\":\n", + " return r\"\\textbf{\" + merged[\"pipeline_ref\"][i] + \"}\"\n", + "\n", + " return r\"$\\mathbf{<}$ \\textbf{\" + ref[2:] + \"}\"\n", + "\n", + " plt.text(\n", + " x=merged[\"count\"][i] + offsets[merged[\"pipeline_ref\"][i]][0],\n", + " y=merged[\"metric_value\"][i] + offsets[merged[\"pipeline_ref\"][i]][1],\n", + " s=fix_s(merged[\"pipeline_ref\"][i]),\n", + " fontdict=dict(color=\"black\", fontsize=17),\n", + " )\n", + " plt.rc(\"text\", usetex=False)\n", + "\n", + "\n", + "# Adjust x-axis tick labels\n", + "plt.xlabel(\"Number of triggers\", labelpad=10)\n", + "plt.xticks(\n", + " ticks=[x for x in range(0, 170 + 1, 20)],\n", + " labels=[x for x in range(0, 170 + 1, 20)],\n", + " rotation=0,\n", + " # ha='right'\n", + ")\n", + "\n", + "# Set y-axis ticks to be equally spaced\n", + "plt.ylabel(\"Mean Top-2 Accuracy %\", labelpad=15)\n", + "plt.yticks(\n", + " ticks=[x for x in range(56, 80 + 1, 3)],\n", + " labels=[x for x in range(56, 80 + 1, 3)],\n", + " rotation=0,\n", + ")\n", + "\n", + "\n", + "label_mapping = {\"drift\": \"Drift\", \"yearly\": \"Time\", \"amount\": \"Amount\", \"perf\": \"Performance\"}\n", + "handles, labels = ax.get_legend_handles_labels()\n", + "latex_labels = [f\"{label_mapping.get(label, label)} \" for label in labels]\n", + "\n", + "legend = ax.legend(\n", + " loc=\"lower right\",\n", + " ncol=2,\n", + " handles=handles,\n", + " labels=latex_labels,\n", + " labelspacing=0.2,\n", + " columnspacing=0.9,\n", + " handlelength=1.3,\n", + ")\n", + "\n", + "\n", + "# Display the plot\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Save Plot as svg" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for img_type in [\"png\", \"svg\"]:\n", + " img_path = output_dir / f\"scatter_arxiv.{img_type}\"\n", + " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette2[4]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/analytics/plotting/sigmod/yb_heatmap.ipynb b/analytics/plotting/sigmod/yb_heatmap.ipynb index 05e29ce5d..eb42b214c 100644 --- a/analytics/plotting/sigmod/yb_heatmap.ipynb +++ b/analytics/plotting/sigmod/yb_heatmap.ipynb @@ -27,14 +27,14 @@ "source": [ "# INPUTS\n", "\n", - "drift_pipeline = True\n", + "drift_pipeline = False\n", "if drift_pipeline:\n", - " pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/triggering/logs_agg\")\n", - "else:\n", " pipelines_dir = Path(\n", - " \"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/data_selection_50%/logs_agg_patch\"\n", + " \"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/triggering_revision/logs_revision_fullrerun_agg\"\n", " )\n", - "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "else:\n", + " pipelines_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/data_selection_50%/logs_agg_patch\")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data\")\n", "assert pipelines_dir.exists()\n", "assert output_dir.exists()" ] @@ -67,7 +67,7 @@ "metadata": {}, "outputs": [], "source": [ - "type(pipeline_logs[5 if not drift_pipeline else 38])" + "type(pipeline_logs[5 if not drift_pipeline else 13])" ] }, { @@ -77,7 +77,7 @@ "outputs": [], "source": [ "# mode:\n", - "pipeline_id = 5 if not drift_pipeline else 38\n", + "pipeline_id = 5 if not drift_pipeline else 13\n", "\n", "# doesn't do anything unless include_composite_model = True\n", "composite_model_variant = \"currently_trained_model\" if not drift_pipeline else \"currently_active_model\"\n", @@ -258,9 +258,9 @@ "outputs": [], "source": [ "# Create the heatmap\n", - "from analytics.plotting.common.common import INIT_PLOT\n", + "from analytics.plotting.common.common import init_plot\n", "\n", - "INIT_PLOT()\n", + "init_plot()\n", "# sns.set_theme(style=\"ticks\")\n", "plt.rcParams[\"svg.fonttype\"] = \"none\"\n", "\n", @@ -315,11 +315,11 @@ " rotation=0,\n", " # ha='right'\n", " )\n", - "plt.ylabel(\"Trained up to\")\n", + "plt.ylabel(\"Model trained on data up to\")\n", "\n", "# Draft training boxes\n", "if drift_pipeline:\n", - " for type_, dashed in [(\"train\", False), (\"usage\", False), (\"train\", True)]:\n", + " for type_, dashed in [(\"usage\", False)]: # [(\"train\", False), (\"usage\", False), (\"train\", True)]:\n", " for active_ in df_logs_models.iterrows():\n", " x_start = active_[1][f\"{type_}_start\"].year - 1930\n", " x_end = active_[1][f\"{type_}_end\"].year - 1930\n", @@ -360,13 +360,21 @@ "source": [ "for img_type in [\"png\", \"svg\"]:\n", " img_path = output_dir / f\"yearbook_heatmap{'_trigger' if drift_pipeline else ''}.{img_type}\"\n", + " print(img_path)\n", " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "modyn", "language": "python", "name": "python3" }, @@ -380,7 +388,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/analytics/plotting/sigmod/yb_scatter_selection.ipynb b/analytics/plotting/sigmod/yb_scatter_selection.ipynb index 246c78db5..f02089513 100644 --- a/analytics/plotting/sigmod/yb_scatter_selection.ipynb +++ b/analytics/plotting/sigmod/yb_scatter_selection.ipynb @@ -33,8 +33,8 @@ "source": [ "# INPUTS\n", "\n", - "pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/data_selection_50%/logs_agg_patch\")\n", - "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "pipelines_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/data_selection_50%/logs_agg_patch\")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/dynamic_datasets_dsl\")\n", "assert pipelines_dir.exists()\n", "assert output_dir.exists()" ] @@ -278,11 +278,13 @@ "outputs": [], "source": [ "# Create the heatmap\n", - "from analytics.plotting.common.common import INIT_PLOT\n", + "from analytics.plotting.common.common import init_plot\n", "\n", - "INIT_PLOT()\n", + "plt.rcParams[\"svg.fonttype\"] = \"none\"\n", + "plt.rcParams[\"text.usetex\"] = False\n", + "\n", + "init_plot()\n", "# sns.set_theme(style=\"ticks\")\n", - "# plt.rcParams['svg.fonttype'] = 'none'\n", "sns.set_style(\"whitegrid\")\n", "\n", "FONTSIZE = 20\n", @@ -297,6 +299,20 @@ " dpi=300,\n", ")\n", "\n", + "label_map = {\n", + " \"Loss \": \"Loss \\\\y{a} \",\n", + " \"DLIS \": \"DLIS \\\\y{a} \",\n", + " \"Uniform \": \"Uniform \",\n", + " \"Class-Bal. \": \"Class-Bal. \",\n", + " \"RS2 \": \"RS2 \\\\y{b} \",\n", + " \"RS2 (w/o) \": \"RS2 (w/o) \\\\y{b} \",\n", + " \"Margin \": \"Margin \\\\y{c} \",\n", + " \"Least conf. \": \"Least conf. \\\\y{c} \",\n", + " \"Entropy \": \"Entropy \\\\y{c} \",\n", + "}\n", + "\n", + "mean_accuracies_candidate[\"pipeline_ref_mapped\"] = mean_accuracies_candidate[\"pipeline_ref\"].map(label_map)\n", + "\n", "palette = sns.color_palette(\"RdBu_r\", 10)\n", "palette = [palette[1], palette[1]]\n", "ax = sns.stripplot(\n", @@ -321,7 +337,7 @@ " linewidth=3,\n", ")\n", "\n", - "plt.text(s=\"Full data training\", x=-0.2, y=mean_accuracy_ref[\"metric_value\"].values[0] - 2, color=\"dimgrey\")\n", + "plt.text(s=\"Full data training\", x=-0.2, y=mean_accuracy_ref[\"metric_value\"].values[0] - 1, color=\"dimgrey\")\n", "\n", "\n", "# Set x-axis\n", @@ -336,8 +352,19 @@ " rotation=0,\n", ")\n", "\n", + "\n", + "a = ax.get_xticklabels()\n", + "n = []\n", + "for lbl in a:\n", + " print(lbl)\n", + " lbl.set_text(label_map[lbl.get_text()])\n", + " n.append(lbl)\n", + "ax.set_xticklabels(a)\n", + "\n", + "\n", "# Display the plot\n", - "plt.tight_layout()\n", + "# plt.tight_layout()\n", + "\n", "plt.show()" ] }, @@ -356,13 +383,21 @@ "source": [ "for img_type in [\"png\", \"svg\"]:\n", " img_path = output_dir / f\"scatter_selection_yb.{img_type}\"\n", - " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" + " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)\n", + " print(output_dir / f\"scatter_selection_yb.{img_type}\")" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "modyn", "language": "python", "name": "python3" }, @@ -376,7 +411,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/analytics/plotting/sigmod/yb_scatter_triggering.ipynb b/analytics/plotting/sigmod/yb_scatter_triggering.ipynb index 192516a2f..75fcef42d 100644 --- a/analytics/plotting/sigmod/yb_scatter_triggering.ipynb +++ b/analytics/plotting/sigmod/yb_scatter_triggering.ipynb @@ -6,6 +6,7 @@ "metadata": {}, "outputs": [], "source": [ + "from collections import defaultdict\n", "from pathlib import Path\n", "\n", "import matplotlib.pyplot as plt\n", @@ -19,6 +20,7 @@ " logs_dataframe,\n", " patch_yearbook_time,\n", ")\n", + "from analytics.plotting.common.common import init_plot\n", "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", "\n", "%load_ext autoreload\n", @@ -33,8 +35,10 @@ "source": [ "# INPUTS\n", "\n", - "pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/triggering/logs_agg\")\n", - "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "pipelines_dir = Path(\n", + " \"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/triggering_revision/logs_revision_fullrerun_agg\"\n", + ")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/dynamic_datasets_dsl/analytics/plotting/sigmod\")\n", "assert pipelines_dir.exists()\n", "assert output_dir.exists()" ] @@ -67,7 +71,7 @@ "metadata": {}, "outputs": [], "source": [ - "type(pipeline_logs[32])" + "type(pipeline_logs[13])" ] }, { @@ -97,12 +101,24 @@ " for p_id, (p, _) in pipelines.items()\n", " if p\n", " in {\n", - " \"yearbook_mmdalibi_250_0.05_5d\",\n", - " \"yearbook_mmdalibi_250_0.07_1d\",\n", - " \"yearbook_mmdalibi_250_0.07_5d\",\n", - " \"yearbook_mmdalibi_250_0.05_1d\",\n", - " \"yearbook_mmdalibi_500_0.05_1d\",\n", - " \"yearbook_mmdalibi_100_0.05_1d\",\n", + " \"yearbook_mmdalibi_250_0.05_5d\", # ok\n", + " \"yearbook_mmdalibi_100_0.05_1d\", # ok\n", + " \"yearbook_mmdalibi_250_0.07_5d\", # ok\n", + " \"yearbook_mmdalibi_250_0.07_1d\", # n\n", + " \"yearbook_mmdalibi_250_0.05_1d\", # n\n", + " \"yearbook_mmdalibi_dyn_250_15_qt_0.05_5d\",\n", + " }\n", + " ]\n", + " + [\n", + " # perf\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if p\n", + " in {\n", + " \"yearbook_perf_250_0.95\",\n", + " \"yearbook_perf_250_0.9\",\n", + " \"yearbook_perf_250_0.85\",\n", + " \"yearbook_perf_250_0.8\",\n", " }\n", " ]\n", ")\n", @@ -116,15 +132,22 @@ "\n", "def pipeline_name_mapper(name: str) -> str:\n", " name = name.replace(\"yearbook_\", \"\")\n", - " name = name.replace(\"timetrigger_\", \"\") # \"every \"\n", - " name = name.replace(\"amounttrigger_\", \"\") # \"every \"\n", - " name = name.replace(\"mmdalibi_\", \"\")\n", - " if name.endswith(\"y\"):\n", - " name = name[:-1] + (\" years\" if not name.endswith(\"1y\") else \" year\")\n", - " elif not name.endswith(\"d\"): # dataamount\n", + "\n", + " if \"amounttrigger\" in name:\n", + " name = name.replace(\"amounttrigger_\", \"\")\n", " name = name + \" samples\"\n", - " else: # drift\n", + " elif \"timetrigger\" in name:\n", + " name = name.replace(\"timetrigger_\", \"\")\n", + " name = name[:-1] + (\" years\" if not name.endswith(\"1y\") else \" year\")\n", + " elif \"perf\" in name:\n", + " name = name.replace(\"perf_\", \"\")\n", + " name = \"< \" + name.replace(\"250_\", \"\")\n", + " elif \"dyn\" in name:\n", + " name = \"AutoDrift\"\n", + " else:\n", + " name = name.replace(\"mmdalibi_\", \"\")\n", " name = name.replace(\"_\", \"/\")\n", + "\n", " return name\n", "\n", "\n", @@ -326,6 +349,10 @@ " return \"amount\"\n", " elif \"d\" in x:\n", " return \"drift\"\n", + " elif \"<\" in x:\n", + " return \"perf\"\n", + " elif \"AutoDrift\" in x:\n", + " return \"drift\"\n", " else:\n", " return \"unknown\"\n", "\n", @@ -361,11 +388,7 @@ "outputs": [], "source": [ "# Create the heatmap\n", - "from collections import defaultdict\n", - "\n", - "from analytics.plotting.common.common import INIT_PLOT\n", - "\n", - "INIT_PLOT()\n", + "init_plot()\n", "# sns.set_theme(style=\"ticks\")\n", "# plt.rcParams['svg.fonttype'] = 'none'\n", "sns.set_style(\"whitegrid\")\n", @@ -382,15 +405,19 @@ " dpi=300,\n", ")\n", "\n", + "markers = {\"drift\": \"X\", \"yearly\": \"o\", \"amount\": \"D\", \"perf\": \"*\"}\n", + "\n", "ax = sns.scatterplot(\n", " merged,\n", " x=\"count\",\n", " y=\"metric_value\",\n", " hue=\"type\",\n", - " palette={\"drift\": palette[-2], \"yearly\": palette2[1], \"amount\": palette[1]},\n", + " palette={\"drift\": palette[-2], \"yearly\": palette2[1], \"amount\": palette[1], \"perf\": palette2[4]},\n", " s=200,\n", - " legend=False,\n", - " marker=\"X\",\n", + " legend=True,\n", + " markers=markers,\n", + " style=\"type\", # required for markers\n", + " edgecolor=\"none\",\n", " # annotations\n", ")\n", "ax.set(ylim=(85, 94.5))\n", @@ -403,21 +430,33 @@ " # x, y\n", " \"3 years\": (-3, +0.5),\n", " \"1 year\": (-2, -0.85),\n", - " \"5 years\": (-3, +0.5),\n", + " \"5 years\": (+1.8, -0.15),\n", " \"500 samples\": (-5, +0.5),\n", - " \"2000 samples\": (+1.7, -0.25),\n", - " \"250/0.05/5d\": (-2, +0.5),\n", - " \"100/0.05/1d\": (+1.5, -0.7),\n", - " \"500/0.05/1d\": (+1.5, 0.15),\n", - " \"250/0.07/1d\": (+1.5, -0.55),\n", - " \"250/0.05/1d\": (-10, +0.4),\n", + " \"2000 samples\": (-16, -0.25),\n", + " \"250/0.05/5d\": (+2, -0.2),\n", + " \"100/0.05/1d\": (+1.2, +0.25),\n", + " # \"500/0.05/1d\": (+1.5, 0.15),\n", + " \"250/0.07/1d\": (+2.4, -0.1),\n", + " \"250/0.05/1d\": (-13, -0.7),\n", + " \"< 0.95\": (-4.5, -0.8),\n", + " \"< 0.9\": (-2.5, +0.5),\n", + " \"< 0.85\": (-3, -0.75),\n", + " \"< 0.8\": (-3, +0.3),\n", + " \"AutoDrift\": (-5, +0.4),\n", " }\n", " )\n", " plt.rc(\"text\", usetex=True)\n", + "\n", + " def fix_s(ref: str) -> str:\n", + " if ref[0] != \"<\":\n", + " return r\"\\textbf{\" + merged[\"pipeline_ref\"][i] + \"}\"\n", + "\n", + " return r\"$\\mathbf{<}$ \\textbf{\" + ref[2:] + \"}\"\n", + "\n", " plt.text(\n", " x=merged[\"count\"][i] + offsets[merged[\"pipeline_ref\"][i]][0],\n", " y=merged[\"metric_value\"][i] + offsets[merged[\"pipeline_ref\"][i]][1],\n", - " s=r\"\\textbf{\" + merged[\"pipeline_ref\"][i] + \"}\",\n", + " s=fix_s(merged[\"pipeline_ref\"][i]),\n", " fontdict=dict(color=\"black\", fontsize=17),\n", " )\n", " plt.rc(\"text\", usetex=False)\n", @@ -440,6 +479,21 @@ " rotation=0,\n", ")\n", "\n", + "label_mapping = {\"drift\": \"Drift\", \"yearly\": \"Time\", \"amount\": \"Amount\", \"perf\": \"Performance\"}\n", + "handles, labels = ax.get_legend_handles_labels()\n", + "latex_labels = [f\"{label_mapping.get(label, label)} \" for label in labels]\n", + "\n", + "legend = ax.legend(\n", + " loc=\"lower right\",\n", + " ncol=2,\n", + " handles=handles,\n", + " labels=latex_labels,\n", + " labelspacing=0.2,\n", + " columnspacing=0.9,\n", + " handlelength=1.3,\n", + ")\n", + "\n", + "\n", "# Display the plot\n", "plt.tight_layout()\n", "plt.show()" @@ -463,6 +517,15 @@ " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette2[4]" + ] + }, { "cell_type": "code", "execution_count": null, @@ -473,7 +536,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "modyn", "language": "python", "name": "python3" }, @@ -487,7 +550,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/analytics/tools/aggregate_runs/core_aggregation.py b/analytics/tools/aggregate_runs/core_aggregation.py index d915790dc..cb93d05a6 100644 --- a/analytics/tools/aggregate_runs/core_aggregation.py +++ b/analytics/tools/aggregate_runs/core_aggregation.py @@ -10,7 +10,7 @@ from modyn.supervisor.internal.grpc.enums import PipelineStage from modyn.supervisor.internal.pipeline_executor.models import MultiEvaluationInfo, PipelineLogs -DEBUGGING_MODE = True +DEBUGGING_MODE = False """If True, the the process will halt on breakpoints to allow for manual verification.""" diff --git a/benchmark/sigmod/triggering/arxiv_triggering_config.py b/benchmark/sigmod/triggering/arxiv_triggering_config.py new file mode 100644 index 000000000..430b7e89f --- /dev/null +++ b/benchmark/sigmod/triggering/arxiv_triggering_config.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +from modyn.config import ( + CheckpointingConfig, + OptimizationCriterion, + OptimizerConfig, + OptimizerParamGroup, +) +from modyn.config.schema.pipeline import ( + AccuracyMetricConfig, + DataConfig, + EvalDataConfig, + EvaluationConfig, + F1ScoreMetricConfig, + FullModelStrategy, + ModelConfig, + ModynPipelineConfig, + Pipeline, + PipelineModelStorageConfig, + TrainingConfig, +) +from modyn.config.schema.pipeline.evaluation.handler import EvalHandlerConfig +from modyn.config.schema.pipeline.evaluation.strategy.periodic import PeriodicEvalStrategyConfig +from modyn.config.schema.pipeline.sampling.config import NewDataStrategyConfig +from modyn.config.schema.pipeline.trigger import TriggerConfig +from modyn.utils.utils import SECONDS_PER_UNIT + + +def gen_arxiv_training_conf(gpu_device: str, seed: int): + opti_conf = OptimizerConfig( + name="default", + algorithm="AdamW", + source="PyTorch", + param_groups=[OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})], + ) + + return TrainingConfig( + gpus=1, + device=gpu_device, + dataloader_workers=1, + use_previous_model=True, + initial_model="random", + batch_size=128, + optimizers=[opti_conf], + optimization_criterion=OptimizationCriterion(name="CrossEntropyLoss"), + checkpointing=CheckpointingConfig(activated=False), + lr_scheduler=None, + epochs_per_trigger=5, + shuffle=True, + amp=False, + seed=seed, + ) + + +ARXIV_BPF = ( + "import torch\n" "def bytes_parser_function(data: memoryview) -> torch.Tensor:\n" " return str(data, 'utf8')" +) + +ARXIV_EVAL_FUNC = ( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" +) + + +def get_eval_data_config(dataset: str) -> EvalDataConfig: + return EvalDataConfig( + dataset_id=dataset, + bytes_parser_function=ARXIV_BPF, + tokenizer="DistilBertTokenizerTransform", + batch_size=256, + dataloader_workers=1, + metrics=[ + AccuracyMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + topn=1, + ), + AccuracyMetricConfig(evaluation_transformer_function="", topn=2), + AccuracyMetricConfig(evaluation_transformer_function="", topn=5), + F1ScoreMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + num_classes=172, + average="weighted", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + num_classes=172, + average="macro", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + num_classes=172, + average="micro", + ), + ], + ) + + +def gen_arxiv_triggering_config( + config_id: str, gpu_device: str, trigger_config: TriggerConfig, seed: int, start_eval_at: int +) -> ModynPipelineConfig: + return ModynPipelineConfig( + pipeline=Pipeline(name=f"kaggle_arxiv_{config_id}", description="Arxiv triggering config", version="0.0.1"), + model=ModelConfig(id="ArticleNet", config={"num_classes": 172}), + model_storage=PipelineModelStorageConfig(full_model_strategy=FullModelStrategy(name="PyTorchFullModel")), + training=gen_arxiv_training_conf(gpu_device, seed), + selection_strategy=NewDataStrategyConfig( + maximum_keys_in_memory=200000, storage_backend="database", tail_triggers=0, limit=-1 + ), + data=DataConfig( + dataset_id="arxiv_kaggle_train", + bytes_parser_function=ARXIV_BPF, + tokenizer="DistilBertTokenizerTransform", + ), + trigger=trigger_config, + evaluation=EvaluationConfig( + handlers=[ + EvalHandlerConfig( + name="exactmatrix", + execution_time="after_pipeline", + models="matrix", + datasets=["arxiv_kaggle_test"], + strategy=PeriodicEvalStrategyConfig( + every="26w", + interval="[-13w; +13w]", + start_timestamp=start_eval_at + 13 * SECONDS_PER_UNIT["w"], + end_timestamp=1724803200, + ), + ) + ], + after_pipeline_evaluation_workers=2, + after_training_evaluation_workers=2, + device=gpu_device, + datasets=[get_eval_data_config(dataset) for dataset in ["arxiv_kaggle_test"]], + ), + ) diff --git a/benchmark/sigmod/triggering/run_arxiv_triggering.py b/benchmark/sigmod/triggering/run_arxiv_triggering.py new file mode 100644 index 000000000..8f6225cca --- /dev/null +++ b/benchmark/sigmod/triggering/run_arxiv_triggering.py @@ -0,0 +1,271 @@ +from __future__ import annotations + +import logging +import math +import os +import sys +from pathlib import Path + +from benchmark.sigmod.triggering.arxiv_triggering_config import gen_arxiv_triggering_config, get_eval_data_config +from experiments.utils.experiment_runner import run_multiple_pipelines +from modyn.config.schema.pipeline import ModynPipelineConfig +from modyn.config.schema.pipeline.trigger import TriggerConfig +from modyn.config.schema.pipeline.trigger.drift import DataDriftTriggerConfig +from modyn.config.schema.pipeline.trigger.drift.alibi_detect import AlibiDetectMmdDriftMetric +from modyn.config.schema.pipeline.trigger.drift.criterion import ( + DynamicQuantileThresholdCriterion, + DynamicRollingAverageThresholdCriterion, + ThresholdDecisionCriterion, +) +from modyn.config.schema.pipeline.trigger.drift.detection_window.time_ import TimeWindowingStrategy +from modyn.config.schema.pipeline.trigger.performance.criterion import ( + DynamicQuantilePerformanceThresholdCriterion, + DynamicRollingAveragePerformanceThresholdCriterion, + StaticPerformanceThresholdCriterion, +) +from modyn.config.schema.pipeline.trigger.performance.performance import ( + PerformanceTriggerConfig, + PerformanceTriggerEvaluationConfig, +) +from modyn.config.schema.pipeline.trigger.simple.data_amount import DataAmountTriggerConfig +from modyn.config.schema.pipeline.trigger.simple.time import TimeTriggerConfig +from modyn.supervisor.internal.pipeline_executor.models import PipelineLogs +from modyn.utils.utils import current_time_millis +from modynclient.config.schema.client_config import ModynClientConfig, Supervisor + +logging.basicConfig( + level=logging.NOTSET, + format="[%(asctime)s] [%(filename)15s:%(lineno)4d] %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d:%H:%M:%S", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler(f"client_{current_time_millis()}.log", mode="w"), + ], +) +logger = logging.getLogger(__name__) + +START_TIMESTAMP = 631152000 + + +def gen_triggering_strategies(device: str) -> list[tuple[str, TriggerConfig]]: + strategies = [] + + # TimeTriggers + for years in [1, 3, 5]: + strategies.append( + (f"timetrigger_{years}y", TimeTriggerConfig(every=f"{years}y", start_timestamp=START_TIMESTAMP)) + ) + + # DataAmountTriggers + for count in [30000, 50000]: + strategies.append((f"amounttrigger_{count}", DataAmountTriggerConfig(num_samples=count))) + + return strategies + + +def gen_revision_triggering_strategies(device: str) -> list[tuple[str, TriggerConfig]]: + strategies = [] + min_warmup_data_points = 20000 + + for evaluation_interval_data_points in [5000]: + warmup_intervals = math.ceil(min_warmup_data_points / evaluation_interval_data_points) + + # Static Drift Triggers + for threshold in [0.02, 0.01, 0.005, 0.002]: + for window_size in ["1y", "2y"]: + conf = DataDriftTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + sample_size=10000, + metrics={ + "mmd_alibi": AlibiDetectMmdDriftMetric( + device="gpu", + num_permutations=None, + decision_criterion=ThresholdDecisionCriterion(threshold=threshold), + ) + }, + warmup_policy=TimeTriggerConfig(every="1y", start_timestamp=START_TIMESTAMP), + warmup_intervals=warmup_intervals, + ) + name = f"mmdalibi_{evaluation_interval_data_points}_{threshold}_{window_size}" + strategies.append((name, conf)) + + ## Dynamic Drift Triggers + for window_size in ["1y", "2y"]: + for metric_window_size in [15, 30]: # how many drift scores we use for calibrating the dynamic policy + criteria = [] + for deviation in [1]: + criteria.append( + ( + f"roll_{deviation}", + DynamicRollingAverageThresholdCriterion( + window_size=metric_window_size, deviation=deviation, absolute=False + ), + ) + ) + for quantile in [0.05, 0.1, 0.2]: + criteria.append( + ( + f"qt_{quantile}", + DynamicQuantileThresholdCriterion(window_size=metric_window_size, quantile=quantile), + ) + ) + for dec_crit_str, decision_criterion in criteria: + conf = DataDriftTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + sample_size=10000, + metrics={ + "mmd_alibi": AlibiDetectMmdDriftMetric( + device="gpu", + num_permutations=None, + decision_criterion=decision_criterion, + ) + }, + warmup_policy=TimeTriggerConfig(every="1y", start_timestamp=START_TIMESTAMP), + warmup_intervals=warmup_intervals, + ) + + name = f"mmdalibi_dyn_{evaluation_interval_data_points}_{metric_window_size}_{dec_crit_str}_{window_size}" + strategies.append((name, conf)) + + ## Static PerformanceTriggers + for threshold in [0.8, 0.75, 0.7]: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=1, # deprecated + data_density_window_size=1, # deprecated + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("arxiv_kaggle_train") + ), + decision_criteria={ + f"static-{threshold}": StaticPerformanceThresholdCriterion( + metric="Top-2-Accuracy", metric_threshold=threshold + ) + }, + ) + name = f"perf_{evaluation_interval_data_points}_{threshold}" + strategies.append((name, conf)) + + ## Dynamic PerformanceTriggers + for performance_triggers_window_size in [15, 30]: + criteria = [] + for deviation in [0.2]: + criterion = DynamicRollingAveragePerformanceThresholdCriterion( + metric="Top-2-Accuracy", + window_size=performance_triggers_window_size, + deviation=deviation, + absolute=False, + ) + criteria.append((f"{performance_triggers_window_size}_roll_{deviation}", criterion)) + + for quantile in [0.05, 0.1, 0.2]: + criterion = DynamicQuantilePerformanceThresholdCriterion( + metric="Top-2-Accuracy", window_size=performance_triggers_window_size, quantile=quantile + ) + criteria.append((f"{performance_triggers_window_size}_perc_{quantile}", criterion)) + + for dec_crit_str, decision_criterion in criteria: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=performance_triggers_window_size, + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("arxiv_kaggle_train") + ), + decision_criteria={f"dynamic-{dec_crit_str}": decision_criterion}, + warmup_policy=TimeTriggerConfig(every="1y", start_timestamp=START_TIMESTAMP), + warmup_intervals=warmup_intervals, + ) + name = f"perf_dyn_{evaluation_interval_data_points}_{dec_crit_str}" + strategies.append((name, conf)) + + return strategies + + +def run_experiment() -> None: + logger.info("Grüeziwohl!") + pipeline_configs: list[ModynPipelineConfig] = [] + train_gpu = "cuda:0" + num_gpus = 1 # to parallelize across gpus + gpu_id = 0 + seeds = [42, 99, 12] # set to [None] to disable, should be 0-100 + skip_existing = True + + existing_pipelines = [] + if skip_existing: + log_directory = Path(input("Please enter the directory in which to search for existing pipelines: ")) or Path( + "/raid/modyn/maxi/sigmod/logs" + ) + if not log_directory.exists(): + raise RuntimeError(f"{log_directory} does not exist.") + + names = list(log_directory.glob("**/.name")) + + for name_file in names: + name = name_file.read_text() + pipeline_file = name_file.parent / "pipeline.log" + + if not pipeline_file.exists(): + logger.info(f"{name_file} exists, but {pipeline_file} does not") + continue + + try: + parsed_log = PipelineLogs.model_validate_json(pipeline_file.read_text()) + except: + print(f"Skipping file {pipeline_file} due to invalid format") + continue + + seed = parsed_log.config.pipeline.training.seed + existing_pipelines.append((name, seed)) + + logger.info(f"Found these existing pipelines: {existing_pipelines}") + + existing_pipelines = set(existing_pipelines) + run_id = 0 + for seed in seeds: + for triggering_strategy_id, triggering_strategy in gen_triggering_strategies( + train_gpu + ) + gen_revision_triggering_strategies(train_gpu): + if ( + isinstance(triggering_strategy, DataDriftTriggerConfig) + or isinstance(triggering_strategy, PerformanceTriggerConfig) + ) and seed != seeds[0]: + continue # only execute drift triggers once + + pipeline_config = gen_arxiv_triggering_config( + triggering_strategy_id, train_gpu, triggering_strategy, seed, START_TIMESTAMP + ) + + if run_id % num_gpus == gpu_id and (pipeline_config.pipeline.name, seed) not in existing_pipelines: + logger.info(f"Running {triggering_strategy_id} with seed {seed} on this GPU.") + pipeline_configs.append(pipeline_config) + + run_id += 1 + + print(f"Running {len(pipeline_configs)} pipelines in total now.") + host = os.getenv("MODYN_SUPERVISOR_HOST") + port = os.getenv("MODYN_SUPERVISOR_PORT") + + if not host: + host = input("Enter the supervisors host address: ") or "localhost" + if not port: + port = int(input("Enter the supervisors port: ") or "3000") + + run_multiple_pipelines( + client_config=ModynClientConfig(supervisor=Supervisor(ip=host, port=port)), + pipeline_configs=pipeline_configs, + start_replay_at=START_TIMESTAMP, + stop_replay_at=None, + maximum_triggers=None, + show_eval_progress=False, + ) + + +if __name__ == "__main__": + run_experiment() diff --git a/benchmark/sigmod/triggering/run_yb_triggering.py b/benchmark/sigmod/triggering/run_yb_triggering.py index 5dfea3cb5..754ff7700 100644 --- a/benchmark/sigmod/triggering/run_yb_triggering.py +++ b/benchmark/sigmod/triggering/run_yb_triggering.py @@ -1,23 +1,36 @@ from __future__ import annotations import logging +import math import os import sys from pathlib import Path -from benchmark.sigmod.triggering.yearbook_triggering_config import ( - gen_yearbook_triggering_config, -) +from benchmark.sigmod.triggering.yearbook_triggering_config import gen_yearbook_triggering_config, get_eval_data_config from experiments.utils.experiment_runner import run_multiple_pipelines from modyn.config.schema.pipeline import ModynPipelineConfig from modyn.config.schema.pipeline.trigger import TriggerConfig -from modyn.config.schema.pipeline.trigger.data_amount import DataAmountTriggerConfig from modyn.config.schema.pipeline.trigger.drift import DataDriftTriggerConfig from modyn.config.schema.pipeline.trigger.drift.alibi_detect import ( AlibiDetectMmdDriftMetric, ) -from modyn.config.schema.pipeline.trigger.drift.config import TimeWindowingStrategy -from modyn.config.schema.pipeline.trigger.time import TimeTriggerConfig +from modyn.config.schema.pipeline.trigger.drift.criterion import ( + DynamicQuantileThresholdCriterion, + DynamicRollingAverageThresholdCriterion, + ThresholdDecisionCriterion, +) +from modyn.config.schema.pipeline.trigger.drift.detection_window import TimeWindowingStrategy +from modyn.config.schema.pipeline.trigger.performance.criterion import ( + DynamicQuantilePerformanceThresholdCriterion, + DynamicRollingAveragePerformanceThresholdCriterion, + StaticPerformanceThresholdCriterion, +) +from modyn.config.schema.pipeline.trigger.performance.performance import ( + PerformanceTriggerConfig, + PerformanceTriggerEvaluationConfig, +) +from modyn.config.schema.pipeline.trigger.simple.data_amount import DataAmountTriggerConfig +from modyn.config.schema.pipeline.trigger.simple.time import TimeTriggerConfig from modyn.supervisor.internal.pipeline_executor.models import PipelineLogs from modyn.utils.utils import current_time_millis from modynclient.config.schema.client_config import ModynClientConfig, Supervisor @@ -38,28 +51,139 @@ def gen_triggering_strategies() -> list[tuple[str, TriggerConfig]]: strategies = [] # TimeTriggers - for years in [3, 5, 15, 25, 40]: + for years in [1, 3, 5, 15, 25, 40]: strategies.append((f"timetrigger_{years}y", TimeTriggerConfig(every=f"{years}d"))) # DataAmountTriggers for count in [500, 1000, 2000, 10000]: strategies.append((f"amounttrigger_{count}", DataAmountTriggerConfig(num_samples=count))) - # DriftTriggers + return strategies + + +def gen_revision_triggering_strategies(device: str) -> list[tuple[str, TriggerConfig]]: + strategies = [] + min_warmup_data_points = 3500 + for evaluation_interval_data_points in [250, 500, 100]: - for threshold in [0.05, 0.07, 0.09]: - for window_size in ["1d", "2d", "5d"]: # fake timestamps, hence days + warmup_intervals = math.ceil(min_warmup_data_points / evaluation_interval_data_points) + + ## Drift Triggers + for window_size in ["1d", "2d", "5d"]: # fake timestamps, hence days + ## Static Drift + for threshold in [0.05, 0.07, 0.09]: conf = DataDriftTriggerConfig( evaluation_interval_data_points=evaluation_interval_data_points, - windowing_strategy=TimeWindowingStrategy(limit=window_size), - reset_current_window_on_trigger=False, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + sample_size=None, metrics={ - "mmd_alibi": AlibiDetectMmdDriftMetric(device="cpu", num_permutations=None, threshold=threshold) + "mmd_alibi": AlibiDetectMmdDriftMetric( + device="gpu", + num_permutations=None, + decision_criterion=ThresholdDecisionCriterion(threshold=threshold), + ) }, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, ) name = f"mmdalibi_{evaluation_interval_data_points}_{threshold}_{window_size}" strategies.append((name, conf)) + ## Dynamic Drift + for metric_window_size in [15, 30]: # how many drift scores we use for calibrating the policy + criteria = [] + for deviation in [0.05, 1, 2]: + if evaluation_interval_data_points == 100: + continue # No rolling average for very small windows + criteria.append( + ( + f"roll_{deviation}", + DynamicRollingAverageThresholdCriterion( + window_size=metric_window_size, deviation=deviation, absolute=False + ), + ) + ) + for quantile in [0.05, 0.1, 0.2, 0.3]: + criteria.append( + ( + f"qt_{quantile}", + DynamicQuantileThresholdCriterion(window_size=metric_window_size, quantile=quantile), + ) + ) + + for dec_crit_str, decision_criterion in criteria: + conf = DataDriftTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + metrics={ + "mmd_alibi": AlibiDetectMmdDriftMetric( + device=device, + num_permutations=None, + decision_criterion=decision_criterion, + ) + }, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, + ) + + name = f"mmdalibi_dyn_{evaluation_interval_data_points}_{metric_window_size}_{dec_crit_str}_{window_size}" + strategies.append((name, conf)) + + ## Static PerformanceTriggers + for threshold in [0.95, 0.9, 0.875, 0.85, 0.825, 0.8, 0.7]: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=1, # somewhat deprecated parameter, not relevant for static + data_density_window_size=1, # also ignored + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("yearbook_train") + ), + decision_criteria={ + f"static-{threshold}": StaticPerformanceThresholdCriterion( + metric="Accuracy", metric_threshold=threshold + ) + }, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, + ) + name = f"perf_{evaluation_interval_data_points}_{threshold}" + strategies.append((name, conf)) + + ## Dynamic Performance Triggers + for performance_triggers_window_size in [15, 30]: + criteria = [] + for deviation in [0.1, 0.2, 0.3]: + criterion = DynamicRollingAveragePerformanceThresholdCriterion( + metric="Accuracy", window_size=performance_triggers_window_size, deviation=deviation, absolute=False + ) + criteria.append((f"{performance_triggers_window_size}_roll_{deviation}", criterion)) + + for quantile in [0.05, 0.1, 0.2, 0.3]: + criterion = DynamicQuantilePerformanceThresholdCriterion( + metric="Accuracy", window_size=performance_triggers_window_size, quantile=quantile + ) + criteria.append((f"{performance_triggers_window_size}_qt_{quantile}", criterion)) + + for dec_crit_str, decision_criterion in criteria: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=performance_triggers_window_size, + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("yearbook_train") + ), + decision_criteria={f"dynamic-{dec_crit_str}": decision_criterion}, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, + ) + name = f"perf_dyn_{evaluation_interval_data_points}_{dec_crit_str}" + strategies.append((name, conf)) + return strategies @@ -104,8 +228,14 @@ def run_experiment() -> None: existing_pipelines = set(existing_pipelines) run_id = 0 for seed in seeds: - for triggering_strategy_id, triggering_strategy in gen_triggering_strategies(): - if isinstance(triggering_strategy, DataDriftTriggerConfig) and seed != seeds[0]: + for ( + triggering_strategy_id, + triggering_strategy, + ) in gen_triggering_strategies() + gen_revision_triggering_strategies(train_gpu): + if ( + isinstance(triggering_strategy, DataDriftTriggerConfig) + or isinstance(triggering_strategy, PerformanceTriggerConfig) + ) and seed != seeds[0]: continue # only execute drift triggers once pipeline_config = gen_yearbook_triggering_config( @@ -118,6 +248,7 @@ def run_experiment() -> None: run_id += 1 + print(f"Running {len(pipeline_configs)} pipelines in total now.") host = os.getenv("MODYN_SUPERVISOR_HOST") port = os.getenv("MODYN_SUPERVISOR_PORT") diff --git a/benchmark/sigmod/triggering/yearbook_triggering_config.py b/benchmark/sigmod/triggering/yearbook_triggering_config.py index b1b283b38..da7f188d7 100644 --- a/benchmark/sigmod/triggering/yearbook_triggering_config.py +++ b/benchmark/sigmod/triggering/yearbook_triggering_config.py @@ -19,6 +19,61 @@ from modyn.config.schema.pipeline.sampling.config import NewDataStrategyConfig from modyn.config.schema.pipeline.trigger import TriggerConfig +YEARBOOK_BYTES_PARSER_FUNC = ( + "import warnings\n" + "import torch\n" + "def bytes_parser_function(data: memoryview) -> torch.Tensor:\n" + " with warnings.catch_warnings():\n" + " warnings.simplefilter('ignore', category=UserWarning)\n" + " return torch.frombuffer(data, dtype=torch.float32).reshape(3, 32, 32)" +) + + +def get_eval_data_config(dataset: str) -> EvalDataConfig: + return EvalDataConfig( + dataset_id=dataset, + bytes_parser_function=YEARBOOK_BYTES_PARSER_FUNC, + batch_size=512, + dataloader_workers=1, + metrics=[ + AccuracyMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + topn=1, + ), + F1ScoreMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + num_classes=2, + average="weighted", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + num_classes=2, + average="macro", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + num_classes=2, + average="micro", + ), + ], + ) + def gen_yearbook_triggering_config( config_id: str, @@ -26,14 +81,6 @@ def gen_yearbook_triggering_config( trigger_config: TriggerConfig, seed: int, ) -> ModynPipelineConfig: - bytes_parser_func = ( - "import warnings\n" - "import torch\n" - "def bytes_parser_function(data: memoryview) -> torch.Tensor:\n" - " with warnings.catch_warnings():\n" - " warnings.simplefilter('ignore', category=UserWarning)\n" - " return torch.frombuffer(data, dtype=torch.float32).reshape(3, 32, 32)" - ) return ModynPipelineConfig( pipeline=Pipeline(name=f"yearbook_{config_id}", description="Yearbook triggering config", version="0.0.1"), model=ModelConfig(id="YearbookNet", config={"num_input_channels": 3, "num_classes": 2}), @@ -63,7 +110,7 @@ def gen_yearbook_triggering_config( selection_strategy=NewDataStrategyConfig( maximum_keys_in_memory=100000, storage_backend="database", tail_triggers=0, limit=-1 ), - data=DataConfig(dataset_id="yearbook_train", bytes_parser_function=bytes_parser_func), + data=DataConfig(dataset_id="yearbook_train", bytes_parser_function=YEARBOOK_BYTES_PARSER_FUNC), trigger=trigger_config, evaluation=EvaluationConfig( handlers=[ @@ -80,51 +127,6 @@ def gen_yearbook_triggering_config( after_pipeline_evaluation_workers=12, after_training_evaluation_workers=12, device=gpu_device, - datasets=[ - EvalDataConfig( - dataset_id=dataset, - bytes_parser_function=bytes_parser_func, - batch_size=512, - dataloader_workers=1, - metrics=[ - AccuracyMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - topn=1, - ), - F1ScoreMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - num_classes=2, - average="weighted", - ), - F1ScoreMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - num_classes=2, - average="macro", - ), - F1ScoreMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - num_classes=2, - average="micro", - ), - ], - ) - for dataset in ["yearbook_train", "yearbook_test"] - ], + datasets=[get_eval_data_config(dataset) for dataset in ["yearbook_train", "yearbook_test"]], ), ) diff --git a/modyn/config/schema/pipeline/evaluation/metrics.py b/modyn/config/schema/pipeline/evaluation/metrics.py index ce88e5dc3..6924fb756 100644 --- a/modyn/config/schema/pipeline/evaluation/metrics.py +++ b/modyn/config/schema/pipeline/evaluation/metrics.py @@ -36,6 +36,10 @@ def evaluation_transformer_function_deserialized(self) -> Callable | None: def shape_check(self) -> bool: return True + @property + def full_name(self) -> str: + return self.name + class AccuracyMetricConfig(_BaseMetricConfig): name: Literal["Accuracy"] = Field("Accuracy") @@ -45,6 +49,10 @@ class AccuracyMetricConfig(_BaseMetricConfig): def shape_check(self) -> bool: return self.topn <= 1 + @property + def full_name(self) -> str: + return "Accuracy" if self.topn == 1 else f"Top-{self.topn}-Accuracy" + F1ScoreTypes = Literal["macro", "micro", "weighted", "binary"] diff --git a/modyn/config/schema/pipeline/trigger/performance/performance.py b/modyn/config/schema/pipeline/trigger/performance/performance.py index 8c5ff62e0..28e9c2a46 100644 --- a/modyn/config/schema/pipeline/trigger/performance/performance.py +++ b/modyn/config/schema/pipeline/trigger/performance/performance.py @@ -84,7 +84,7 @@ class PerformanceTriggerConfig(_InternalPerformanceTriggerConfig): def validate_decision_criteria(self) -> "PerformanceTriggerConfig": """Assert that all criteria use metrics that are defined in the evaluation config.""" - metrics = {metric.name for metric in self.evaluation.dataset.metrics} + metrics = {metric.full_name for metric in self.evaluation.dataset.metrics} for criterion in self.decision_criteria.values(): if isinstance(criterion, StaticNumberAvoidableMisclassificationCriterion): continue diff --git a/modyn/metadata_database/metadata_database_connection.py b/modyn/metadata_database/metadata_database_connection.py index 428fe83b0..016259612 100644 --- a/modyn/metadata_database/metadata_database_connection.py +++ b/modyn/metadata_database/metadata_database_connection.py @@ -37,7 +37,7 @@ def __init__(self, modyn_config: dict) -> None: if "hash_partition_modulus" in self.modyn_config["metadata_database"] else 16 ) - self.seed: int = ( + self.seed: int | None = ( self.modyn_config["metadata_database"]["seed"] if "seed" in self.modyn_config["metadata_database"] else None ) if self.seed is not None: From b04a2e112dd47d2d76cc23909dec5bd3114a0d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= <2116466+MaxiBoether@users.noreply.github.com> Date: Wed, 11 Dec 2024 11:26:46 +0100 Subject: [PATCH 5/5] Small README update (#638) --- README.md | 18 +++++++++++++++--- _typos.toml | 1 + docker/Base/Dockerfile | 3 ++- docker/Dependencies/Dockerfile | 13 +++++++++---- docker/Evaluator/Dockerfile | 2 +- docker/Model_Storage/Dockerfile | 2 +- docker/Selector/Dockerfile | 2 +- docker/Supervisor/Dockerfile | 2 +- docker/Tests/Dockerfile | 2 +- docker/Trainer_Server/Dockerfile | 2 +- modyn/config/schema/pipeline/config.py | 3 +++ 11 files changed, 36 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 4a711a40c..9dfa71e8b 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![codecov](https://codecov.io/github/eth-easl/modyn/graph/badge.svg?token=KFDCE03SQ4)](https://codecov.io/github/eth-easl/modyn) [![License](https://img.shields.io/github/license/eth-easl/modyn)](https://img.shields.io/github/license/eth-easl/modyn) -Modyn is an open-source platform for model training on growing datasets, i.e., datasets where points get added over time. +Modyn is a data-centric machine learning pipeline orchestrator, i.e., a platform for model training on growing datasets where points get added over time. Check out our [blog post](https://systems.ethz.ch/research/blog/modyn.html) for a brief introduction. @@ -55,9 +55,8 @@ For running all integration tests, run Checkout our [Example Pipeline](docs/EXAMPLE.md) guide for an example on how to run a Modyn pipeline. Checkout our [Technical Guidelines](docs/TECHNICAL.md) for some hints on developing Modyn and how to add new data selection and triggering policies. Checkout the [Architecture Documentation](docs/ARCHITECTURE.md) for an overview of Modyn's components. -Last, checkout our [vision paper on Modyn](https://anakli.inf.ethz.ch/papers/MLonDynamicData_EuroMLSys23.pdf) for an introduction to model training on dynamic datasets. +Last, checkout our [full paper on Modyn](https://anakli.inf.ethz.ch/papers/modyn_sigmod25.pdf) for more technical background and experiments we ran using Modyn. -We are actively developing and designing Modyn, including more thorough documentation. Please reach out via Github, Twitter, E-Mail, or any other channel of communication if you are interested in collaborating, have any questions, or have any problems running Modyn. How to [contribute](docs/CONTRIBUTING.md). @@ -81,3 +80,16 @@ We welcome input from both research and practice. Modyn is being developed at the [Efficient Architectures and Systems Lab (EASL)](https://anakli.inf.ethz.ch/#Group) at the [ETH Zurich Systems Group](https://systems.ethz.ch/). Please reach out to `mboether [at] inf [­dot] ethz [dot] ch` or open an issue on Github if you have any questions or inquiry related to Modyn and its usage. + +### Paper / Citation + +If you use Modyn, please cite our SIGMOD'25 paper: + +```bibtex +@inproceedings{Bother2025Modyn, + author = {B\"{o}ther, Maximilian and Robroek, Ties and Gsteiger, Viktor and Ma, Xianzhe and T\"{o}z\"{u}n, P{\i}nar and Klimovic, Ana}, + title = {Modyn: Data-Centric Machine Learning Pipeline Orchestration}, + booktitle = {Proceedings of the Conference on Management of Data (SIGMOD)}, + year = {2025}, +} +``` diff --git a/_typos.toml b/_typos.toml index fdbcd645e..0882f9972 100644 --- a/_typos.toml +++ b/_typos.toml @@ -5,3 +5,4 @@ extend-ignore-re = ["(?Rm)^.*# spellchecker:disable-line$"] [default.extend-words] strat = "strat" fpr = "fpr" +ther = "ther" diff --git a/docker/Base/Dockerfile b/docker/Base/Dockerfile index abff4a016..f97972536 100644 --- a/docker/Base/Dockerfile +++ b/docker/Base/Dockerfile @@ -15,7 +15,8 @@ RUN chown -R appuser /src USER appuser ENV CONDA_DEFAULT_ENV modyn ENV MAMBA_DEFAULT_ENV modyn -RUN /bin/bash -c "mamba init" +ENV MAMBA_ROOT_PREFIX /opt/mamba +RUN /bin/bash -c "mamba shell init -s bash -y" RUN echo "mamba activate modyn" >> /home/appuser/.bashrc # set environment variable to tell modyn that it is running in a container diff --git a/docker/Dependencies/Dockerfile b/docker/Dependencies/Dockerfile index 555ecb14e..9964f07a4 100644 --- a/docker/Dependencies/Dockerfile +++ b/docker/Dependencies/Dockerfile @@ -55,12 +55,17 @@ RUN git clone --recurse-submodules -b v1.59.2 --depth 1 --shallow-submodules htt make -j8 && make install && cd ../../ # Install mamba -ENV CONDA_DIR /opt/mamba -ENV MAMBA_DIR /opt/mamba -RUN wget "https://github.com/conda-forge/miniforge/releases/latest/download/Mambaforge-$(uname)-$(uname -m).sh" -O ~/mamba.sh && \ +ENV CONDA_DIR=/opt/mamba +ENV MAMBA_DIR=/opt/mamba +ENV MAMBA_ROOT_PREFIX /opt/mamba +RUN wget "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" -O ~/mamba.sh && \ /bin/bash ~/mamba.sh -b -p /opt/mamba ENV PATH=$CONDA_DIR/bin:$PATH -RUN mamba update -n base -c defaults mamba && mamba update --all && mamba init bash +RUN mamba update -n base -c defaults mamba +RUN /bin/bash -c "mamba shell init -s bash -y" +RUN mamba update --all + +# RUN /bin/bash mamba shell init # Install dependencies COPY ./environment.yml /tmp/environment.yml diff --git a/docker/Evaluator/Dockerfile b/docker/Evaluator/Dockerfile index adf669869..ba636669d 100644 --- a/docker/Evaluator/Dockerfile +++ b/docker/Evaluator/Dockerfile @@ -7,4 +7,4 @@ FROM modynbase:latest AS evaluatorimage RUN chmod a+x /src/modyn/evaluator/modyn-evaluator # During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug -CMD mamba run -n modyn --no-capture-output ./modyn/evaluator/modyn-evaluator ./modyn/config/examples/modyn_config.yaml +CMD mamba run -n modyn -a "" ./modyn/evaluator/modyn-evaluator ./modyn/config/examples/modyn_config.yaml diff --git a/docker/Model_Storage/Dockerfile b/docker/Model_Storage/Dockerfile index 2c698d332..89e7f5487 100644 --- a/docker/Model_Storage/Dockerfile +++ b/docker/Model_Storage/Dockerfile @@ -10,4 +10,4 @@ RUN chown appuser /tmp/models RUN chmod -R 777 /tmp/models # During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug -CMD mamba run -n modyn --no-capture-output ./modyn/model_storage/modyn-model-storage ./modyn/config/examples/modyn_config.yaml +CMD mamba run -n modyn -a "" ./modyn/model_storage/modyn-model-storage ./modyn/config/examples/modyn_config.yaml diff --git a/docker/Selector/Dockerfile b/docker/Selector/Dockerfile index 631167fbd..0491307f6 100644 --- a/docker/Selector/Dockerfile +++ b/docker/Selector/Dockerfile @@ -6,4 +6,4 @@ RUN chown appuser /tmp/trigger_samples RUN chmod -R 777 /tmp/trigger_samples # During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug -CMD mamba run -n modyn --no-capture-output ./modyn/selector/modyn-selector ./modyn/config/examples/modyn_config.yaml +CMD mamba run -n modyn -a "" ./modyn/selector/modyn-selector ./modyn/config/examples/modyn_config.yaml diff --git a/docker/Supervisor/Dockerfile b/docker/Supervisor/Dockerfile index b43fef6bc..492d7992b 100644 --- a/docker/Supervisor/Dockerfile +++ b/docker/Supervisor/Dockerfile @@ -6,4 +6,4 @@ RUN chown appuser /tmp/evaluation_results RUN chmod -R 777 /tmp/evaluation_results # During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug -CMD mamba run -n modyn --no-capture-output ./modyn/supervisor/modyn-supervisor ./modyn/config/examples/modyn_config.yaml +CMD mamba run -n modyn -a "" ./modyn/supervisor/modyn-supervisor ./modyn/config/examples/modyn_config.yaml diff --git a/docker/Tests/Dockerfile b/docker/Tests/Dockerfile index 1ca987185..5b0c42af2 100644 --- a/docker/Tests/Dockerfile +++ b/docker/Tests/Dockerfile @@ -10,4 +10,4 @@ USER appuser RUN chmod a+x /src/integrationtests/run.sh # During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug -CMD mamba run -n modyn --no-capture-output /src/integrationtests/run.sh +CMD mamba run -n modyn -a "" /src/integrationtests/run.sh diff --git a/docker/Trainer_Server/Dockerfile b/docker/Trainer_Server/Dockerfile index ec5a330cd..88f5d426c 100644 --- a/docker/Trainer_Server/Dockerfile +++ b/docker/Trainer_Server/Dockerfile @@ -8,4 +8,4 @@ RUN mkdir -p /tmp/offline_dataset RUN chown appuser /tmp/offline_dataset # During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug -CMD mamba run -n modyn --no-capture-output ./modyn/trainer_server/modyn-trainer-server ./modyn/config/examples/modyn_config.yaml +CMD mamba run -n modyn -a "" ./modyn/trainer_server/modyn-trainer-server ./modyn/config/examples/modyn_config.yaml diff --git a/modyn/config/schema/pipeline/config.py b/modyn/config/schema/pipeline/config.py index 20b9388e4..52910374e 100644 --- a/modyn/config/schema/pipeline/config.py +++ b/modyn/config/schema/pipeline/config.py @@ -56,3 +56,6 @@ def validate_bts_training_selection_works(self) -> Self: ) return self + + +ModynPipelineConfig.model_rebuild()