From bec88be55113ff55381f98f5759675d0b26c256a Mon Sep 17 00:00:00 2001 From: Robin Holzinger Date: Sun, 22 Sep 2024 04:01:27 +0200 Subject: [PATCH] More plots --- .../yb_traintime_dataamount.ipynb | 244 +++++++++ .../rh_thesis/yb_scatter_triggering.ipynb | 495 ++++++++++++++++++ .../internal/pipeline_executor/models.py | 79 ++- 3 files changed, 806 insertions(+), 12 deletions(-) create mode 100644 analytics/plotting/rh_thesis/traintime_vs_dataamount/yb_traintime_dataamount.ipynb create mode 100644 analytics/plotting/rh_thesis/yb_scatter_triggering.ipynb diff --git a/analytics/plotting/rh_thesis/traintime_vs_dataamount/yb_traintime_dataamount.ipynb b/analytics/plotting/rh_thesis/traintime_vs_dataamount/yb_traintime_dataamount.ipynb new file mode 100644 index 000000000..36f5a67c4 --- /dev/null +++ b/analytics/plotting/rh_thesis/traintime_vs_dataamount/yb_traintime_dataamount.ipynb @@ -0,0 +1,244 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "\n", + "from analytics.app.data.load import list_pipelines\n", + "from analytics.plotting.common.common import init_plot\n", + "from analytics.plotting.common.font import setup_font\n", + "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", + "from modyn.supervisor.internal.pipeline_executor.models import StageLog\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# INPUTS\n", + "\n", + "pipelines_dir = Path(\n", + " \"/Users/robinholzinger/robin/dev/eth/modyn-robinholzi-data/data/triggering/arxiv/11_baselines_amount\"\n", + ")\n", + "# pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-robinholzi-data/data/triggering/huffpost/11_baselines_amount\")\n", + "# pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-robinholzi-data/data/triggering/yearbook/11_baselines_amount\")\n", + "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "assert pipelines_dir.exists()\n", + "assert output_dir.exists()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipelines = list_pipelines(pipelines_dir)\n", + "max_pipeline_id = max(pipelines.keys())\n", + "pipelines" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from analytics.app.data.load import load_pipeline_logs\n", + "\n", + "pipeline_logs = {p_id: load_pipeline_logs(p_id, pipelines_dir) for (p_id, (_, p_path)) in pipelines.items()}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Wrangle data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "list_df_train: list[pd.DataFrame] = []\n", + "\n", + "for pipeline_id in pipelines:\n", + " logs = pipeline_logs[pipeline_id]\n", + " train_logs = [record for record in logs.supervisor_logs.stage_runs if record.id == PipelineStage.TRAIN.name]\n", + " df_train = StageLog.df(stage_logs=train_logs, extended=True)\n", + " df_train[\"pipeline_id\"] = pipelines[pipeline_id][0]\n", + " list_df_train.append(df_train)\n", + "\n", + "df_train = pd.concat(list_df_train)\n", + "df_train.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": 
{}, + "source": [ + "# Conversion" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Clean pipeline name\n", + "\n", + "import re\n", + "\n", + "\n", + "def pipeline_name_cleaner(name: str):\n", + " return re.sub(r\".*_dataamount_(\\d+)\", \"trigger every \\\\1 samples\", name)\n", + "\n", + "\n", + "df_train[\"pipeline_id\"] = df_train[\"pipeline_id\"].apply(pipeline_name_cleaner)\n", + "df_train.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# to seconds\n", + "df_train[\"duration\"] = df_train[\"duration\"].dt.total_seconds() / 60\n", + "# df_train[\"duration\"] = df_train[\"duration\"].dt.total_seconds()\n", + "# df_train[\"train_time_at_trainer\"] = df_train[\"train_time_at_trainer\"] / 1_000 # millis to seconds\n", + "df_train[\"train_time_at_trainer\"] = df_train[\"train_time_at_trainer\"] / 1_000 / 60 # millis to minutes\n", + "df_train" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Sort by number of samples\n", + "df_train = df_train.sort_values(by=\"num_samples\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create Plot" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from analytics.plotting.common.color import discrete_colors, main_color\n", + "\n", + "sns.set_style(\"whitegrid\")\n", + "\n", + "init_plot()\n", + "setup_font(small_label=True, small_title=True)\n", + "\n", + "\n", + "FONTSIZE = 20\n", + "DOUBLE_FIG_WIDTH = 10\n", + "DOUBLE_FIG_HEIGHT = 3.5\n", + "DOUBLE_FIG_SIZE = (DOUBLE_FIG_WIDTH, 1.5 * DOUBLE_FIG_HEIGHT)\n", + "\n", + "width_factor = 0.5\n", + "height_factor = 0.5\n", + "\n", + "fig = plt.figure(\n", + " edgecolor=\"black\",\n", + " frameon=True,\n", + " figsize=(\n", + " DOUBLE_FIG_WIDTH * width_factor,\n", + " 2 * DOUBLE_FIG_HEIGHT * height_factor,\n", + " ),\n", + " dpi=300,\n", + ")\n", + "\n", + "ax1 = sns.regplot(\n", + " df_train,\n", + " x=\"num_samples\",\n", + " y=\"train_time_at_trainer\", # duration\n", + " color=main_color(0),\n", + ")\n", + "\n", + "ax2 = sns.scatterplot(\n", + " df_train,\n", + " x=\"num_samples\",\n", + " y=\"train_time_at_trainer\", # duration\n", + " hue=\"pipeline_id\",\n", + " palette=(\n", + " discrete_colors(14)[0:5] + discrete_colors(14)[9:14]\n", + " if \"yearbook\" in str(pipelines_dir)\n", + " else (\n", + " discrete_colors(8)[0:3] + discrete_colors(8)[6:8]\n", + " if \"huffpost\" in str(pipelines_dir)\n", + " else discrete_colors(8)[0:3] + discrete_colors(8)[6:8]\n", + " )\n", + " ),\n", + " s=200,\n", + " legend=True,\n", + " marker=\"X\",\n", + ")\n", + "\n", + "# Display the plot\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# TODO: run more variants of in less dense areas\n", + "# TODO: plot / add number of datapoints to thesis so that the signicance of regression line is clear\n", + "# State in thesis that there are no outliers to be expected!" 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/analytics/plotting/rh_thesis/yb_scatter_triggering.ipynb b/analytics/plotting/rh_thesis/yb_scatter_triggering.ipynb new file mode 100644 index 000000000..192516a2f --- /dev/null +++ b/analytics/plotting/rh_thesis/yb_scatter_triggering.ipynb @@ -0,0 +1,495 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "\n", + "from analytics.app.data.load import list_pipelines\n", + "from analytics.app.data.transform import (\n", + " df_aggregate_eval_metric,\n", + " dfs_models_and_evals,\n", + " logs_dataframe,\n", + " patch_yearbook_time,\n", + ")\n", + "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# INPUTS\n", + "\n", + "pipelines_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-sigmod-data/yearbook/triggering/logs_agg\")\n", + "output_dir = Path(\"/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/_plots\")\n", + "assert pipelines_dir.exists()\n", + "assert output_dir.exists()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipelines = list_pipelines(pipelines_dir)\n", + "max_pipeline_id = max(pipelines.keys())\n", + "pipelines" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from analytics.app.data.load import load_pipeline_logs\n", + "\n", + "pipeline_logs = {p_id: load_pipeline_logs(p_id, pipelines_dir) for (p_id, (_, p_path)) in pipelines.items()}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "type(pipeline_logs[32])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# mode:\n", + "# single pipeline\n", + "pipeline_ids = (\n", + " [\n", + " # yearly triggers\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if \"timetrigger\" in p and (\"_1y\" in p or \"_3y\" in p or \"_5y\" in p)\n", + " ]\n", + " + [\n", + " # amount 500, 2000\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if \"amount\" in p and (\"500\" in p or \"2000\" in p)\n", + " ]\n", + " + [\n", + " # drift\n", + " p_id\n", + " for p_id, (p, _) in pipelines.items()\n", + " if p\n", + " in {\n", + " \"yearbook_mmdalibi_250_0.05_5d\",\n", + " \"yearbook_mmdalibi_250_0.07_1d\",\n", + " \"yearbook_mmdalibi_250_0.07_5d\",\n", + " \"yearbook_mmdalibi_250_0.05_1d\",\n", + " \"yearbook_mmdalibi_500_0.05_1d\",\n", + " \"yearbook_mmdalibi_100_0.05_1d\",\n", + " }\n", + " ]\n", + ")\n", + "composite_model_variant = \"currently_active_model\" # currently_trained_model\n", + "patch_yearbook = True\n", + "dataset_id = \"yearbook_test\"\n", + "eval_handler = \"slidingmatrix\"\n", + "metric = 
\"Accuracy\"\n", + "include_composite_model = False\n", + "\n", + "\n", + "def pipeline_name_mapper(name: str) -> str:\n", + " name = name.replace(\"yearbook_\", \"\")\n", + " name = name.replace(\"timetrigger_\", \"\") # \"every \"\n", + " name = name.replace(\"amounttrigger_\", \"\") # \"every \"\n", + " name = name.replace(\"mmdalibi_\", \"\")\n", + " if name.endswith(\"y\"):\n", + " name = name[:-1] + (\" years\" if not name.endswith(\"1y\") else \" year\")\n", + " elif not name.endswith(\"d\"): # dataamount\n", + " name = name + \" samples\"\n", + " else: # drift\n", + " name = name.replace(\"_\", \"/\")\n", + " return name\n", + "\n", + "\n", + "pipelines = {p_id: (pipeline_name_mapper(pname), p_path) for p_id, (pname, p_path) in pipelines.items()}\n", + "\n", + "[(p_id, pname) for p_id, (pname, _) in pipelines.items() if p_id in pipeline_ids]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Wrangle data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "list_df_eval_single: list[pd.DataFrame] = []\n", + "list_df_all: list[pd.DataFrame] = []\n", + "\n", + "for pipeline_id in pipeline_ids:\n", + " df_all = logs_dataframe(pipeline_logs[pipeline_id], pipelines[pipeline_id][0])\n", + " list_df_all.append(df_all)\n", + "\n", + " _, _, df_eval_single = dfs_models_and_evals(\n", + " pipeline_logs[pipeline_id], df_all[\"sample_time\"].max(), pipelines[pipeline_id][0]\n", + " )\n", + " list_df_eval_single.append(df_eval_single)\n", + "\n", + "df_adjusted = pd.concat(list_df_eval_single)\n", + "df_adjusted\n", + "\n", + "df_all = pd.concat(list_df_all)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted = df_adjusted[\n", + " (df_adjusted[\"dataset_id\"] == dataset_id)\n", + " & (df_adjusted[\"eval_handler\"] == eval_handler)\n", + " & (df_adjusted[\"metric\"] == metric)\n", + "]\n", + "\n", + "# in percent (0-100)\n", + "df_adjusted[\"value\"] = df_adjusted[\"value\"] * 100\n", + "df_adjusted" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if patch_yearbook:\n", + " for column in [\"interval_start\", \"interval_center\", \"interval_end\"]:\n", + " patch_yearbook_time(df_adjusted, column)\n", + " patch_yearbook_time(df_all, \"sample_time\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted = df_adjusted.sort_values(by=[\"interval_center\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Reduce to composite models\n", + "df_adjusted = df_adjusted[df_adjusted[composite_model_variant]]\n", + "df_adjusted[composite_model_variant].unique()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dump Data backup" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create Plot" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# reduce evaluation interval to interval where all policies have evaluations\n", + "min_active_eval_center_per_pipeline = (\n", + " df_adjusted[df_adjusted[composite_model_variant]].groupby(\"pipeline_ref\")[\"interval_center\"].min()\n", + ")\n", + "maximum_min = min_active_eval_center_per_pipeline.max()\n", + "print(maximum_min, min_active_eval_center_per_pipeline)\n", + "\n", + 
"df_adjusted = df_adjusted[df_adjusted[\"interval_center\"] >= maximum_min]\n", + "df_adjusted[\"interval_center\"].unique()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_adjusted[\"interval_center\"] = df_adjusted[\"interval_center\"].astype(str).str.split(\"-\").str[0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Aggregate metrics to a scalar value per pipeline\n", + "mean_accuracies = df_aggregate_eval_metric(\n", + " df_adjusted,\n", + " group_by=[\"pipeline_ref\", \"metric\"],\n", + " in_col=\"value\",\n", + " out_col=\"metric_value\",\n", + " aggregate_func=\"mean\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_triggers = df_all[df_all[\"id\"] == PipelineStage.HANDLE_SINGLE_TRIGGER.name]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_triggers = df_all[df_all[\"id\"] == PipelineStage.HANDLE_SINGLE_TRIGGER.name]\n", + "df_triggers = df_triggers[df_triggers[\"sample_time\"] > maximum_min]\n", + "df_triggers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Find number of trigger per pipeline that are after maximum_min\n", + "\n", + "# before the cutoff there was one trigger (equivalent to start of our reduced dataset): +1\n", + "num_triggers = df_triggers.groupby(\"pipeline_ref\").aggregate(count=(\"id\", \"count\")) + 1\n", + "num_triggers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "merged = num_triggers.merge(mean_accuracies, on=\"pipeline_ref\")\n", + "assert mean_accuracies.shape[0] == merged.shape[0]\n", + "merged" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_type(x: str):\n", + " if \"year\" in x:\n", + " return \"yearly\"\n", + " elif \"samples\" in x:\n", + " return \"amount\"\n", + " elif \"d\" in x:\n", + " return \"drift\"\n", + " else:\n", + " return \"unknown\"\n", + "\n", + "\n", + "merged[\"type\"] = merged[\"pipeline_ref\"].apply(lambda x: create_type(x))\n", + "merged" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette = sns.color_palette(\"RdBu\", 10)\n", + "palette" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "palette2 = sns.color_palette(\"colorblind\", 10)\n", + "palette2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the heatmap\n", + "from collections import defaultdict\n", + "\n", + "from analytics.plotting.common.common import INIT_PLOT\n", + "\n", + "INIT_PLOT()\n", + "# sns.set_theme(style=\"ticks\")\n", + "# plt.rcParams['svg.fonttype'] = 'none'\n", + "sns.set_style(\"whitegrid\")\n", + "\n", + "FONTSIZE = 20\n", + "DOUBLE_FIG_WIDTH = 10\n", + "DOUBLE_FIG_HEIGHT = 3.5\n", + "DOUBLE_FIG_SIZE = (DOUBLE_FIG_WIDTH, 1.5 * DOUBLE_FIG_HEIGHT)\n", + "\n", + "fig = plt.figure(\n", + " edgecolor=\"black\",\n", + " frameon=True,\n", + " figsize=DOUBLE_FIG_SIZE,\n", + " dpi=300,\n", + ")\n", + "\n", + "ax = sns.scatterplot(\n", + " merged,\n", + " x=\"count\",\n", + " y=\"metric_value\",\n", + " hue=\"type\",\n", + " palette={\"drift\": 
palette[-2], \"yearly\": palette2[1], \"amount\": palette[1]},\n", + " s=200,\n", + " legend=False,\n", + " marker=\"X\",\n", + " # annotations\n", + ")\n", + "ax.set(ylim=(85, 94.5))\n", + "ax.set(xlim=(-4, 85))\n", + "\n", + "for i in range(merged.shape[0]):\n", + " offsets = defaultdict(lambda: (+1.5, -0.25))\n", + " offsets.update(\n", + " {\n", + " # x, y\n", + " \"3 years\": (-3, +0.5),\n", + " \"1 year\": (-2, -0.85),\n", + " \"5 years\": (-3, +0.5),\n", + " \"500 samples\": (-5, +0.5),\n", + " \"2000 samples\": (+1.7, -0.25),\n", + " \"250/0.05/5d\": (-2, +0.5),\n", + " \"100/0.05/1d\": (+1.5, -0.7),\n", + " \"500/0.05/1d\": (+1.5, 0.15),\n", + " \"250/0.07/1d\": (+1.5, -0.55),\n", + " \"250/0.05/1d\": (-10, +0.4),\n", + " }\n", + " )\n", + " plt.rc(\"text\", usetex=True)\n", + " plt.text(\n", + " x=merged[\"count\"][i] + offsets[merged[\"pipeline_ref\"][i]][0],\n", + " y=merged[\"metric_value\"][i] + offsets[merged[\"pipeline_ref\"][i]][1],\n", + " s=r\"\\textbf{\" + merged[\"pipeline_ref\"][i] + \"}\",\n", + " fontdict=dict(color=\"black\", fontsize=17),\n", + " )\n", + " plt.rc(\"text\", usetex=False)\n", + "\n", + "\n", + "# Adjust x-axis tick labels\n", + "plt.xlabel(\"Number of triggers\", labelpad=10)\n", + "plt.xticks(\n", + " ticks=[x for x in range(0, 80 + 1, 20)],\n", + " labels=[x for x in range(0, 80 + 1, 20)],\n", + " rotation=0,\n", + " # ha='right'\n", + ")\n", + "\n", + "# Set y-axis ticks to be equally spaced\n", + "plt.ylabel(\"Mean Accuracy %\", labelpad=15)\n", + "plt.yticks(\n", + " ticks=[x for x in range(86, 95 + 1, 3)],\n", + " labels=[x for x in range(86, 95 + 1, 3)],\n", + " rotation=0,\n", + ")\n", + "\n", + "# Display the plot\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Save Plot as svg" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for img_type in [\"png\", \"svg\"]:\n", + " img_path = output_dir / f\"scatter_yb.{img_type}\"\n", + " fig.savefig(img_path, bbox_inches=\"tight\", transparent=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/modyn/supervisor/internal/pipeline_executor/models.py b/modyn/supervisor/internal/pipeline_executor/models.py index 10330f2ec..3cc19db0f 100644 --- a/modyn/supervisor/internal/pipeline_executor/models.py +++ b/modyn/supervisor/internal/pipeline_executor/models.py @@ -21,7 +21,9 @@ from modyn.supervisor.internal.eval.handler import EvalRequest from modyn.supervisor.internal.grpc.enums import PipelineStage from modyn.supervisor.internal.triggers.utils.models import TriggerPolicyEvaluationLog -from modyn.supervisor.internal.utils.evaluation_status_reporter import EvaluationStatusReporter +from modyn.supervisor.internal.utils.evaluation_status_reporter import ( + EvaluationStatusReporter, +) from modyn.supervisor.internal.utils.git_utils import get_head_sha logger = logging.getLogger(__name__) @@ -250,7 +252,12 @@ def df_columns(self) -> list[str]: @override @property def 
df_row(self) -> tuple: - return (self.trigger_i, self.trigger_index, self.trigger_i, self.num_samples_in_trigger) + return ( + self.trigger_i, + self.trigger_index, + self.trigger_i, + self.num_samples_in_trigger, + ) class TriggerExecutionInfo(_TriggerLogMixin): @@ -260,12 +267,24 @@ class TriggerExecutionInfo(_TriggerLogMixin): def df_columns(self) -> list[str]: """Provide the column names of the DataFrame representation of the data.""" - return ["trigger_i", "trigger_index", "trigger_id", "first_timestamp", "last_timestamp"] + return [ + "trigger_i", + "trigger_index", + "trigger_id", + "first_timestamp", + "last_timestamp", + ] @override @property def df_row(self) -> tuple: - return (self.trigger_i, self.trigger_index, self.trigger_id, self.first_timestamp, self.last_timestamp) + return ( + self.trigger_i, + self.trigger_index, + self.trigger_id, + self.first_timestamp, + self.last_timestamp, + ) class _TrainInfoMixin(StageInfo): @@ -279,12 +298,24 @@ class TrainingInfo(_TrainInfoMixin): def df_columns(self) -> list[str]: """Provide the column names of the DataFrame representation of the data.""" - return ["trigger_id", "training_id", "num_batches", "num_samples"] + return [ + "trigger_id", + "training_id", + "num_batches", + "num_samples", + "train_time_at_trainer", + ] @override @property def df_row(self) -> tuple: - return (self.trigger_id, self.training_id, self.trainer_log["num_batches"], self.trainer_log["num_samples"]) + return ( + self.trigger_id, + self.training_id, + self.trainer_log["num_batches"], + self.trainer_log["num_samples"], + self.trainer_log["total_train"], + ) class StoreModelInfo(_TrainInfoMixin): @@ -510,9 +541,16 @@ class StageLog(BaseModel): def df_columns(self, extended: bool = False) -> list[str]: """Provide the column names of the DataFrame representation of the data.""" - return ["id", "start", "end", "duration", "batch_idx", "sample_idx", "sample_time", "trigger_idx"] + ( - self.info.df_columns() if extended and self.info else [] - ) + return [ + "id", + "start", + "end", + "duration", + "batch_idx", + "sample_idx", + "sample_time", + "trigger_idx", + ] + (self.info.df_columns() if extended and self.info else []) def df_row(self, extended: bool = False) -> tuple: return ( @@ -605,7 +643,16 @@ def df(self) -> pd.DataFrame: ) for stage in self.stage_runs ], - columns=["id", "start", "end", "duration", "batch_idx", "sample_idx", "sample_time", "trigger_idx"], + columns=[ + "id", + "start", + "end", + "duration", + "batch_idx", + "sample_idx", + "sample_time", + "trigger_idx", + ], ) @@ -644,7 +691,11 @@ class PipelineLogs(BaseModel): # metadata partial_idx: int = Field(0) - def materialize(self, log_dir_path: Path, mode: Literal["initial", "increment", "final"] = "increment") -> None: + def materialize( + self, + log_dir_path: Path, + mode: Literal["initial", "increment", "final"] = "increment", + ) -> None: """Materialize the logs to log files. If run with pytest, log_file_path and mode will be ignored. @@ -682,7 +733,11 @@ def materialize(self, log_dir_path: Path, mode: Literal["initial", "increment", return if mode == "increment": - with open(pipeline_logdir / f"supervisor_part_{self.partial_idx}.log", "w", encoding="utf-8") as logfile: + with open( + pipeline_logdir / f"supervisor_part_{self.partial_idx}.log", + "w", + encoding="utf-8", + ) as logfile: logfile.write(self.supervisor_logs.model_dump_json(by_alias=True, indent=2)) self.supervisor_logs.clear()