From b63795fedaddb4d1910565d38d7debd0c0492cf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= <2116466+MaxiBoether@users.noreply.github.com> Date: Wed, 16 Oct 2024 16:19:40 +0200 Subject: [PATCH] SIGMOD Revision Experiments (#610) --- .gitignore | 4 + .../pages/plots/cost_vs_eval_metric_agg.py | 8 +- analytics/app/pages/plots/eval_heatmap.py | 7 +- analytics/app/pages/plots/eval_over_time.py | 7 +- analytics/app/pages/plots/num_samples.py | 10 +- .../pages/plots/num_triggers_eval_metric.py | 6 +- .../sigmod/arxiv_scatter_triggering.ipynb | 526 ++++++++++++++++++ analytics/plotting/sigmod/yb_heatmap.ipynb | 34 +- .../sigmod/yb_scatter_selection.ipynb | 55 +- .../sigmod/yb_scatter_triggering.ipynb | 131 +++-- .../tools/aggregate_runs/core_aggregation.py | 2 +- .../triggering/arxiv_triggering_config.py | 136 +++++ .../sigmod/triggering/run_arxiv_triggering.py | 271 +++++++++ .../sigmod/triggering/run_yb_triggering.py | 161 +++++- .../triggering/yearbook_triggering_config.py | 112 ++-- .../schema/pipeline/evaluation/metrics.py | 8 + .../trigger/performance/performance.py | 2 +- .../metadata_database_connection.py | 2 +- 18 files changed, 1334 insertions(+), 148 deletions(-) create mode 100644 analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb create mode 100644 benchmark/sigmod/triggering/arxiv_triggering_config.py create mode 100644 benchmark/sigmod/triggering/run_arxiv_triggering.py diff --git a/.gitignore b/.gitignore index 5833ad5d8..92f26d4e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +# Images +*.svg +*.png + # Logging files *.log diff --git a/analytics/app/pages/plots/cost_vs_eval_metric_agg.py b/analytics/app/pages/plots/cost_vs_eval_metric_agg.py index 1abb78fc6..5850b34df 100644 --- a/analytics/app/pages/plots/cost_vs_eval_metric_agg.py +++ b/analytics/app/pages/plots/cost_vs_eval_metric_agg.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import get_args +from typing import Any, get_args import pandas as pd import plotly.express as px @@ -32,9 +32,9 @@ class _PageState: def gen_fig_scatter_num_triggers( page: str, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | Any | None, + dataset_id: str | Any | None, + metric: str | Any | None, agg_func_x: AGGREGATION_FUNCTION, agg_func_y: EVAL_AGGREGATION_FUNCTION, stages: list[str], diff --git a/analytics/app/pages/plots/eval_heatmap.py b/analytics/app/pages/plots/eval_heatmap.py index 7b9c63635..85676623d 100644 --- a/analytics/app/pages/plots/eval_heatmap.py +++ b/analytics/app/pages/plots/eval_heatmap.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Any import pandas as pd from dash import Input, Output, callback, dcc, html @@ -31,9 +32,9 @@ def gen_figure( page: str, multi_pipeline_mode: bool, patch_yearbook: bool, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | Any | None, + dataset_id: str | Any | None, + metric: str | Any | None, ) -> go.Figure: """Create the cost over time figure with barplot or histogram. Histogram has nice binning while barplot is precise. diff --git a/analytics/app/pages/plots/eval_over_time.py b/analytics/app/pages/plots/eval_over_time.py index a0d422b7a..dd57cec08 100644 --- a/analytics/app/pages/plots/eval_over_time.py +++ b/analytics/app/pages/plots/eval_over_time.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Any import pandas as pd import plotly.express as px @@ -29,9 +30,9 @@ def gen_figure( page: str, multi_pipeline_mode: bool, patch_yearbook: bool, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | Any | None, + dataset_id: str | Any | None, + metric: str | Any | None, ) -> go.Figure: """Create the evaluation over time figure with a line plot. diff --git a/analytics/app/pages/plots/num_samples.py b/analytics/app/pages/plots/num_samples.py index b33ad55be..ef8b46a5d 100644 --- a/analytics/app/pages/plots/num_samples.py +++ b/analytics/app/pages/plots/num_samples.py @@ -34,12 +34,12 @@ class _PageState: def gen_figure( page: str, multi_pipeline_mode: bool, - time_metric: str, - y_axis: YAxis, - use_scatter_size: bool, + time_metric: str | None, + y_axis: YAxis | None, + use_scatter_size: bool | None, patch_yearbook: bool, - dataset_id: str, - eval_handler: str, + dataset_id: str | None, + eval_handler: str | None, ) -> go.Figure: """Create the cost over time figure with barplot or histogram. Histogram has nice binning while barplot is precise. diff --git a/analytics/app/pages/plots/num_triggers_eval_metric.py b/analytics/app/pages/plots/num_triggers_eval_metric.py index ef685e2f1..4bf391b93 100644 --- a/analytics/app/pages/plots/num_triggers_eval_metric.py +++ b/analytics/app/pages/plots/num_triggers_eval_metric.py @@ -31,9 +31,9 @@ class _PageState: def gen_fig_scatter_num_triggers( page: str, multi_pipeline_mode: bool, - eval_handler: str, - dataset_id: str, - metric: str, + eval_handler: str | None, + dataset_id: str | None, + metric: str | None, aggregate_metric: bool = True, time_weighted: bool = True, only_active_periods: bool = True, diff --git a/analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb b/analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb new file mode 100644 index 000000000..c4911df62 --- /dev/null +++ b/analytics/plotting/sigmod/arxiv_scatter_triggering.ipynb @@ -0,0 +1,526 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from collections import defaultdict\n", + "from pathlib import Path\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "\n", + "from analytics.app.data.load import list_pipelines\n", + "from analytics.app.data.transform import (\n", + " df_aggregate_eval_metric,\n", + " dfs_models_and_evals,\n", + " logs_dataframe,\n", + ")\n", + "from analytics.plotting.common.common import init_plot\n", + "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# INPUTS\n", + "\n", + "pipelines_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/arxiv/revision/final_arxiv_revision_logs_agg\")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/dynamic_datasets_dsl/analytics/plotting/sigmod\")\n", + "assert pipelines_dir.exists()\n", + "assert output_dir.exists()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipelines = list_pipelines(pipelines_dir)\n", + "max_pipeline_id = max(pipelines.keys())\n", + "pipelines" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from analytics.app.data.load import load_pipeline_logs\n", + "\n", + "pipeline_logs = {p_id: load_pipeline_logs(p_id, pipelines_dir) for (p_id, (_, p_path)) in pipelines.items()}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# mode:\n", + "# single pipeline\n", + "pipeline_ids = (\n", + " [p_id for p_id, (p, _) in pipelines.items() if \"timetrigger\" in p and (\"_1y\" in p or \"_3y\" in p or \"_5y\" in p)]\n", + " + [p_id for p_id, (p, _) in pipelines.items() if \"amount\" in p and (\"30000\" in p or \"50000\" in p)]\n", + " + [\n", + " # drift\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if p\n", + " in {\n", + " \"kaggle_arxiv_mmdalibi_5000_0.005_1y\",\n", + " \"kaggle_arxiv_mmdalibi_5000_0.01_1y\",\n", + " \"kaggle_arxiv_mmdalibi_5000_0.02_1y\",\n", + " \"kaggle_arxiv_mmdalibi_dyn_5000_15_qt_0.05_1y\",\n", + " }\n", + " ]\n", + " + [\n", + " # perf\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if p\n", + " in {\n", + " \"kaggle_arxiv_perf_5000_0.7\",\n", + " \"kaggle_arxiv_perf_5000_0.75\",\n", + " }\n", + " ]\n", + ")\n", + "composite_model_variant = \"currently_active_model\" # currently_trained_model\n", + "patch_yearbook = False\n", + "dataset_id = \"arxiv_kaggle_test\"\n", + "eval_handler = \"exactmatrix\"\n", + "metric = \"Top-2-Accuracy\"\n", + "include_composite_model = False\n", + "\n", + "\n", + "def pipeline_name_mapper(name: str) -> str:\n", + " name = name.replace(\"kaggle_arxiv_\", \"\")\n", + "\n", + " if \"amounttrigger\" in name:\n", + " name = name.replace(\"amounttrigger_\", \"\")\n", + " name = name + \" samples\"\n", + " elif \"timetrigger\" in name:\n", + " name = name.replace(\"timetrigger_\", \"\")\n", + " name = name[:-1] + (\" years\" if not name.endswith(\"1y\") else \" year\")\n", + " elif \"perf\" in name:\n", + " name = name.replace(\"perf_\", \"\")\n", + " name = \"< \" + name.replace(\"5000_\", \"\")\n", + " elif \"dyn\" in name:\n", + " name = \"AutoDrift\"\n", + " else:\n", + " name = name.replace(\"mmdalibi_\", \"\")\n", + " name = name.replace(\"_\", \"/\")\n", + "\n", + " return name\n", + "\n", + "\n", + "pipelines = {p_id: (pipeline_name_mapper(pname), p_path) for p_id, (pname, p_path) in pipelines.items()}\n", + "\n", + "unified_pids = []\n", + "names = set()\n", + "for p_id, (pname, _) in pipelines.items():\n", + " if p_id in pipeline_ids and pname not in names:\n", + " unified_pids.append(p_id)\n", + " names.add(pname)\n", + "pipeline_ids = unified_pids\n", + "\n", + "\n", + "[(p_id, pname) for p_id, (pname, _) in pipelines.items() if p_id in pipeline_ids]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Wrangle data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "list_df_eval_single: list[pd.DataFrame] = []\n", + "list_df_all: list[pd.DataFrame] = []\n", + "\n", + "for pipeline_id in pipeline_ids:\n", + " df_all = logs_dataframe(pipeline_logs[pipeline_id], pipelines[pipeline_id][0])\n", + " list_df_all.append(df_all)\n", + "\n", + " _, _, df_eval_single = dfs_models_and_evals(\n", + " pipeline_logs[pipeline_id], df_all[\"sample_time\"].max(), pipelines[pipeline_id][0]\n", + " )\n", + " list_df_eval_single.append(df_eval_single)\n", + "\n", + "df_adjusted = pd.concat(list_df_eval_single)\n", + "df_adjusted\n", + "\n", + "df_all = pd.concat(list_df_all)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted = df_adjusted[\n", + " (df_adjusted[\"dataset_id\"] == dataset_id)\n", + " & (df_adjusted[\"eval_handler\"] == eval_handler)\n", + " & (df_adjusted[\"metric\"] == metric)\n", + "]\n", + "\n", + "# in percent (0-100)\n", + "df_adjusted[\"value\"] = df_adjusted[\"value\"] * 100\n", + "df_adjusted" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted = df_adjusted.sort_values(by=[\"interval_center\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Reduce to composite models\n", + "df_adjusted = df_adjusted[df_adjusted[composite_model_variant]]\n", + "df_adjusted[composite_model_variant].unique()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dump Data backup" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create Plot" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# reduce evaluation interval to interval where all policies have evaluations\n", + "min_active_eval_center_per_pipeline = (\n", + " df_adjusted[df_adjusted[composite_model_variant]].groupby(\"pipeline_ref\")[\"interval_center\"].min()\n", + ")\n", + "maximum_min = min_active_eval_center_per_pipeline.max()\n", + "print(maximum_min, min_active_eval_center_per_pipeline)\n", + "\n", + "df_adjusted = df_adjusted[df_adjusted[\"interval_center\"] >= maximum_min]\n", + "df_adjusted[\"interval_center\"].unique()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted[\"interval_center\"] = df_adjusted[\"interval_center\"].astype(str).str.split(\"-\").str[0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Aggregate metrics to a scalar value per pipeline\n", + "mean_accuracies = df_aggregate_eval_metric(\n", + " df_adjusted,\n", + " group_by=[\"pipeline_ref\", \"metric\"],\n", + " in_col=\"value\",\n", + " out_col=\"metric_value\",\n", + " aggregate_func=\"mean\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_triggers = df_all[df_all[\"id\"] == PipelineStage.HANDLE_SINGLE_TRIGGER.name]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_triggers = df_all[df_all[\"id\"] == PipelineStage.HANDLE_SINGLE_TRIGGER.name]\n", + "df_triggers = df_triggers[df_triggers[\"sample_time\"] > maximum_min]\n", + "df_triggers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Find number of trigger per pipeline that are after maximum_min\n", + "\n", + "# before the cutoff there was one trigger (equivalent to start of our reduced dataset): +1\n", + "num_triggers = df_triggers.groupby(\"pipeline_ref\").aggregate(count=(\"id\", \"count\")) + 1\n", + "num_triggers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "merged = num_triggers.merge(mean_accuracies, on=\"pipeline_ref\")\n", + "assert mean_accuracies.shape[0] == merged.shape[0]\n", + "merged" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_type(x: str):\n", + " if \"year\" in x:\n", + " return \"yearly\"\n", + " elif \"samples\" in x:\n", + " return \"amount\"\n", + " elif \"y\" in x:\n", + " return \"drift\"\n", + " elif \"<\" in x:\n", + " return \"perf\"\n", + " elif \"AutoDrift\" in x:\n", + " return \"drift\"\n", + " else:\n", + " return \"unknown\"\n", + "\n", + "\n", + "merged[\"type\"] = merged[\"pipeline_ref\"].apply(lambda x: create_type(x))\n", + "merged" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette = sns.color_palette(\"RdBu\", 10)\n", + "palette" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette2 = sns.color_palette(\"colorblind\", 10)\n", + "palette2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the heatmap\n", + "init_plot()\n", + "# sns.set_theme(style=\"ticks\")\n", + "# plt.rcParams['svg.fonttype'] = 'none'\n", + "sns.set_style(\"whitegrid\")\n", + "\n", + "FONTSIZE = 20\n", + "DOUBLE_FIG_WIDTH = 10\n", + "DOUBLE_FIG_HEIGHT = 3.5\n", + "DOUBLE_FIG_SIZE = (DOUBLE_FIG_WIDTH, 1.5 * DOUBLE_FIG_HEIGHT)\n", + "\n", + "fig = plt.figure(\n", + " edgecolor=\"black\",\n", + " frameon=True,\n", + " figsize=DOUBLE_FIG_SIZE,\n", + " dpi=300,\n", + ")\n", + "\n", + "markers = {\"drift\": \"X\", \"yearly\": \"o\", \"amount\": \"D\", \"perf\": \"*\"}\n", + "\n", + "ax = sns.scatterplot(\n", + " merged,\n", + " x=\"count\",\n", + " y=\"metric_value\",\n", + " hue=\"type\",\n", + " palette={\"drift\": palette[-2], \"yearly\": palette2[1], \"amount\": palette[1], \"perf\": palette2[4]},\n", + " s=200,\n", + " legend=True,\n", + " markers=markers,\n", + " style=\"type\", # required for markers\n", + " edgecolor=\"none\",\n", + " # annotations\n", + ")\n", + "ax.set(ylim=(55, 80))\n", + "ax.set(xlim=(-4, 170))\n", + "\n", + "for i in range(merged.shape[0]):\n", + " offsets = defaultdict(lambda: (+1.5, -0.25))\n", + " offsets.update(\n", + " {\n", + " # x, y\n", + " \"3 years\": (+2, -1),\n", + " \"1 year\": (-3.5, +1),\n", + " \"5 years\": (+2, -0.15),\n", + " \"50000 samples\": (+3, -1),\n", + " \"30000 samples\": (-3, +1),\n", + " \"5000/0.005/1y\": (-9, -2),\n", + " \"5000/0.01/1y\": (+2.5, -0.5),\n", + " \"5000/0.02/1y\": (+2.5, -0.5),\n", + " \"< 0.75\": (-18, -0.5),\n", + " \"< 0.7\": (-3.5, +1),\n", + " \"AutoDrift\": (+1.5, -1.5),\n", + " }\n", + " )\n", + " plt.rc(\"text\", usetex=True)\n", + "\n", + " def fix_s(ref: str) -> str:\n", + " if ref[0] != \"<\":\n", + " return r\"\\textbf{\" + merged[\"pipeline_ref\"][i] + \"}\"\n", + "\n", + " return r\"$\\mathbf{<}$ \\textbf{\" + ref[2:] + \"}\"\n", + "\n", + " plt.text(\n", + " x=merged[\"count\"][i] + offsets[merged[\"pipeline_ref\"][i]][0],\n", + " y=merged[\"metric_value\"][i] + offsets[merged[\"pipeline_ref\"][i]][1],\n", + " s=fix_s(merged[\"pipeline_ref\"][i]),\n", + " fontdict=dict(color=\"black\", fontsize=17),\n", + " )\n", + " plt.rc(\"text\", usetex=False)\n", + "\n", + "\n", + "# Adjust x-axis tick labels\n", + "plt.xlabel(\"Number of triggers\", labelpad=10)\n", + "plt.xticks(\n", + " ticks=[x for x in range(0, 170 + 1, 20)],\n", + " labels=[x for x in range(0, 170 + 1, 20)],\n", + " rotation=0,\n", + " # ha='right'\n", + ")\n", + "\n", + "# Set y-axis ticks to be equally spaced\n", + "plt.ylabel(\"Mean Top-2 Accuracy %\", labelpad=15)\n", + "plt.yticks(\n", + " ticks=[x for x in range(56, 80 + 1, 3)],\n", + " labels=[x for x in range(56, 80 + 1, 3)],\n", + " rotation=0,\n", + ")\n", + "\n", + "\n", + "label_mapping = {\"drift\": \"Drift\", \"yearly\": \"Time\", \"amount\": \"Amount\", \"perf\": \"Performance\"}\n", + "handles, labels = ax.get_legend_handles_labels()\n", + "latex_labels = [f\"{label_mapping.get(label, label)} \" for label in labels]\n", + "\n", + "legend = ax.legend(\n", + " loc=\"lower right\",\n", + " ncol=2,\n", + " handles=handles,\n", + " labels=latex_labels,\n", + " labelspacing=0.2,\n", + " columnspacing=0.9,\n", + " handlelength=1.3,\n", + ")\n", + "\n", + "\n", + "# Display the plot\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Save Plot as svg" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for img_type in [\"png\", \"svg\"]:\n", + " img_path = output_dir / f\"scatter_arxiv.{img_type}\"\n", + " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette2[4]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/analytics/plotting/sigmod/yb_heatmap.ipynb b/analytics/plotting/sigmod/yb_heatmap.ipynb index 05e29ce5d..eb42b214c 100644 --- a/analytics/plotting/sigmod/yb_heatmap.ipynb +++ b/analytics/plotting/sigmod/yb_heatmap.ipynb @@ -27,14 +27,14 @@ "source": [ "# INPUTS\n", "\n", - "drift_pipeline = True\n", + "drift_pipeline = False\n", "if drift_pipeline:\n", - " pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/triggering/logs_agg\")\n", - "else:\n", " pipelines_dir = Path(\n", - " \"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/data_selection_50%/logs_agg_patch\"\n", + " \"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/triggering_revision/logs_revision_fullrerun_agg\"\n", " )\n", - "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "else:\n", + " pipelines_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/data_selection_50%/logs_agg_patch\")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data\")\n", "assert pipelines_dir.exists()\n", "assert output_dir.exists()" ] @@ -67,7 +67,7 @@ "metadata": {}, "outputs": [], "source": [ - "type(pipeline_logs[5 if not drift_pipeline else 38])" + "type(pipeline_logs[5 if not drift_pipeline else 13])" ] }, { @@ -77,7 +77,7 @@ "outputs": [], "source": [ "# mode:\n", - "pipeline_id = 5 if not drift_pipeline else 38\n", + "pipeline_id = 5 if not drift_pipeline else 13\n", "\n", "# doesn't do anything unless include_composite_model = True\n", "composite_model_variant = \"currently_trained_model\" if not drift_pipeline else \"currently_active_model\"\n", @@ -258,9 +258,9 @@ "outputs": [], "source": [ "# Create the heatmap\n", - "from analytics.plotting.common.common import INIT_PLOT\n", + "from analytics.plotting.common.common import init_plot\n", "\n", - "INIT_PLOT()\n", + "init_plot()\n", "# sns.set_theme(style=\"ticks\")\n", "plt.rcParams[\"svg.fonttype\"] = \"none\"\n", "\n", @@ -315,11 +315,11 @@ " rotation=0,\n", " # ha='right'\n", " )\n", - "plt.ylabel(\"Trained up to\")\n", + "plt.ylabel(\"Model trained on data up to\")\n", "\n", "# Draft training boxes\n", "if drift_pipeline:\n", - " for type_, dashed in [(\"train\", False), (\"usage\", False), (\"train\", True)]:\n", + " for type_, dashed in [(\"usage\", False)]: # [(\"train\", False), (\"usage\", False), (\"train\", True)]:\n", " for active_ in df_logs_models.iterrows():\n", " x_start = active_[1][f\"{type_}_start\"].year - 1930\n", " x_end = active_[1][f\"{type_}_end\"].year - 1930\n", @@ -360,13 +360,21 @@ "source": [ "for img_type in [\"png\", \"svg\"]:\n", " img_path = output_dir / f\"yearbook_heatmap{'_trigger' if drift_pipeline else ''}.{img_type}\"\n", + " print(img_path)\n", " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "modyn", "language": "python", "name": "python3" }, @@ -380,7 +388,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/analytics/plotting/sigmod/yb_scatter_selection.ipynb b/analytics/plotting/sigmod/yb_scatter_selection.ipynb index 246c78db5..f02089513 100644 --- a/analytics/plotting/sigmod/yb_scatter_selection.ipynb +++ b/analytics/plotting/sigmod/yb_scatter_selection.ipynb @@ -33,8 +33,8 @@ "source": [ "# INPUTS\n", "\n", - "pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/data_selection_50%/logs_agg_patch\")\n", - "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "pipelines_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/data_selection_50%/logs_agg_patch\")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/dynamic_datasets_dsl\")\n", "assert pipelines_dir.exists()\n", "assert output_dir.exists()" ] @@ -278,11 +278,13 @@ "outputs": [], "source": [ "# Create the heatmap\n", - "from analytics.plotting.common.common import INIT_PLOT\n", + "from analytics.plotting.common.common import init_plot\n", "\n", - "INIT_PLOT()\n", + "plt.rcParams[\"svg.fonttype\"] = \"none\"\n", + "plt.rcParams[\"text.usetex\"] = False\n", + "\n", + "init_plot()\n", "# sns.set_theme(style=\"ticks\")\n", - "# plt.rcParams['svg.fonttype'] = 'none'\n", "sns.set_style(\"whitegrid\")\n", "\n", "FONTSIZE = 20\n", @@ -297,6 +299,20 @@ " dpi=300,\n", ")\n", "\n", + "label_map = {\n", + " \"Loss \": \"Loss \\\\y{a} \",\n", + " \"DLIS \": \"DLIS \\\\y{a} \",\n", + " \"Uniform \": \"Uniform \",\n", + " \"Class-Bal. \": \"Class-Bal. \",\n", + " \"RS2 \": \"RS2 \\\\y{b} \",\n", + " \"RS2 (w/o) \": \"RS2 (w/o) \\\\y{b} \",\n", + " \"Margin \": \"Margin \\\\y{c} \",\n", + " \"Least conf. \": \"Least conf. \\\\y{c} \",\n", + " \"Entropy \": \"Entropy \\\\y{c} \",\n", + "}\n", + "\n", + "mean_accuracies_candidate[\"pipeline_ref_mapped\"] = mean_accuracies_candidate[\"pipeline_ref\"].map(label_map)\n", + "\n", "palette = sns.color_palette(\"RdBu_r\", 10)\n", "palette = [palette[1], palette[1]]\n", "ax = sns.stripplot(\n", @@ -321,7 +337,7 @@ " linewidth=3,\n", ")\n", "\n", - "plt.text(s=\"Full data training\", x=-0.2, y=mean_accuracy_ref[\"metric_value\"].values[0] - 2, color=\"dimgrey\")\n", + "plt.text(s=\"Full data training\", x=-0.2, y=mean_accuracy_ref[\"metric_value\"].values[0] - 1, color=\"dimgrey\")\n", "\n", "\n", "# Set x-axis\n", @@ -336,8 +352,19 @@ " rotation=0,\n", ")\n", "\n", + "\n", + "a = ax.get_xticklabels()\n", + "n = []\n", + "for lbl in a:\n", + " print(lbl)\n", + " lbl.set_text(label_map[lbl.get_text()])\n", + " n.append(lbl)\n", + "ax.set_xticklabels(a)\n", + "\n", + "\n", "# Display the plot\n", - "plt.tight_layout()\n", + "# plt.tight_layout()\n", + "\n", "plt.show()" ] }, @@ -356,13 +383,21 @@ "source": [ "for img_type in [\"png\", \"svg\"]:\n", " img_path = output_dir / f\"scatter_selection_yb.{img_type}\"\n", - " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" + " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)\n", + " print(output_dir / f\"scatter_selection_yb.{img_type}\")" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "modyn", "language": "python", "name": "python3" }, @@ -376,7 +411,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/analytics/plotting/sigmod/yb_scatter_triggering.ipynb b/analytics/plotting/sigmod/yb_scatter_triggering.ipynb index 192516a2f..75fcef42d 100644 --- a/analytics/plotting/sigmod/yb_scatter_triggering.ipynb +++ b/analytics/plotting/sigmod/yb_scatter_triggering.ipynb @@ -6,6 +6,7 @@ "metadata": {}, "outputs": [], "source": [ + "from collections import defaultdict\n", "from pathlib import Path\n", "\n", "import matplotlib.pyplot as plt\n", @@ -19,6 +20,7 @@ " logs_dataframe,\n", " patch_yearbook_time,\n", ")\n", + "from analytics.plotting.common.common import init_plot\n", "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", "\n", "%load_ext autoreload\n", @@ -33,8 +35,10 @@ "source": [ "# INPUTS\n", "\n", - "pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/triggering/logs_agg\")\n", - "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "pipelines_dir = Path(\n", + " \"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/triggering_revision/logs_revision_fullrerun_agg\"\n", + ")\n", + "output_dir = Path(\"/Users/mboether/phd/dynamic-data/dynamic_datasets_dsl/analytics/plotting/sigmod\")\n", "assert pipelines_dir.exists()\n", "assert output_dir.exists()" ] @@ -67,7 +71,7 @@ "metadata": {}, "outputs": [], "source": [ - "type(pipeline_logs[32])" + "type(pipeline_logs[13])" ] }, { @@ -97,12 +101,24 @@ " for p_id, (p, _) in pipelines.items()\n", " if p\n", " in {\n", - " \"yearbook_mmdalibi_250_0.05_5d\",\n", - " \"yearbook_mmdalibi_250_0.07_1d\",\n", - " \"yearbook_mmdalibi_250_0.07_5d\",\n", - " \"yearbook_mmdalibi_250_0.05_1d\",\n", - " \"yearbook_mmdalibi_500_0.05_1d\",\n", - " \"yearbook_mmdalibi_100_0.05_1d\",\n", + " \"yearbook_mmdalibi_250_0.05_5d\", # ok\n", + " \"yearbook_mmdalibi_100_0.05_1d\", # ok\n", + " \"yearbook_mmdalibi_250_0.07_5d\", # ok\n", + " \"yearbook_mmdalibi_250_0.07_1d\", # n\n", + " \"yearbook_mmdalibi_250_0.05_1d\", # n\n", + " \"yearbook_mmdalibi_dyn_250_15_qt_0.05_5d\",\n", + " }\n", + " ]\n", + " + [\n", + " # perf\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if p\n", + " in {\n", + " \"yearbook_perf_250_0.95\",\n", + " \"yearbook_perf_250_0.9\",\n", + " \"yearbook_perf_250_0.85\",\n", + " \"yearbook_perf_250_0.8\",\n", " }\n", " ]\n", ")\n", @@ -116,15 +132,22 @@ "\n", "def pipeline_name_mapper(name: str) -> str:\n", " name = name.replace(\"yearbook_\", \"\")\n", - " name = name.replace(\"timetrigger_\", \"\") # \"every \"\n", - " name = name.replace(\"amounttrigger_\", \"\") # \"every \"\n", - " name = name.replace(\"mmdalibi_\", \"\")\n", - " if name.endswith(\"y\"):\n", - " name = name[:-1] + (\" years\" if not name.endswith(\"1y\") else \" year\")\n", - " elif not name.endswith(\"d\"): # dataamount\n", + "\n", + " if \"amounttrigger\" in name:\n", + " name = name.replace(\"amounttrigger_\", \"\")\n", " name = name + \" samples\"\n", - " else: # drift\n", + " elif \"timetrigger\" in name:\n", + " name = name.replace(\"timetrigger_\", \"\")\n", + " name = name[:-1] + (\" years\" if not name.endswith(\"1y\") else \" year\")\n", + " elif \"perf\" in name:\n", + " name = name.replace(\"perf_\", \"\")\n", + " name = \"< \" + name.replace(\"250_\", \"\")\n", + " elif \"dyn\" in name:\n", + " name = \"AutoDrift\"\n", + " else:\n", + " name = name.replace(\"mmdalibi_\", \"\")\n", " name = name.replace(\"_\", \"/\")\n", + "\n", " return name\n", "\n", "\n", @@ -326,6 +349,10 @@ " return \"amount\"\n", " elif \"d\" in x:\n", " return \"drift\"\n", + " elif \"<\" in x:\n", + " return \"perf\"\n", + " elif \"AutoDrift\" in x:\n", + " return \"drift\"\n", " else:\n", " return \"unknown\"\n", "\n", @@ -361,11 +388,7 @@ "outputs": [], "source": [ "# Create the heatmap\n", - "from collections import defaultdict\n", - "\n", - "from analytics.plotting.common.common import INIT_PLOT\n", - "\n", - "INIT_PLOT()\n", + "init_plot()\n", "# sns.set_theme(style=\"ticks\")\n", "# plt.rcParams['svg.fonttype'] = 'none'\n", "sns.set_style(\"whitegrid\")\n", @@ -382,15 +405,19 @@ " dpi=300,\n", ")\n", "\n", + "markers = {\"drift\": \"X\", \"yearly\": \"o\", \"amount\": \"D\", \"perf\": \"*\"}\n", + "\n", "ax = sns.scatterplot(\n", " merged,\n", " x=\"count\",\n", " y=\"metric_value\",\n", " hue=\"type\",\n", - " palette={\"drift\": palette[-2], \"yearly\": palette2[1], \"amount\": palette[1]},\n", + " palette={\"drift\": palette[-2], \"yearly\": palette2[1], \"amount\": palette[1], \"perf\": palette2[4]},\n", " s=200,\n", - " legend=False,\n", - " marker=\"X\",\n", + " legend=True,\n", + " markers=markers,\n", + " style=\"type\", # required for markers\n", + " edgecolor=\"none\",\n", " # annotations\n", ")\n", "ax.set(ylim=(85, 94.5))\n", @@ -403,21 +430,33 @@ " # x, y\n", " \"3 years\": (-3, +0.5),\n", " \"1 year\": (-2, -0.85),\n", - " \"5 years\": (-3, +0.5),\n", + " \"5 years\": (+1.8, -0.15),\n", " \"500 samples\": (-5, +0.5),\n", - " \"2000 samples\": (+1.7, -0.25),\n", - " \"250/0.05/5d\": (-2, +0.5),\n", - " \"100/0.05/1d\": (+1.5, -0.7),\n", - " \"500/0.05/1d\": (+1.5, 0.15),\n", - " \"250/0.07/1d\": (+1.5, -0.55),\n", - " \"250/0.05/1d\": (-10, +0.4),\n", + " \"2000 samples\": (-16, -0.25),\n", + " \"250/0.05/5d\": (+2, -0.2),\n", + " \"100/0.05/1d\": (+1.2, +0.25),\n", + " # \"500/0.05/1d\": (+1.5, 0.15),\n", + " \"250/0.07/1d\": (+2.4, -0.1),\n", + " \"250/0.05/1d\": (-13, -0.7),\n", + " \"< 0.95\": (-4.5, -0.8),\n", + " \"< 0.9\": (-2.5, +0.5),\n", + " \"< 0.85\": (-3, -0.75),\n", + " \"< 0.8\": (-3, +0.3),\n", + " \"AutoDrift\": (-5, +0.4),\n", " }\n", " )\n", " plt.rc(\"text\", usetex=True)\n", + "\n", + " def fix_s(ref: str) -> str:\n", + " if ref[0] != \"<\":\n", + " return r\"\\textbf{\" + merged[\"pipeline_ref\"][i] + \"}\"\n", + "\n", + " return r\"$\\mathbf{<}$ \\textbf{\" + ref[2:] + \"}\"\n", + "\n", " plt.text(\n", " x=merged[\"count\"][i] + offsets[merged[\"pipeline_ref\"][i]][0],\n", " y=merged[\"metric_value\"][i] + offsets[merged[\"pipeline_ref\"][i]][1],\n", - " s=r\"\\textbf{\" + merged[\"pipeline_ref\"][i] + \"}\",\n", + " s=fix_s(merged[\"pipeline_ref\"][i]),\n", " fontdict=dict(color=\"black\", fontsize=17),\n", " )\n", " plt.rc(\"text\", usetex=False)\n", @@ -440,6 +479,21 @@ " rotation=0,\n", ")\n", "\n", + "label_mapping = {\"drift\": \"Drift\", \"yearly\": \"Time\", \"amount\": \"Amount\", \"perf\": \"Performance\"}\n", + "handles, labels = ax.get_legend_handles_labels()\n", + "latex_labels = [f\"{label_mapping.get(label, label)} \" for label in labels]\n", + "\n", + "legend = ax.legend(\n", + " loc=\"lower right\",\n", + " ncol=2,\n", + " handles=handles,\n", + " labels=latex_labels,\n", + " labelspacing=0.2,\n", + " columnspacing=0.9,\n", + " handlelength=1.3,\n", + ")\n", + "\n", + "\n", "# Display the plot\n", "plt.tight_layout()\n", "plt.show()" @@ -463,6 +517,15 @@ " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette2[4]" + ] + }, { "cell_type": "code", "execution_count": null, @@ -473,7 +536,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "modyn", "language": "python", "name": "python3" }, @@ -487,7 +550,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/analytics/tools/aggregate_runs/core_aggregation.py b/analytics/tools/aggregate_runs/core_aggregation.py index d915790dc..cb93d05a6 100644 --- a/analytics/tools/aggregate_runs/core_aggregation.py +++ b/analytics/tools/aggregate_runs/core_aggregation.py @@ -10,7 +10,7 @@ from modyn.supervisor.internal.grpc.enums import PipelineStage from modyn.supervisor.internal.pipeline_executor.models import MultiEvaluationInfo, PipelineLogs -DEBUGGING_MODE = True +DEBUGGING_MODE = False """If True, the the process will halt on breakpoints to allow for manual verification.""" diff --git a/benchmark/sigmod/triggering/arxiv_triggering_config.py b/benchmark/sigmod/triggering/arxiv_triggering_config.py new file mode 100644 index 000000000..430b7e89f --- /dev/null +++ b/benchmark/sigmod/triggering/arxiv_triggering_config.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +from modyn.config import ( + CheckpointingConfig, + OptimizationCriterion, + OptimizerConfig, + OptimizerParamGroup, +) +from modyn.config.schema.pipeline import ( + AccuracyMetricConfig, + DataConfig, + EvalDataConfig, + EvaluationConfig, + F1ScoreMetricConfig, + FullModelStrategy, + ModelConfig, + ModynPipelineConfig, + Pipeline, + PipelineModelStorageConfig, + TrainingConfig, +) +from modyn.config.schema.pipeline.evaluation.handler import EvalHandlerConfig +from modyn.config.schema.pipeline.evaluation.strategy.periodic import PeriodicEvalStrategyConfig +from modyn.config.schema.pipeline.sampling.config import NewDataStrategyConfig +from modyn.config.schema.pipeline.trigger import TriggerConfig +from modyn.utils.utils import SECONDS_PER_UNIT + + +def gen_arxiv_training_conf(gpu_device: str, seed: int): + opti_conf = OptimizerConfig( + name="default", + algorithm="AdamW", + source="PyTorch", + param_groups=[OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})], + ) + + return TrainingConfig( + gpus=1, + device=gpu_device, + dataloader_workers=1, + use_previous_model=True, + initial_model="random", + batch_size=128, + optimizers=[opti_conf], + optimization_criterion=OptimizationCriterion(name="CrossEntropyLoss"), + checkpointing=CheckpointingConfig(activated=False), + lr_scheduler=None, + epochs_per_trigger=5, + shuffle=True, + amp=False, + seed=seed, + ) + + +ARXIV_BPF = ( + "import torch\n" "def bytes_parser_function(data: memoryview) -> torch.Tensor:\n" " return str(data, 'utf8')" +) + +ARXIV_EVAL_FUNC = ( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" +) + + +def get_eval_data_config(dataset: str) -> EvalDataConfig: + return EvalDataConfig( + dataset_id=dataset, + bytes_parser_function=ARXIV_BPF, + tokenizer="DistilBertTokenizerTransform", + batch_size=256, + dataloader_workers=1, + metrics=[ + AccuracyMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + topn=1, + ), + AccuracyMetricConfig(evaluation_transformer_function="", topn=2), + AccuracyMetricConfig(evaluation_transformer_function="", topn=5), + F1ScoreMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + num_classes=172, + average="weighted", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + num_classes=172, + average="macro", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=ARXIV_EVAL_FUNC, + num_classes=172, + average="micro", + ), + ], + ) + + +def gen_arxiv_triggering_config( + config_id: str, gpu_device: str, trigger_config: TriggerConfig, seed: int, start_eval_at: int +) -> ModynPipelineConfig: + return ModynPipelineConfig( + pipeline=Pipeline(name=f"kaggle_arxiv_{config_id}", description="Arxiv triggering config", version="0.0.1"), + model=ModelConfig(id="ArticleNet", config={"num_classes": 172}), + model_storage=PipelineModelStorageConfig(full_model_strategy=FullModelStrategy(name="PyTorchFullModel")), + training=gen_arxiv_training_conf(gpu_device, seed), + selection_strategy=NewDataStrategyConfig( + maximum_keys_in_memory=200000, storage_backend="database", tail_triggers=0, limit=-1 + ), + data=DataConfig( + dataset_id="arxiv_kaggle_train", + bytes_parser_function=ARXIV_BPF, + tokenizer="DistilBertTokenizerTransform", + ), + trigger=trigger_config, + evaluation=EvaluationConfig( + handlers=[ + EvalHandlerConfig( + name="exactmatrix", + execution_time="after_pipeline", + models="matrix", + datasets=["arxiv_kaggle_test"], + strategy=PeriodicEvalStrategyConfig( + every="26w", + interval="[-13w; +13w]", + start_timestamp=start_eval_at + 13 * SECONDS_PER_UNIT["w"], + end_timestamp=1724803200, + ), + ) + ], + after_pipeline_evaluation_workers=2, + after_training_evaluation_workers=2, + device=gpu_device, + datasets=[get_eval_data_config(dataset) for dataset in ["arxiv_kaggle_test"]], + ), + ) diff --git a/benchmark/sigmod/triggering/run_arxiv_triggering.py b/benchmark/sigmod/triggering/run_arxiv_triggering.py new file mode 100644 index 000000000..8f6225cca --- /dev/null +++ b/benchmark/sigmod/triggering/run_arxiv_triggering.py @@ -0,0 +1,271 @@ +from __future__ import annotations + +import logging +import math +import os +import sys +from pathlib import Path + +from benchmark.sigmod.triggering.arxiv_triggering_config import gen_arxiv_triggering_config, get_eval_data_config +from experiments.utils.experiment_runner import run_multiple_pipelines +from modyn.config.schema.pipeline import ModynPipelineConfig +from modyn.config.schema.pipeline.trigger import TriggerConfig +from modyn.config.schema.pipeline.trigger.drift import DataDriftTriggerConfig +from modyn.config.schema.pipeline.trigger.drift.alibi_detect import AlibiDetectMmdDriftMetric +from modyn.config.schema.pipeline.trigger.drift.criterion import ( + DynamicQuantileThresholdCriterion, + DynamicRollingAverageThresholdCriterion, + ThresholdDecisionCriterion, +) +from modyn.config.schema.pipeline.trigger.drift.detection_window.time_ import TimeWindowingStrategy +from modyn.config.schema.pipeline.trigger.performance.criterion import ( + DynamicQuantilePerformanceThresholdCriterion, + DynamicRollingAveragePerformanceThresholdCriterion, + StaticPerformanceThresholdCriterion, +) +from modyn.config.schema.pipeline.trigger.performance.performance import ( + PerformanceTriggerConfig, + PerformanceTriggerEvaluationConfig, +) +from modyn.config.schema.pipeline.trigger.simple.data_amount import DataAmountTriggerConfig +from modyn.config.schema.pipeline.trigger.simple.time import TimeTriggerConfig +from modyn.supervisor.internal.pipeline_executor.models import PipelineLogs +from modyn.utils.utils import current_time_millis +from modynclient.config.schema.client_config import ModynClientConfig, Supervisor + +logging.basicConfig( + level=logging.NOTSET, + format="[%(asctime)s] [%(filename)15s:%(lineno)4d] %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d:%H:%M:%S", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler(f"client_{current_time_millis()}.log", mode="w"), + ], +) +logger = logging.getLogger(__name__) + +START_TIMESTAMP = 631152000 + + +def gen_triggering_strategies(device: str) -> list[tuple[str, TriggerConfig]]: + strategies = [] + + # TimeTriggers + for years in [1, 3, 5]: + strategies.append( + (f"timetrigger_{years}y", TimeTriggerConfig(every=f"{years}y", start_timestamp=START_TIMESTAMP)) + ) + + # DataAmountTriggers + for count in [30000, 50000]: + strategies.append((f"amounttrigger_{count}", DataAmountTriggerConfig(num_samples=count))) + + return strategies + + +def gen_revision_triggering_strategies(device: str) -> list[tuple[str, TriggerConfig]]: + strategies = [] + min_warmup_data_points = 20000 + + for evaluation_interval_data_points in [5000]: + warmup_intervals = math.ceil(min_warmup_data_points / evaluation_interval_data_points) + + # Static Drift Triggers + for threshold in [0.02, 0.01, 0.005, 0.002]: + for window_size in ["1y", "2y"]: + conf = DataDriftTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + sample_size=10000, + metrics={ + "mmd_alibi": AlibiDetectMmdDriftMetric( + device="gpu", + num_permutations=None, + decision_criterion=ThresholdDecisionCriterion(threshold=threshold), + ) + }, + warmup_policy=TimeTriggerConfig(every="1y", start_timestamp=START_TIMESTAMP), + warmup_intervals=warmup_intervals, + ) + name = f"mmdalibi_{evaluation_interval_data_points}_{threshold}_{window_size}" + strategies.append((name, conf)) + + ## Dynamic Drift Triggers + for window_size in ["1y", "2y"]: + for metric_window_size in [15, 30]: # how many drift scores we use for calibrating the dynamic policy + criteria = [] + for deviation in [1]: + criteria.append( + ( + f"roll_{deviation}", + DynamicRollingAverageThresholdCriterion( + window_size=metric_window_size, deviation=deviation, absolute=False + ), + ) + ) + for quantile in [0.05, 0.1, 0.2]: + criteria.append( + ( + f"qt_{quantile}", + DynamicQuantileThresholdCriterion(window_size=metric_window_size, quantile=quantile), + ) + ) + for dec_crit_str, decision_criterion in criteria: + conf = DataDriftTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + sample_size=10000, + metrics={ + "mmd_alibi": AlibiDetectMmdDriftMetric( + device="gpu", + num_permutations=None, + decision_criterion=decision_criterion, + ) + }, + warmup_policy=TimeTriggerConfig(every="1y", start_timestamp=START_TIMESTAMP), + warmup_intervals=warmup_intervals, + ) + + name = f"mmdalibi_dyn_{evaluation_interval_data_points}_{metric_window_size}_{dec_crit_str}_{window_size}" + strategies.append((name, conf)) + + ## Static PerformanceTriggers + for threshold in [0.8, 0.75, 0.7]: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=1, # deprecated + data_density_window_size=1, # deprecated + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("arxiv_kaggle_train") + ), + decision_criteria={ + f"static-{threshold}": StaticPerformanceThresholdCriterion( + metric="Top-2-Accuracy", metric_threshold=threshold + ) + }, + ) + name = f"perf_{evaluation_interval_data_points}_{threshold}" + strategies.append((name, conf)) + + ## Dynamic PerformanceTriggers + for performance_triggers_window_size in [15, 30]: + criteria = [] + for deviation in [0.2]: + criterion = DynamicRollingAveragePerformanceThresholdCriterion( + metric="Top-2-Accuracy", + window_size=performance_triggers_window_size, + deviation=deviation, + absolute=False, + ) + criteria.append((f"{performance_triggers_window_size}_roll_{deviation}", criterion)) + + for quantile in [0.05, 0.1, 0.2]: + criterion = DynamicQuantilePerformanceThresholdCriterion( + metric="Top-2-Accuracy", window_size=performance_triggers_window_size, quantile=quantile + ) + criteria.append((f"{performance_triggers_window_size}_perc_{quantile}", criterion)) + + for dec_crit_str, decision_criterion in criteria: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=performance_triggers_window_size, + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("arxiv_kaggle_train") + ), + decision_criteria={f"dynamic-{dec_crit_str}": decision_criterion}, + warmup_policy=TimeTriggerConfig(every="1y", start_timestamp=START_TIMESTAMP), + warmup_intervals=warmup_intervals, + ) + name = f"perf_dyn_{evaluation_interval_data_points}_{dec_crit_str}" + strategies.append((name, conf)) + + return strategies + + +def run_experiment() -> None: + logger.info("Grüeziwohl!") + pipeline_configs: list[ModynPipelineConfig] = [] + train_gpu = "cuda:0" + num_gpus = 1 # to parallelize across gpus + gpu_id = 0 + seeds = [42, 99, 12] # set to [None] to disable, should be 0-100 + skip_existing = True + + existing_pipelines = [] + if skip_existing: + log_directory = Path(input("Please enter the directory in which to search for existing pipelines: ")) or Path( + "/raid/modyn/maxi/sigmod/logs" + ) + if not log_directory.exists(): + raise RuntimeError(f"{log_directory} does not exist.") + + names = list(log_directory.glob("**/.name")) + + for name_file in names: + name = name_file.read_text() + pipeline_file = name_file.parent / "pipeline.log" + + if not pipeline_file.exists(): + logger.info(f"{name_file} exists, but {pipeline_file} does not") + continue + + try: + parsed_log = PipelineLogs.model_validate_json(pipeline_file.read_text()) + except: + print(f"Skipping file {pipeline_file} due to invalid format") + continue + + seed = parsed_log.config.pipeline.training.seed + existing_pipelines.append((name, seed)) + + logger.info(f"Found these existing pipelines: {existing_pipelines}") + + existing_pipelines = set(existing_pipelines) + run_id = 0 + for seed in seeds: + for triggering_strategy_id, triggering_strategy in gen_triggering_strategies( + train_gpu + ) + gen_revision_triggering_strategies(train_gpu): + if ( + isinstance(triggering_strategy, DataDriftTriggerConfig) + or isinstance(triggering_strategy, PerformanceTriggerConfig) + ) and seed != seeds[0]: + continue # only execute drift triggers once + + pipeline_config = gen_arxiv_triggering_config( + triggering_strategy_id, train_gpu, triggering_strategy, seed, START_TIMESTAMP + ) + + if run_id % num_gpus == gpu_id and (pipeline_config.pipeline.name, seed) not in existing_pipelines: + logger.info(f"Running {triggering_strategy_id} with seed {seed} on this GPU.") + pipeline_configs.append(pipeline_config) + + run_id += 1 + + print(f"Running {len(pipeline_configs)} pipelines in total now.") + host = os.getenv("MODYN_SUPERVISOR_HOST") + port = os.getenv("MODYN_SUPERVISOR_PORT") + + if not host: + host = input("Enter the supervisors host address: ") or "localhost" + if not port: + port = int(input("Enter the supervisors port: ") or "3000") + + run_multiple_pipelines( + client_config=ModynClientConfig(supervisor=Supervisor(ip=host, port=port)), + pipeline_configs=pipeline_configs, + start_replay_at=START_TIMESTAMP, + stop_replay_at=None, + maximum_triggers=None, + show_eval_progress=False, + ) + + +if __name__ == "__main__": + run_experiment() diff --git a/benchmark/sigmod/triggering/run_yb_triggering.py b/benchmark/sigmod/triggering/run_yb_triggering.py index 5dfea3cb5..754ff7700 100644 --- a/benchmark/sigmod/triggering/run_yb_triggering.py +++ b/benchmark/sigmod/triggering/run_yb_triggering.py @@ -1,23 +1,36 @@ from __future__ import annotations import logging +import math import os import sys from pathlib import Path -from benchmark.sigmod.triggering.yearbook_triggering_config import ( - gen_yearbook_triggering_config, -) +from benchmark.sigmod.triggering.yearbook_triggering_config import gen_yearbook_triggering_config, get_eval_data_config from experiments.utils.experiment_runner import run_multiple_pipelines from modyn.config.schema.pipeline import ModynPipelineConfig from modyn.config.schema.pipeline.trigger import TriggerConfig -from modyn.config.schema.pipeline.trigger.data_amount import DataAmountTriggerConfig from modyn.config.schema.pipeline.trigger.drift import DataDriftTriggerConfig from modyn.config.schema.pipeline.trigger.drift.alibi_detect import ( AlibiDetectMmdDriftMetric, ) -from modyn.config.schema.pipeline.trigger.drift.config import TimeWindowingStrategy -from modyn.config.schema.pipeline.trigger.time import TimeTriggerConfig +from modyn.config.schema.pipeline.trigger.drift.criterion import ( + DynamicQuantileThresholdCriterion, + DynamicRollingAverageThresholdCriterion, + ThresholdDecisionCriterion, +) +from modyn.config.schema.pipeline.trigger.drift.detection_window import TimeWindowingStrategy +from modyn.config.schema.pipeline.trigger.performance.criterion import ( + DynamicQuantilePerformanceThresholdCriterion, + DynamicRollingAveragePerformanceThresholdCriterion, + StaticPerformanceThresholdCriterion, +) +from modyn.config.schema.pipeline.trigger.performance.performance import ( + PerformanceTriggerConfig, + PerformanceTriggerEvaluationConfig, +) +from modyn.config.schema.pipeline.trigger.simple.data_amount import DataAmountTriggerConfig +from modyn.config.schema.pipeline.trigger.simple.time import TimeTriggerConfig from modyn.supervisor.internal.pipeline_executor.models import PipelineLogs from modyn.utils.utils import current_time_millis from modynclient.config.schema.client_config import ModynClientConfig, Supervisor @@ -38,28 +51,139 @@ def gen_triggering_strategies() -> list[tuple[str, TriggerConfig]]: strategies = [] # TimeTriggers - for years in [3, 5, 15, 25, 40]: + for years in [1, 3, 5, 15, 25, 40]: strategies.append((f"timetrigger_{years}y", TimeTriggerConfig(every=f"{years}d"))) # DataAmountTriggers for count in [500, 1000, 2000, 10000]: strategies.append((f"amounttrigger_{count}", DataAmountTriggerConfig(num_samples=count))) - # DriftTriggers + return strategies + + +def gen_revision_triggering_strategies(device: str) -> list[tuple[str, TriggerConfig]]: + strategies = [] + min_warmup_data_points = 3500 + for evaluation_interval_data_points in [250, 500, 100]: - for threshold in [0.05, 0.07, 0.09]: - for window_size in ["1d", "2d", "5d"]: # fake timestamps, hence days + warmup_intervals = math.ceil(min_warmup_data_points / evaluation_interval_data_points) + + ## Drift Triggers + for window_size in ["1d", "2d", "5d"]: # fake timestamps, hence days + ## Static Drift + for threshold in [0.05, 0.07, 0.09]: conf = DataDriftTriggerConfig( evaluation_interval_data_points=evaluation_interval_data_points, - windowing_strategy=TimeWindowingStrategy(limit=window_size), - reset_current_window_on_trigger=False, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + sample_size=None, metrics={ - "mmd_alibi": AlibiDetectMmdDriftMetric(device="cpu", num_permutations=None, threshold=threshold) + "mmd_alibi": AlibiDetectMmdDriftMetric( + device="gpu", + num_permutations=None, + decision_criterion=ThresholdDecisionCriterion(threshold=threshold), + ) }, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, ) name = f"mmdalibi_{evaluation_interval_data_points}_{threshold}_{window_size}" strategies.append((name, conf)) + ## Dynamic Drift + for metric_window_size in [15, 30]: # how many drift scores we use for calibrating the policy + criteria = [] + for deviation in [0.05, 1, 2]: + if evaluation_interval_data_points == 100: + continue # No rolling average for very small windows + criteria.append( + ( + f"roll_{deviation}", + DynamicRollingAverageThresholdCriterion( + window_size=metric_window_size, deviation=deviation, absolute=False + ), + ) + ) + for quantile in [0.05, 0.1, 0.2, 0.3]: + criteria.append( + ( + f"qt_{quantile}", + DynamicQuantileThresholdCriterion(window_size=metric_window_size, quantile=quantile), + ) + ) + + for dec_crit_str, decision_criterion in criteria: + conf = DataDriftTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + windowing_strategy=TimeWindowingStrategy( + allow_overlap=True, limit_ref=window_size, limit_cur=window_size + ), + metrics={ + "mmd_alibi": AlibiDetectMmdDriftMetric( + device=device, + num_permutations=None, + decision_criterion=decision_criterion, + ) + }, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, + ) + + name = f"mmdalibi_dyn_{evaluation_interval_data_points}_{metric_window_size}_{dec_crit_str}_{window_size}" + strategies.append((name, conf)) + + ## Static PerformanceTriggers + for threshold in [0.95, 0.9, 0.875, 0.85, 0.825, 0.8, 0.7]: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=1, # somewhat deprecated parameter, not relevant for static + data_density_window_size=1, # also ignored + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("yearbook_train") + ), + decision_criteria={ + f"static-{threshold}": StaticPerformanceThresholdCriterion( + metric="Accuracy", metric_threshold=threshold + ) + }, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, + ) + name = f"perf_{evaluation_interval_data_points}_{threshold}" + strategies.append((name, conf)) + + ## Dynamic Performance Triggers + for performance_triggers_window_size in [15, 30]: + criteria = [] + for deviation in [0.1, 0.2, 0.3]: + criterion = DynamicRollingAveragePerformanceThresholdCriterion( + metric="Accuracy", window_size=performance_triggers_window_size, deviation=deviation, absolute=False + ) + criteria.append((f"{performance_triggers_window_size}_roll_{deviation}", criterion)) + + for quantile in [0.05, 0.1, 0.2, 0.3]: + criterion = DynamicQuantilePerformanceThresholdCriterion( + metric="Accuracy", window_size=performance_triggers_window_size, quantile=quantile + ) + criteria.append((f"{performance_triggers_window_size}_qt_{quantile}", criterion)) + + for dec_crit_str, decision_criterion in criteria: + conf = PerformanceTriggerConfig( + evaluation_interval_data_points=evaluation_interval_data_points, + performance_triggers_window_size=performance_triggers_window_size, + mode="hindsight", + evaluation=PerformanceTriggerEvaluationConfig( + device=device, dataset=get_eval_data_config("yearbook_train") + ), + decision_criteria={f"dynamic-{dec_crit_str}": decision_criterion}, + warmup_policy=TimeTriggerConfig(every="3d"), + warmup_intervals=warmup_intervals, + ) + name = f"perf_dyn_{evaluation_interval_data_points}_{dec_crit_str}" + strategies.append((name, conf)) + return strategies @@ -104,8 +228,14 @@ def run_experiment() -> None: existing_pipelines = set(existing_pipelines) run_id = 0 for seed in seeds: - for triggering_strategy_id, triggering_strategy in gen_triggering_strategies(): - if isinstance(triggering_strategy, DataDriftTriggerConfig) and seed != seeds[0]: + for ( + triggering_strategy_id, + triggering_strategy, + ) in gen_triggering_strategies() + gen_revision_triggering_strategies(train_gpu): + if ( + isinstance(triggering_strategy, DataDriftTriggerConfig) + or isinstance(triggering_strategy, PerformanceTriggerConfig) + ) and seed != seeds[0]: continue # only execute drift triggers once pipeline_config = gen_yearbook_triggering_config( @@ -118,6 +248,7 @@ def run_experiment() -> None: run_id += 1 + print(f"Running {len(pipeline_configs)} pipelines in total now.") host = os.getenv("MODYN_SUPERVISOR_HOST") port = os.getenv("MODYN_SUPERVISOR_PORT") diff --git a/benchmark/sigmod/triggering/yearbook_triggering_config.py b/benchmark/sigmod/triggering/yearbook_triggering_config.py index b1b283b38..da7f188d7 100644 --- a/benchmark/sigmod/triggering/yearbook_triggering_config.py +++ b/benchmark/sigmod/triggering/yearbook_triggering_config.py @@ -19,6 +19,61 @@ from modyn.config.schema.pipeline.sampling.config import NewDataStrategyConfig from modyn.config.schema.pipeline.trigger import TriggerConfig +YEARBOOK_BYTES_PARSER_FUNC = ( + "import warnings\n" + "import torch\n" + "def bytes_parser_function(data: memoryview) -> torch.Tensor:\n" + " with warnings.catch_warnings():\n" + " warnings.simplefilter('ignore', category=UserWarning)\n" + " return torch.frombuffer(data, dtype=torch.float32).reshape(3, 32, 32)" +) + + +def get_eval_data_config(dataset: str) -> EvalDataConfig: + return EvalDataConfig( + dataset_id=dataset, + bytes_parser_function=YEARBOOK_BYTES_PARSER_FUNC, + batch_size=512, + dataloader_workers=1, + metrics=[ + AccuracyMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + topn=1, + ), + F1ScoreMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + num_classes=2, + average="weighted", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + num_classes=2, + average="macro", + ), + F1ScoreMetricConfig( + evaluation_transformer_function=( + "import torch\n" + "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" + " return torch.argmax(model_output, dim=-1)" + ), + num_classes=2, + average="micro", + ), + ], + ) + def gen_yearbook_triggering_config( config_id: str, @@ -26,14 +81,6 @@ def gen_yearbook_triggering_config( trigger_config: TriggerConfig, seed: int, ) -> ModynPipelineConfig: - bytes_parser_func = ( - "import warnings\n" - "import torch\n" - "def bytes_parser_function(data: memoryview) -> torch.Tensor:\n" - " with warnings.catch_warnings():\n" - " warnings.simplefilter('ignore', category=UserWarning)\n" - " return torch.frombuffer(data, dtype=torch.float32).reshape(3, 32, 32)" - ) return ModynPipelineConfig( pipeline=Pipeline(name=f"yearbook_{config_id}", description="Yearbook triggering config", version="0.0.1"), model=ModelConfig(id="YearbookNet", config={"num_input_channels": 3, "num_classes": 2}), @@ -63,7 +110,7 @@ def gen_yearbook_triggering_config( selection_strategy=NewDataStrategyConfig( maximum_keys_in_memory=100000, storage_backend="database", tail_triggers=0, limit=-1 ), - data=DataConfig(dataset_id="yearbook_train", bytes_parser_function=bytes_parser_func), + data=DataConfig(dataset_id="yearbook_train", bytes_parser_function=YEARBOOK_BYTES_PARSER_FUNC), trigger=trigger_config, evaluation=EvaluationConfig( handlers=[ @@ -80,51 +127,6 @@ def gen_yearbook_triggering_config( after_pipeline_evaluation_workers=12, after_training_evaluation_workers=12, device=gpu_device, - datasets=[ - EvalDataConfig( - dataset_id=dataset, - bytes_parser_function=bytes_parser_func, - batch_size=512, - dataloader_workers=1, - metrics=[ - AccuracyMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - topn=1, - ), - F1ScoreMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - num_classes=2, - average="weighted", - ), - F1ScoreMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - num_classes=2, - average="macro", - ), - F1ScoreMetricConfig( - evaluation_transformer_function=( - "import torch\n" - "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n" - " return torch.argmax(model_output, dim=-1)" - ), - num_classes=2, - average="micro", - ), - ], - ) - for dataset in ["yearbook_train", "yearbook_test"] - ], + datasets=[get_eval_data_config(dataset) for dataset in ["yearbook_train", "yearbook_test"]], ), ) diff --git a/modyn/config/schema/pipeline/evaluation/metrics.py b/modyn/config/schema/pipeline/evaluation/metrics.py index ce88e5dc3..6924fb756 100644 --- a/modyn/config/schema/pipeline/evaluation/metrics.py +++ b/modyn/config/schema/pipeline/evaluation/metrics.py @@ -36,6 +36,10 @@ def evaluation_transformer_function_deserialized(self) -> Callable | None: def shape_check(self) -> bool: return True + @property + def full_name(self) -> str: + return self.name + class AccuracyMetricConfig(_BaseMetricConfig): name: Literal["Accuracy"] = Field("Accuracy") @@ -45,6 +49,10 @@ class AccuracyMetricConfig(_BaseMetricConfig): def shape_check(self) -> bool: return self.topn <= 1 + @property + def full_name(self) -> str: + return "Accuracy" if self.topn == 1 else f"Top-{self.topn}-Accuracy" + F1ScoreTypes = Literal["macro", "micro", "weighted", "binary"] diff --git a/modyn/config/schema/pipeline/trigger/performance/performance.py b/modyn/config/schema/pipeline/trigger/performance/performance.py index 8c5ff62e0..28e9c2a46 100644 --- a/modyn/config/schema/pipeline/trigger/performance/performance.py +++ b/modyn/config/schema/pipeline/trigger/performance/performance.py @@ -84,7 +84,7 @@ class PerformanceTriggerConfig(_InternalPerformanceTriggerConfig): def validate_decision_criteria(self) -> "PerformanceTriggerConfig": """Assert that all criteria use metrics that are defined in the evaluation config.""" - metrics = {metric.name for metric in self.evaluation.dataset.metrics} + metrics = {metric.full_name for metric in self.evaluation.dataset.metrics} for criterion in self.decision_criteria.values(): if isinstance(criterion, StaticNumberAvoidableMisclassificationCriterion): continue diff --git a/modyn/metadata_database/metadata_database_connection.py b/modyn/metadata_database/metadata_database_connection.py index 428fe83b0..016259612 100644 --- a/modyn/metadata_database/metadata_database_connection.py +++ b/modyn/metadata_database/metadata_database_connection.py @@ -37,7 +37,7 @@ def __init__(self, modyn_config: dict) -> None: if "hash_partition_modulus" in self.modyn_config["metadata_database"] else 16 ) - self.seed: int = ( + self.seed: int | None = ( self.modyn_config["metadata_database"]["seed"] if "seed" in self.modyn_config["metadata_database"] else None ) if self.seed is not None: