Add Tensorboard Support (#292)
Implement basic functionality to provide the user with evaluation
metrics visualization as described in #281. Tensorboard should run on
the supervisor and visualize evaluation metrics over time.
robin-oester authored Aug 19, 2023
1 parent 93c3b55 commit 013e7af
Showing 17 changed files with 275 additions and 24 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -127,6 +127,8 @@ services:
# volumes:
# - .:/modyn_host
container_name: supervisor
ports:
- 3000:50062
tests:
depends_on:
storage:
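Side note on the mapping above: container port 50062 is where the supervisor launches TensorBoard (see the tensorboard port added to modyn_config.yaml below), and the compose file publishes it on host port 3000. A minimal reachability check, assuming a local deployment with exactly this mapping (the URL and port here are illustrative, not part of this commit):

from urllib.request import urlopen


def tensorboard_is_up(url: str = "http://localhost:3000") -> bool:
    # Returns True if the TensorBoard UI answers with HTTP 200 on the mapped host port.
    try:
        with urlopen(url, timeout=2) as response:
            return response.status == 200
    except OSError:
        return False


if __name__ == "__main__":
    print("TensorBoard reachable:", tensorboard_is_up())
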
3 changes: 2 additions & 1 deletion environment.yml
@@ -2,7 +2,7 @@
# This contains all dependencies to run modyn and modyn itself, but not the dependencies to run the tests/development
# By default, we disable CUDA to reduce the size of installed packages.
# When deploying on a real system, please uncomment the according lines.
# Furthermore, we need to separate the dependencies for the individual components, since not all components require all dependencies (e.g., only the trainer server needs PyTorch). This is issue #104.
# Furthermore, we need to separate the dependencies for the individual components, since not all components require all dependencies (e.g., only the trainer server and evaluator need PyTorch). This is issue #104.

name: modyn

@@ -26,6 +26,7 @@ dependencies:
- pyaml
- numpy
- pandas
- tensorboard
- pyftpdlib
- types-protobuf
- types-psycopg2
1 change: 1 addition & 0 deletions modyn/config/examples/example-pipeline.yaml
@@ -64,6 +64,7 @@ trigger:
evaluation:
device: "cpu"
amp: False
result_writers: ["json", "tensorboard"]
datasets:
- dataset_id: mnist
transformations: ["transforms.ToTensor()",
3 changes: 3 additions & 0 deletions modyn/config/examples/modyn_config.yaml
@@ -177,3 +177,6 @@ model_storage:
evaluator:
hostname: "evaluator"
port: "50061"

tensorboard:
port: "50062"
9 changes: 9 additions & 0 deletions modyn/config/schema/modyn_config_schema.yaml
@@ -351,6 +351,15 @@ properties:
- port
- ftp_port
- offline_dataset_directory
tensorboard:
type: object
properties:
port:
type: string
description: |
The port on which tensorboard is run.
required:
- port
required:
- project
- storage
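To illustrate what the schema fragment above accepts, here is a hedged sketch using the third-party jsonschema package (an assumption for this example; Modyn's own validate_yaml helper is not reproduced): the tensorboard block is optional, but if present it must carry a string-valued port.

import jsonschema

# Inlined copy of the "tensorboard" fragment from the schema above.
TENSORBOARD_SCHEMA = {
    "type": "object",
    "properties": {
        "port": {"type": "string", "description": "The port on which tensorboard is run."},
    },
    "required": ["port"],
}

jsonschema.validate({"port": "50062"}, TENSORBOARD_SCHEMA)  # passes
# jsonschema.validate({"port": 50062}, TENSORBOARD_SCHEMA)  # would raise ValidationError: port must be a string
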
12 changes: 11 additions & 1 deletion modyn/config/schema/pipeline-schema.yaml
@@ -361,6 +361,15 @@ properties:
type: boolean
description: |
If True, automatic mixed precision will be used.
result_writers:
type: array
description: |
Specifies in which formats to store the evaluation results. We currently support json and tensorboard.
minItems: 1
items:
type: string
description: |
Name of the evaluation result writer.
datasets:
type: array
description: |
@@ -429,8 +438,9 @@ properties:
- batch_size
- metrics
required:
- datasets
- device
- result_writers
- datasets
required:
- pipeline
- model
12 changes: 12 additions & 0 deletions modyn/supervisor/internal/evaluation_result_writer/__init__.py
@@ -0,0 +1,12 @@
"""Supervisor module. The supervisor initiates a pipeline and coordinates all components.
"""
import os

from .abstract_evaluation_result_writer import AbstractEvaluationResultWriter # noqa: F401
from .json_result_writer import JsonResultWriter # noqa: F401
from .tensorboard_result_writer import TensorboardResultWriter # noqa: F401

files = os.listdir(os.path.dirname(__file__))
files.remove("__init__.py")
__all__ = [f[:-3] for f in files if f.endswith(".py")]
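A small worked illustration of the dynamic __all__ above, assuming the package directory contains exactly the three modules added in this commit plus __init__.py (hard-coded here so the sketch is runnable on its own):

files = [
    "__init__.py",
    "abstract_evaluation_result_writer.py",
    "json_result_writer.py",
    "tensorboard_result_writer.py",
]
files.remove("__init__.py")
print([f[:-3] for f in files if f.endswith(".py")])
# ['abstract_evaluation_result_writer', 'json_result_writer', 'tensorboard_result_writer']
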
35 changes: 35 additions & 0 deletions modyn/supervisor/internal/evaluation_result_writer/abstract_evaluation_result_writer.py
@@ -0,0 +1,35 @@
import pathlib
from abc import ABC, abstractmethod

# pylint: disable=no-name-in-module
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData


class AbstractEvaluationResultWriter(ABC):
"""
Abstract class used to write evaluation results to the evaluation directory
"""

def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
self.pipeline_id = pipeline_id
self.trigger_id = trigger_id
self.eval_directory = eval_directory

@abstractmethod
def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
"""
        Called whenever metric results are available for a particular dataset.
Args:
dataset_id: the involved dataset.
            dataset_size: the size (number of samples) of the dataset.
evaluation_data: contains the metric results.
"""
raise NotImplementedError()

@abstractmethod
def store_results(self) -> None:
"""
        Called at the end to store the results.
"""
raise NotImplementedError()
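For illustration, a hypothetical CsvResultWriter (not part of this commit) shows how the abstract interface above is meant to be extended: collect rows in add_evaluation_data, persist them in store_results. It assumes the modyn package from this commit is importable.

import csv
import pathlib

# pylint: disable=no-name-in-module
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
from modyn.supervisor.internal.evaluation_result_writer import AbstractEvaluationResultWriter


class CsvResultWriter(AbstractEvaluationResultWriter):
    def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
        super().__init__(pipeline_id, trigger_id, eval_directory)
        self.rows: list[list] = [["dataset_id", "dataset_size", "metric", "result"]]

    def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
        # One row per metric; the dataset size is repeated so the file stays self-describing.
        for metric in evaluation_data:
            self.rows.append([dataset_id, dataset_size, metric.metric, metric.result])

    def store_results(self) -> None:
        file_name = f"{self.pipeline_id}_{self.trigger_id}.csv"
        with open(self.eval_directory / file_name, "w", newline="", encoding="utf-8") as output_file:
            csv.writer(output_file).writerows(self.rows)
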
23 changes: 23 additions & 0 deletions modyn/supervisor/internal/evaluation_result_writer/json_result_writer.py
@@ -0,0 +1,23 @@
import json
import pathlib

# pylint: disable=no-name-in-module
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
from modyn.supervisor.internal.evaluation_result_writer import AbstractEvaluationResultWriter


class JsonResultWriter(AbstractEvaluationResultWriter):
def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
super().__init__(pipeline_id, trigger_id, eval_directory)
self.results: dict = {"datasets": []}

def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
dataset_results: dict = {"dataset_size": dataset_size, "metrics": []}
for metric in evaluation_data:
dataset_results["metrics"].append({"name": metric.metric, "result": metric.result})
self.results["datasets"].append({dataset_id: dataset_results})

def store_results(self) -> None:
file_name = f"{self.pipeline_id}_{self.trigger_id}.eval"
with open(self.eval_directory / file_name, "w+", encoding="utf-8") as output_file:
json.dump(self.results, output_file)
23 changes: 23 additions & 0 deletions modyn/supervisor/internal/evaluation_result_writer/tensorboard_result_writer.py
@@ -0,0 +1,23 @@
import pathlib

# pylint: disable=no-name-in-module
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
from modyn.supervisor.internal.evaluation_result_writer.abstract_evaluation_result_writer import (
AbstractEvaluationResultWriter,
)
from torch.utils.tensorboard import SummaryWriter


class TensorboardResultWriter(AbstractEvaluationResultWriter):
def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path):
super().__init__(pipeline_id, trigger_id, eval_directory)
self.writer = SummaryWriter(log_dir=str(eval_directory))

def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None:
for metric in evaluation_data:
self.writer.add_scalar(
f"pipeline_{self.pipeline_id}/{dataset_id}/{metric.metric}", metric.result, self.trigger_id
)

def store_results(self) -> None:
self.writer.flush()
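Since trigger_id is passed to add_scalar as the global step, repeated triggers of the same pipeline accumulate into one time series per pipeline_<id>/<dataset>/<metric> tag. A hedged usage sketch (the ids, dataset name, and accuracy values are made up for illustration):

import pathlib
import tempfile

# pylint: disable=no-name-in-module
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
from modyn.supervisor.internal.evaluation_result_writer import TensorboardResultWriter

with tempfile.TemporaryDirectory() as path:
    eval_dir = pathlib.Path(path)
    for trigger_id, accuracy in enumerate([0.71, 0.74, 0.78]):
        writer = TensorboardResultWriter(pipeline_id=1, trigger_id=trigger_id, eval_directory=eval_dir)
        writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=accuracy)])
        writer.store_results()
    # Running `tensorboard --logdir <eval_dir>` would then plot pipeline_1/mnist/Accuracy over trigger ids.
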
21 changes: 8 additions & 13 deletions modyn/supervisor/internal/grpc_handler.py
@@ -1,7 +1,6 @@
# pylint: disable=no-name-in-module
import json
import logging
import pathlib
from collections import deque
from time import sleep
from typing import Iterable, Optional
@@ -45,6 +44,7 @@
GetNewDataSinceResponse,
)
from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub
from modyn.supervisor.internal.evaluation_result_writer import AbstractEvaluationResultWriter
from modyn.supervisor.internal.utils import EvaluationStatusTracker, TrainingStatusTracker
from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import CheckpointInfo, Data
from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import JsonString as TrainerServerJsonString
@@ -618,29 +618,24 @@ def wait_for_evaluation_completion(self, training_id: int, evaluations: dict[int

def store_evaluation_results(
self,
eval_directory: pathlib.Path,
pipeline_id: int,
trigger_id: int,
evaluation_result_writers: list[AbstractEvaluationResultWriter],
evaluations: dict[int, EvaluationStatusTracker],
) -> None:
if not self.connected_to_evaluator:
            raise ConnectionError("Tried to wait for evaluation to finish, but there is no gRPC connection.")

# TODO(#281): store results in a framework-specific format
results: dict = {"datasets": []}
for evaluation_id in evaluations:
req = EvaluationResultRequest(evaluation_id=evaluation_id)
res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req)

if not res.valid:
logger.warning(f"Cannot get the evaluation result for evaluation {evaluation_id}")
continue
dataset_id = evaluations[evaluation_id].dataset_id
dataset_size = evaluations[evaluation_id].dataset_size

dataset_results: dict = {"dataset_size": evaluations[evaluation_id].dataset_size, "metrics": []}
for metric in res.evaluation_data:
dataset_results["metrics"].append({"name": metric.metric, "result": metric.result})
results["datasets"].append({evaluations[evaluation_id].dataset_id: dataset_results})
for result_writer in evaluation_result_writers:
result_writer.add_evaluation_data(dataset_id, dataset_size, res.evaluation_data)

file_name = f"{pipeline_id}_{trigger_id}.eval"
with open(eval_directory / file_name, "w+", encoding="utf-8") as output_file:
json.dump(results, output_file)
for result_writer in evaluation_result_writers:
result_writer.store_results()
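The refactor above replaces the hard-coded JSON dump with a fan-out over arbitrary writer objects. A hedged, self-contained sketch of that contract, with the gRPC responses replaced by a hand-built EvaluationData message (ids and values are placeholders):

import pathlib
import tempfile

# pylint: disable=no-name-in-module
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
from modyn.supervisor.internal.evaluation_result_writer import JsonResultWriter, TensorboardResultWriter

evaluation_data = [EvaluationData(metric="Accuracy", result=0.9)]  # normally taken from EvaluationResultResponse

with tempfile.TemporaryDirectory() as path:
    eval_dir = pathlib.Path(path)
    writers = [JsonResultWriter(2, 7, eval_dir), TensorboardResultWriter(2, 7, eval_dir)]
    for writer in writers:  # same fan-out as store_evaluation_results
        writer.add_evaluation_data("mnist", 1000, evaluation_data)
    for writer in writers:
        writer.store_results()
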
50 changes: 47 additions & 3 deletions modyn/supervisor/supervisor.py
@@ -5,9 +5,15 @@
from typing import Optional

import enlighten
from modyn.supervisor.internal.evaluation_result_writer import (
AbstractEvaluationResultWriter,
JsonResultWriter,
TensorboardResultWriter,
)
from modyn.supervisor.internal.grpc_handler import GRPCHandler
from modyn.supervisor.internal.triggers import Trigger
from modyn.utils import dynamic_module_import, is_directory_writable, model_available, trigger_available, validate_yaml
from tensorboard import program

logger = logging.getLogger(__name__)

@@ -23,6 +29,8 @@ class Supervisor:
"CoresetStrategy",
]

supported_evaluation_result_writers: dict = {"json": JsonResultWriter, "tensorboard": TensorboardResultWriter}

def __init__(
self,
pipeline_config: dict,
@@ -76,6 +84,11 @@ def __init__(
if "seed" in pipeline_config["training"]:
self.grpc.seed_selector(pipeline_config["training"]["seed"])

if "tensorboard" in self.modyn_config:
port = self.modyn_config["tensorboard"]["port"]
self._run_tensorboard(port)
logger.info(f"Starting up tensorboard on port {port}.")

def _setup_trigger(self) -> None:
trigger_id = self.pipeline_config["trigger"]["id"]
trigger_config = {}
@@ -102,15 +115,20 @@ def validate_pipeline_config_schema(self) -> bool:

return True

@staticmethod
def _validate_evaluation_options(evaluation_config: dict) -> bool:
def _validate_evaluation_options(self, evaluation_config: dict) -> bool:
is_valid = True

dataset_ids = [dataset["dataset_id"] for dataset in evaluation_config["datasets"]]
if len(set(dataset_ids)) < len(dataset_ids):
logger.error("Dataset ids must be unique in evaluation")
is_valid = False

if "result_writers" in evaluation_config:
writer_names = set(evaluation_config["result_writers"])
if diff := writer_names.difference(self.supported_evaluation_result_writers.keys()):
logger.error(f"Found invalid evaluation result writers: {', '.join(diff)}.")
is_valid = False

for dataset in evaluation_config["datasets"]:
batch_size = dataset["batch_size"]
if batch_size < 1:
@@ -222,6 +240,26 @@ def get_dataset_selector_batch_size(self) -> None:
def validate_system(self) -> bool:
return self.dataset_available() and self.grpc.trainer_server_available()

def _run_tensorboard(self, port: str) -> None:
logging.getLogger("tensorboard").setLevel(logging.ERROR)
logging.getLogger("MARKDOWN").setLevel(logging.ERROR)

tensorboard = program.TensorBoard()
tensorboard.configure(
argv=[
None,
"--logdir",
str(self.eval_directory),
"--bind_all",
"--port",
port,
"--window_title",
"Modyn TensorBoard",
]
)
tensorboard.launch()
logging.getLogger("werkzeug").setLevel(logging.ERROR)

def shutdown_trainer(self) -> None:
if self.current_training_id is not None:
self.grpc.stop_training_at_trainer_server(self.current_training_id)
@@ -359,7 +397,13 @@ def _run_training(self, trigger_id: int) -> None:
if "evaluation" in self.pipeline_config:
evaluations = self.grpc.start_evaluation(trained_model_id, self.pipeline_config)
self.grpc.wait_for_evaluation_completion(self.current_training_id, evaluations)
self.grpc.store_evaluation_results(self.eval_directory, self.pipeline_id, trigger_id, evaluations)

writer_names: set[str] = set(self.pipeline_config["evaluation"]["result_writers"])
writers = [self._init_evaluation_writer(name, trigger_id) for name in writer_names]
self.grpc.store_evaluation_results(writers, evaluations)

def _init_evaluation_writer(self, name: str, trigger_id: int) -> AbstractEvaluationResultWriter:
return self.supported_evaluation_result_writers[name](self.pipeline_id, trigger_id, self.eval_directory)

def initial_pass(self) -> None:
# TODO(#128): Implement initial pass.
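The _run_tensorboard helper added above relies on TensorBoard's programmatic API: launch() starts the server in a background thread and returns the URL it serves on, so the supervisor keeps running. A standalone sketch under the assumption that the tensorboard package is installed (logdir and port below are illustrative values, not Modyn defaults):

import tempfile

from tensorboard import program

logdir = tempfile.mkdtemp()
tensorboard = program.TensorBoard()
tensorboard.configure(argv=[None, "--logdir", logdir, "--bind_all", "--port", "50062"])
url = tensorboard.launch()  # non-blocking; returns e.g. "http://hostname:50062/"
print(f"TensorBoard serving at {url}")
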
@@ -0,0 +1,10 @@
import pathlib

from modyn.supervisor.internal.evaluation_result_writer import JsonResultWriter


def test_init():
writer = JsonResultWriter(10, 15, pathlib.Path(""))
assert writer.pipeline_id == 10
assert writer.trigger_id == 15
assert str(writer.eval_directory) == "."
@@ -0,0 +1,38 @@
import json
import pathlib
import tempfile

# pylint: disable=no-name-in-module
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData
from modyn.supervisor.internal.evaluation_result_writer import JsonResultWriter


def test_json_writer():
with tempfile.TemporaryDirectory() as path:
eval_dir = pathlib.Path(path)
writer = JsonResultWriter(10, 15, eval_dir)
writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=0.5)])
writer.store_results()

file_path = eval_dir / f"{10}_{15}.eval"
assert file_path.exists() and file_path.is_file()

with open(file_path, "r", encoding="utf-8") as eval_file:
evaluation_results = json.load(eval_file)
assert evaluation_results == json.loads(
"""{
"datasets": [
{
"mnist": {
"dataset_size": 1000,
"metrics": [
{
"name": "Accuracy",
"result": 0.5
}
]
}
}
]
}"""
)