diff --git a/docker-compose.yml b/docker-compose.yml
index 052870785..0007bae98 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -127,6 +127,8 @@ services:
 #    volumes:
 #      - .:/modyn_host
     container_name: supervisor
+    ports:
+      - 3000:50062
   tests:
     depends_on:
       storage:
diff --git a/environment.yml b/environment.yml
index e3e31a4c9..4d5f24566 100644
--- a/environment.yml
+++ b/environment.yml
@@ -2,7 +2,7 @@
 # This contains all dependencies to run modyn and modyn itself, but not the dependencies to run the tests/development
 # By default, we disable CUDA to reduce the size of installed packages.
 # When deploying on a real system, please uncomment the according lines.
-# Furthermore, we need to separate the dependencies for the individual components, since not all components require all dependencies (e.g., only the trainer server needs PyTorch). This is issue #104.
+# Furthermore, we need to separate the dependencies for the individual components, since not all components require all dependencies (e.g., only the trainer server and evaluator need PyTorch). This is issue #104.

 name: modyn

@@ -26,6 +26,7 @@ dependencies:
   - pyaml
   - numpy
   - pandas
+  - tensorboard
   - pyftpdlib
   - types-protobuf
   - types-psycopg2
diff --git a/modyn/config/examples/example-pipeline.yaml b/modyn/config/examples/example-pipeline.yaml
index 6054a8190..10e194c86 100644
--- a/modyn/config/examples/example-pipeline.yaml
+++ b/modyn/config/examples/example-pipeline.yaml
@@ -64,6 +64,7 @@ trigger:
 evaluation:
   device: "cpu"
   amp: False
+  result_writers: ["json", "tensorboard"]
   datasets:
     - dataset_id: mnist
       transformations: ["transforms.ToTensor()",
diff --git a/modyn/config/examples/modyn_config.yaml b/modyn/config/examples/modyn_config.yaml
index 406a529da..b2e80a1f1 100644
--- a/modyn/config/examples/modyn_config.yaml
+++ b/modyn/config/examples/modyn_config.yaml
@@ -177,3 +177,6 @@ model_storage:
 evaluator:
   hostname: "evaluator"
   port: "50061"
+
+tensorboard:
+  port: "50062"
\ No newline at end of file
diff --git a/modyn/config/schema/modyn_config_schema.yaml b/modyn/config/schema/modyn_config_schema.yaml
index a2d40657b..a5e9d4497 100644
--- a/modyn/config/schema/modyn_config_schema.yaml
+++ b/modyn/config/schema/modyn_config_schema.yaml
@@ -351,6 +351,15 @@ properties:
       - port
       - ftp_port
       - offline_dataset_directory
+  tensorboard:
+    type: object
+    properties:
+      port:
+        type: string
+        description: |
+          The port on which tensorboard is run.
+    required:
+      - port
 required:
   - project
   - storage
diff --git a/modyn/config/schema/pipeline-schema.yaml b/modyn/config/schema/pipeline-schema.yaml
index 4a8f624db..7984fb27e 100644
--- a/modyn/config/schema/pipeline-schema.yaml
+++ b/modyn/config/schema/pipeline-schema.yaml
@@ -361,6 +361,15 @@ properties:
           type: boolean
          description: |
             If True, automatic mixed precision will be used.
+        result_writers:
+          type: array
+          description: |
+            Specifies in which formats to store the evaluation results. We currently support json and tensorboard.
+          minItems: 1
+          items:
+            type: string
+            description: |
+              Name of the evaluation result writer.
         datasets:
           type: array
           description: |
@@ -429,8 +438,9 @@ properties:
             - batch_size
            - metrics
       required:
-        - datasets
         - device
+        - result_writers
+        - datasets
 required:
   - pipeline
   - model
diff --git a/modyn/supervisor/internal/evaluation_result_writer/__init__.py b/modyn/supervisor/internal/evaluation_result_writer/__init__.py
new file mode 100644
index 000000000..e574925a4
--- /dev/null
+++ b/modyn/supervisor/internal/evaluation_result_writer/__init__.py
@@ -0,0 +1,12 @@
+"""Evaluation result writer module. The writers in this module store evaluation results in different formats.
+
+"""
+import os
+
+from .abstract_evaluation_result_writer import AbstractEvaluationResultWriter  # noqa: F401
+from .json_result_writer import JsonResultWriter  # noqa: F401
+from .tensorboard_result_writer import TensorboardResultWriter  # noqa: F401
+
+files = os.listdir(os.path.dirname(__file__))
+files.remove("__init__.py")
+__all__ = [f[:-3] for f in files if f.endswith(".py")]
diff --git a/modyn/supervisor/internal/evaluation_result_writer/abstract_evaluation_result_writer.py b/modyn/supervisor/internal/evaluation_result_writer/abstract_evaluation_result_writer.py
new file mode 100644
index 000000000..81272a964
--- /dev/null
+++ b/modyn/supervisor/internal/evaluation_result_writer/abstract_evaluation_result_writer.py
@@ -0,0 +1,35 @@
+import pathlib
+from abc import ABC, abstractmethod
+
+# pylint: disable=no-name-in-module
+from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
+
+
+class AbstractEvaluationResultWriter(ABC):
+    """
+    Abstract class used to write evaluation results to the evaluation directory.
+    """
+
+    def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
+        self.pipeline_id = pipeline_id
+        self.trigger_id = trigger_id
+        self.eval_directory = eval_directory
+
+    @abstractmethod
+    def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
+        """
+        Called whenever metric results become available for a particular dataset.
+
+        Args:
+            dataset_id: the involved dataset.
+            dataset_size: the size (number of samples) of the dataset.
+            evaluation_data: contains the metric results.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def store_results(self) -> None:
+        """
+        Called at the end to store the results.
+        """
+        raise NotImplementedError()
diff --git a/modyn/supervisor/internal/evaluation_result_writer/json_result_writer.py b/modyn/supervisor/internal/evaluation_result_writer/json_result_writer.py
new file mode 100644
index 000000000..ea16c4261
--- /dev/null
+++ b/modyn/supervisor/internal/evaluation_result_writer/json_result_writer.py
@@ -0,0 +1,23 @@
+import json
+import pathlib
+
+# pylint: disable=no-name-in-module
+from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
+from modyn.supervisor.internal.evaluation_result_writer import AbstractEvaluationResultWriter
+
+
+class JsonResultWriter(AbstractEvaluationResultWriter):
+    def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
+        super().__init__(pipeline_id, trigger_id, eval_directory)
+        self.results: dict = {"datasets": []}
+
+    def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
+        dataset_results: dict = {"dataset_size": dataset_size, "metrics": []}
+        for metric in evaluation_data:
+            dataset_results["metrics"].append({"name": metric.metric, "result": metric.result})
+        self.results["datasets"].append({dataset_id: dataset_results})
+
+    def store_results(self) -> None:
+        file_name = f"{self.pipeline_id}_{self.trigger_id}.eval"
+        with open(self.eval_directory / file_name, "w+", encoding="utf-8") as output_file:
+            json.dump(self.results, output_file)
diff --git a/modyn/supervisor/internal/evaluation_result_writer/tensorboard_result_writer.py b/modyn/supervisor/internal/evaluation_result_writer/tensorboard_result_writer.py
new file mode 100644
index 000000000..7baef8c00
--- /dev/null
+++ b/modyn/supervisor/internal/evaluation_result_writer/tensorboard_result_writer.py
@@ -0,0 +1,23 @@
+import pathlib
+
+# pylint: disable=no-name-in-module
+from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
+from modyn.supervisor.internal.evaluation_result_writer.abstract_evaluation_result_writer import (
+    AbstractEvaluationResultWriter,
+)
+from torch.utils.tensorboard import SummaryWriter
+
+
+class TensorboardResultWriter(AbstractEvaluationResultWriter):
+    def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
+        super().__init__(pipeline_id, trigger_id, eval_directory)
+        self.writer = SummaryWriter(log_dir=str(eval_directory))
+
+    def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
+        for metric in evaluation_data:
+            self.writer.add_scalar(
+                f"pipeline_{self.pipeline_id}/{dataset_id}/{metric.metric}", metric.result, self.trigger_id
+            )
+
+    def store_results(self) -> None:
+        self.writer.flush()
diff --git a/modyn/supervisor/internal/grpc_handler.py b/modyn/supervisor/internal/grpc_handler.py
index 0eac4daee..bb129ebb6 100644
--- a/modyn/supervisor/internal/grpc_handler.py
+++ b/modyn/supervisor/internal/grpc_handler.py
@@ -1,7 +1,6 @@
 # pylint: disable=no-name-in-module
 import json
 import logging
-import pathlib
 from collections import deque
 from time import sleep
 from typing import Iterable, Optional
@@ -45,6 +44,7 @@
     GetNewDataSinceResponse,
 )
 from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub
+from modyn.supervisor.internal.evaluation_result_writer import AbstractEvaluationResultWriter
 from modyn.supervisor.internal.utils import EvaluationStatusTracker, TrainingStatusTracker
 from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import CheckpointInfo, Data
 from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import JsonString as TrainerServerJsonString
@@ -618,16 +618,12 @@ def wait_for_evaluation_completion(self, training_id: int, evaluations: dict[int

     def store_evaluation_results(
         self,
-        eval_directory: pathlib.Path,
-        pipeline_id: int,
-        trigger_id: int,
+        evaluation_result_writers: list[AbstractEvaluationResultWriter],
         evaluations: dict[int, EvaluationStatusTracker],
     ) -> None:
         if not self.connected_to_evaluator:
             raise ConnectionError("Tried to wait for evaluation to finish, but not there is no gRPC connection.")

-        # TODO(#281): store results in a framework-specific format
-        results: dict = {"datasets": []}
         for evaluation_id in evaluations:
             req = EvaluationResultRequest(evaluation_id=evaluation_id)
             res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req)
@@ -635,12 +631,11 @@ def store_evaluation_results(
             if not res.valid:
                 logger.warning(f"Cannot get the evaluation result for evaluation {evaluation_id}")
                 continue
+            dataset_id = evaluations[evaluation_id].dataset_id
+            dataset_size = evaluations[evaluation_id].dataset_size

-            dataset_results: dict = {"dataset_size": evaluations[evaluation_id].dataset_size, "metrics": []}
-            for metric in res.evaluation_data:
-                dataset_results["metrics"].append({"name": metric.metric, "result": metric.result})
-            results["datasets"].append({evaluations[evaluation_id].dataset_id: dataset_results})
+            for result_writer in evaluation_result_writers:
+                result_writer.add_evaluation_data(dataset_id, dataset_size, res.evaluation_data)

-        file_name = f"{pipeline_id}_{trigger_id}.eval"
-        with open(eval_directory / file_name, "w+", encoding="utf-8") as output_file:
-            json.dump(results, output_file)
+        for result_writer in evaluation_result_writers:
+            result_writer.store_results()
diff --git a/modyn/supervisor/supervisor.py b/modyn/supervisor/supervisor.py
index ed411e47f..10cf47fce 100644
--- a/modyn/supervisor/supervisor.py
+++ b/modyn/supervisor/supervisor.py
@@ -5,9 +5,15 @@
 from typing import Optional

 import enlighten
+from modyn.supervisor.internal.evaluation_result_writer import (
+    AbstractEvaluationResultWriter,
+    JsonResultWriter,
+    TensorboardResultWriter,
+)
 from modyn.supervisor.internal.grpc_handler import GRPCHandler
 from modyn.supervisor.internal.triggers import Trigger
 from modyn.utils import dynamic_module_import, is_directory_writable, model_available, trigger_available, validate_yaml
+from tensorboard import program

 logger = logging.getLogger(__name__)

@@ -23,6 +29,8 @@ class Supervisor:
         "CoresetStrategy",
     ]

+    supported_evaluation_result_writers: dict = {"json": JsonResultWriter, "tensorboard": TensorboardResultWriter}
+
     def __init__(
         self,
         pipeline_config: dict,
@@ -76,6 +84,11 @@ def __init__(
         if "seed" in pipeline_config["training"]:
             self.grpc.seed_selector(pipeline_config["training"]["seed"])

+        if "tensorboard" in self.modyn_config:
+            port = self.modyn_config["tensorboard"]["port"]
+            self._run_tensorboard(port)
+            logger.info(f"Starting up tensorboard on port {port}.")
+
     def _setup_trigger(self) -> None:
         trigger_id = self.pipeline_config["trigger"]["id"]
         trigger_config = {}
@@ -102,8 +115,7 @@ def validate_pipeline_config_schema(self) -> bool:

         return True

-    @staticmethod
-    def _validate_evaluation_options(evaluation_config: dict) -> bool:
+    def _validate_evaluation_options(self, evaluation_config: dict) -> bool:
         is_valid = True

         dataset_ids = [dataset["dataset_id"] for dataset in evaluation_config["datasets"]]
@@ -111,6 +123,12 @@ def _validate_evaluation_options(evaluation_config: dict) -> bool:
             logger.error("Dataset ids must be unique in evaluation")
             is_valid = False

+        if "result_writers" in evaluation_config:
+            writer_names = set(evaluation_config["result_writers"])
+            if diff := writer_names.difference(self.supported_evaluation_result_writers.keys()):
+                logger.error(f"Found invalid evaluation result writers: {', '.join(diff)}.")
+                is_valid = False
+
         for dataset in evaluation_config["datasets"]:
             batch_size = dataset["batch_size"]
             if batch_size < 1:
@@ -222,6 +240,26 @@ def get_dataset_selector_batch_size(self) -> None:
     def validate_system(self) -> bool:
         return self.dataset_available() and self.grpc.trainer_server_available()

+    def _run_tensorboard(self, port: str) -> None:
+        logging.getLogger("tensorboard").setLevel(logging.ERROR)
+        logging.getLogger("MARKDOWN").setLevel(logging.ERROR)
+
+        tensorboard = program.TensorBoard()
+        tensorboard.configure(
+            argv=[
+                None,
+                "--logdir",
+                str(self.eval_directory),
+                "--bind_all",
+                "--port",
+                port,
+                "--window_title",
+                "Modyn TensorBoard",
+            ]
+        )
+        tensorboard.launch()
+        logging.getLogger("werkzeug").setLevel(logging.ERROR)
+
     def shutdown_trainer(self) -> None:
         if self.current_training_id is not None:
             self.grpc.stop_training_at_trainer_server(self.current_training_id)
@@ -359,7 +397,13 @@ def _run_training(self, trigger_id: int) -> None:
         if "evaluation" in self.pipeline_config:
             evaluations = self.grpc.start_evaluation(trained_model_id, self.pipeline_config)
             self.grpc.wait_for_evaluation_completion(self.current_training_id, evaluations)
-            self.grpc.store_evaluation_results(self.eval_directory, self.pipeline_id, trigger_id, evaluations)
+
+            writer_names: set[str] = set(self.pipeline_config["evaluation"]["result_writers"])
+            writers = [self._init_evaluation_writer(name, trigger_id) for name in writer_names]
+            self.grpc.store_evaluation_results(writers, evaluations)
+
+    def _init_evaluation_writer(self, name: str, trigger_id: int) -> AbstractEvaluationResultWriter:
+        return self.supported_evaluation_result_writers[name](self.pipeline_id, trigger_id, self.eval_directory)

     def initial_pass(self) -> None:
         # TODO(#128): Implement initial pass.
diff --git a/modyn/tests/supervisor/internal/evaluation_result_writer/test_abstract_evaluation_result_writer.py b/modyn/tests/supervisor/internal/evaluation_result_writer/test_abstract_evaluation_result_writer.py
new file mode 100644
index 000000000..f0d87843d
--- /dev/null
+++ b/modyn/tests/supervisor/internal/evaluation_result_writer/test_abstract_evaluation_result_writer.py
@@ -0,0 +1,10 @@
+import pathlib
+
+from modyn.supervisor.internal.evaluation_result_writer import JsonResultWriter
+
+
+def test_init():
+    writer = JsonResultWriter(10, 15, pathlib.Path(""))
+    assert writer.pipeline_id == 10
+    assert writer.trigger_id == 15
+    assert str(writer.eval_directory) == "."
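Reviewer note (not part of the diff): the writer interface added above is intentionally small, so new output formats only need to implement add_evaluation_data and store_results. The sketch below illustrates this under stated assumptions; the class name CsvResultWriter and its CSV layout are hypothetical, and such a writer would additionally need an entry in Supervisor.supported_evaluation_result_writers to be selectable from a pipeline.

# Illustrative sketch only, not included in this patch.
import csv
import pathlib

from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
from modyn.supervisor.internal.evaluation_result_writer import AbstractEvaluationResultWriter


class CsvResultWriter(AbstractEvaluationResultWriter):
    def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
        super().__init__(pipeline_id, trigger_id, eval_directory)
        # Collect one row per (dataset, metric) pair; header row first.
        self.rows: list[list] = [["dataset_id", "dataset_size", "metric", "result"]]

    def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
        for metric in evaluation_data:
            self.rows.append([dataset_id, dataset_size, metric.metric, metric.result])

    def store_results(self) -> None:
        # Same naming convention as JsonResultWriter, but with a .csv suffix.
        file_name = f"{self.pipeline_id}_{self.trigger_id}.csv"
        with open(self.eval_directory / file_name, "w", newline="", encoding="utf-8") as output_file:
            csv.writer(output_file).writerows(self.rows)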
diff --git a/modyn/tests/supervisor/internal/evaluation_result_writer/test_json_result_writer.py b/modyn/tests/supervisor/internal/evaluation_result_writer/test_json_result_writer.py
new file mode 100644
index 000000000..b493f2868
--- /dev/null
+++ b/modyn/tests/supervisor/internal/evaluation_result_writer/test_json_result_writer.py
@@ -0,0 +1,38 @@
+import json
+import pathlib
+import tempfile
+
+# pylint: disable=no-name-in-module
+from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
+from modyn.supervisor.internal.evaluation_result_writer import JsonResultWriter
+
+
+def test_json_writer():
+    with tempfile.TemporaryDirectory() as path:
+        eval_dir = pathlib.Path(path)
+        writer = JsonResultWriter(10, 15, eval_dir)
+        writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=0.5)])
+        writer.store_results()
+
+        file_path = eval_dir / f"{10}_{15}.eval"
+        assert file_path.exists() and file_path.is_file()
+
+        with open(file_path, "r", encoding="utf-8") as eval_file:
+            evaluation_results = json.load(eval_file)
+            assert evaluation_results == json.loads(
+                """{
+                    "datasets": [
+                        {
+                            "mnist": {
+                                "dataset_size": 1000,
+                                "metrics": [
+                                    {
+                                        "name": "Accuracy",
+                                        "result": 0.5
+                                    }
+                                ]
+                            }
+                        }
+                    ]
+                }"""
+            )
diff --git a/modyn/tests/supervisor/internal/evaluation_result_writer/test_tensorboard_result_writer.py b/modyn/tests/supervisor/internal/evaluation_result_writer/test_tensorboard_result_writer.py
new file mode 100644
index 000000000..98d4c2c7d
--- /dev/null
+++ b/modyn/tests/supervisor/internal/evaluation_result_writer/test_tensorboard_result_writer.py
@@ -0,0 +1,29 @@
+import os
+import pathlib
+import tempfile
+from unittest.mock import patch
+
+# pylint: disable=no-name-in-module
+from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
+from modyn.supervisor.internal.evaluation_result_writer import TensorboardResultWriter
+
+
+def test_tensorboard_writer():
+    with tempfile.TemporaryDirectory() as path:
+        eval_dir = pathlib.Path(path)
+        writer = TensorboardResultWriter(10, 15, eval_dir)
+        writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=0.5)])
+        writer.store_results()
+
+        assert len(os.listdir(eval_dir)) == 1
+
+    with tempfile.TemporaryDirectory() as path:
+        eval_dir = pathlib.Path(path)
+        result_writer = TensorboardResultWriter(10, 15, eval_dir)
+
+        with patch.object(result_writer.writer, "add_scalar") as add_method:
+            result_writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=0.5)])
+
+            assert add_method.call_args[0][0] == "pipeline_10/mnist/Accuracy"
+            assert add_method.call_args[0][1] == 0.5
+            assert add_method.call_args[0][2] == 15
diff --git a/modyn/tests/supervisor/internal/test_grpc_handler.py b/modyn/tests/supervisor/internal/test_grpc_handler.py
index 2468629cc..8eb829c7a 100644
--- a/modyn/tests/supervisor/internal/test_grpc_handler.py
+++ b/modyn/tests/supervisor/internal/test_grpc_handler.py
@@ -33,6 +33,7 @@
     GetNewDataSinceResponse,
 )
 from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub
+from modyn.supervisor.internal.evaluation_result_writer import JsonResultWriter
 from modyn.supervisor.internal.grpc_handler import GRPCHandler
 from modyn.supervisor.internal.utils import EvaluationStatusTracker
 from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import (
@@ -585,7 +586,7 @@ def test_store_evaluation_results(test_connection_established):
     with tempfile.TemporaryDirectory() as path:
         with patch.object(handler.evaluator, "get_evaluation_result", return_value=res) as get_method:
             eval_dir = pathlib.Path(path)
-            handler.store_evaluation_results(eval_dir, 5, 3, evaluations)
+            handler.store_evaluation_results([JsonResultWriter(5, 3, eval_dir)], evaluations)

             assert get_method.call_count == 2
             called_ids = [call[0][0].evaluation_id for call in get_method.call_args_list]
@@ -652,7 +653,7 @@ def test_store_evaluation_results_invalid(test_connection_established):

         with patch.object(handler.evaluator, "get_evaluation_result", return_value=res) as get_method:
             eval_dir = pathlib.Path(path)
-            handler.store_evaluation_results(eval_dir, 5, 3, evaluations)
+            handler.store_evaluation_results([JsonResultWriter(5, 3, eval_dir)], evaluations)

             get_method.assert_called_with(EvaluationResultRequest(evaluation_id=10))
             file_path = eval_dir / f"{5}_{3}.eval"
diff --git a/modyn/tests/supervisor/test_supervisor.py b/modyn/tests/supervisor/test_supervisor.py
index fc9b544dc..54ca03310 100644
--- a/modyn/tests/supervisor/test_supervisor.py
+++ b/modyn/tests/supervisor/test_supervisor.py
@@ -7,6 +7,7 @@
 import pytest

 from modyn.supervisor import Supervisor
+from modyn.supervisor.internal.evaluation_result_writer import AbstractEvaluationResultWriter
 from modyn.supervisor.internal.grpc_handler import GRPCHandler
 from modyn.supervisor.internal.utils.evaluation_status_tracker import EvaluationStatusTracker

@@ -215,21 +216,29 @@ def test__validate_training_options():
     assert sup._validate_training_options()


+@patch.object(Supervisor, "__init__", noop_constructor_mock)
 def test__validate_evaluation_options():
+    sup = Supervisor(get_minimal_pipeline_config(), get_minimal_system_config(), EVALUATION_DIRECTORY, None)
+
     # Check that evaluation with identical dataset_ids gets rejected
     evaluation_config = get_minimal_evaluation_config()
     evaluation_config["datasets"].append({"dataset_id": "MNIST_eval", "batch_size": 3, "dataloader_workers": 3})
-    assert not Supervisor._validate_evaluation_options(evaluation_config)
+    assert not sup._validate_evaluation_options(evaluation_config)

     # Check that evaluation with an invalid batch size gets rejected
     evaluation_config = get_minimal_evaluation_config()
     evaluation_config["datasets"][0]["batch_size"] = -1
-    assert not Supervisor._validate_evaluation_options(evaluation_config)
+    assert not sup._validate_evaluation_options(evaluation_config)

     # Check that evaluation with an invalid dataloader amount gets rejected
     evaluation_config = get_minimal_evaluation_config()
     evaluation_config["datasets"][0]["dataloader_workers"] = -1
-    assert not Supervisor._validate_evaluation_options(evaluation_config)
+    assert not sup._validate_evaluation_options(evaluation_config)
+
+    # Check that evaluation with invalid evaluation writer gets rejected
+    evaluation_config = get_minimal_evaluation_config()
+    evaluation_config["result_writers"] = ["json", "unknown", "unknown2"]
+    assert not sup._validate_evaluation_options(evaluation_config)


 @patch.object(Supervisor, "__init__", noop_constructor_mock)
@@ -591,6 +600,7 @@ def test__run_training_with_evaluation(
     sup = get_non_connecting_supervisor()  # pylint: disable=no-value-for-parameter
     evaluation_pipeline_config = get_minimal_pipeline_config()
     evaluation_pipeline_config["evaluation"] = get_minimal_evaluation_config()
+    evaluation_pipeline_config["evaluation"]["result_writers"] = ["json"]

     sup.pipeline_config = evaluation_pipeline_config
     assert sup.validate_pipeline_config_schema()
@@ -606,7 +616,12 @@

     test_start_evaluation.assert_called_once_with(101, evaluation_pipeline_config)
     test_wait_for_evaluation_completion.assert_called_once_with(1337, evaluations)
-    test_store_evaluation_results.assert_called_once_with(EVALUATION_DIRECTORY, 42, 21, evaluations)
+    test_store_evaluation_results.assert_called_once()
+    assert len(test_store_evaluation_results.call_args[0][0]) == 1
+    result_writer: AbstractEvaluationResultWriter = test_store_evaluation_results.call_args[0][0][0]
+    assert result_writer.eval_directory == EVALUATION_DIRECTORY
+    assert result_writer.pipeline_id == 42
+    assert result_writer.trigger_id == 21


 @patch.object(Supervisor, "__init__", noop_constructor_mock)
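Reviewer note (not part of the diff): a condensed view of how the added pieces interact, using only names introduced above; the IDs and the eval directory path are placeholders. With the docker-compose change, container port 50062 (the new tensorboard port in modyn_config.yaml) is published on host port 3000, so the board started by the supervisor should be reachable at http://localhost:3000 on the host.

# Sketch of the control flow added by this patch; 42, 21 and /tmp/eval are placeholder values.
import pathlib

from modyn.supervisor.internal.evaluation_result_writer import JsonResultWriter, TensorboardResultWriter

supported_writers = {"json": JsonResultWriter, "tensorboard": TensorboardResultWriter}

# Supervisor._run_training: one writer instance per name listed under evaluation.result_writers.
writer_names = ["json", "tensorboard"]
writers = [supported_writers[name](42, 21, pathlib.Path("/tmp/eval")) for name in writer_names]

# GRPCHandler.store_evaluation_results then feeds every finished evaluation to every writer ...
for writer in writers:
    writer.add_evaluation_data("mnist", 1000, [])  # in the real flow: res.evaluation_data from the evaluator
# ... and finally persists the collected results (JSON file / TensorBoard event file in the eval directory).
for writer in writers:
    writer.store_results()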