diff --git a/modyn/selector/internal/grpc/selector_server.py b/modyn/selector/internal/grpc/selector_server.py index 3ca0fc4d9..55f51d043 100644 --- a/modyn/selector/internal/grpc/selector_server.py +++ b/modyn/selector/internal/grpc/selector_server.py @@ -1,4 +1,10 @@ +import contextlib +import datetime import logging +import multiprocessing as mp +import os +import socket +import time from concurrent import futures import grpc @@ -10,32 +16,76 @@ logger = logging.getLogger(__name__) +@contextlib.contextmanager +def _reserve_port(port: str): + """Find and reserve a port for all subprocesses to use.""" + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: + raise RuntimeError("Failed to set SO_REUSEPORT.") + sock.bind(("", int(port))) + try: + assert sock.getsockname()[1] == int(port) + yield port + finally: + sock.close() + + +def _wait_forever(server): + try: + while True: + time.sleep(datetime.timedelta(days=1).total_seconds()) + except KeyboardInterrupt: + server.stop(None) + + +def _run_server(bind_address, selector_manager, sample_batch_size): + """Start a server in a subprocess.""" + logging.info(f"[{os.getpid()}] Starting new server.") + + server = grpc.server( + futures.ThreadPoolExecutor( + max_workers=16, + ), + options=[ + ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE), + ("grpc.max_send_message_length", MAX_MESSAGE_SIZE), + ("grpc.so_reuseport", 1), + ], + ) + add_SelectorServicer_to_server(SelectorGRPCServicer(selector_manager, sample_batch_size), server) + server.add_insecure_port(bind_address) + server.start() + _wait_forever(server) + + class SelectorServer: def __init__(self, modyn_config: dict) -> None: self.modyn_config = modyn_config self.selector_manager = SelectorManager(modyn_config) - self.grpc_servicer = SelectorGRPCServicer( - self.selector_manager, self.modyn_config["selector"]["sample_batch_size"] - ) - self._add_servicer_to_server_func = add_SelectorServicer_to_server - - def prepare_server(self) -> grpc.server: - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=64), - options=[ - ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE), - ("grpc.max_send_message_length", MAX_MESSAGE_SIZE), - ], - ) - self._add_servicer_to_server_func(self.grpc_servicer, server) - return server + self.sample_batch_size = self.modyn_config["selector"]["sample_batch_size"] + self.workers = [] def run(self) -> None: - server = self.prepare_server() - logger.info(f"Starting server. Listening on port {self.modyn_config['selector']['port']}.") - server.add_insecure_port("[::]:" + self.modyn_config["selector"]["port"]) - server.start() - server.wait_for_termination() + port = self.modyn_config["selector"]["port"] + logger.info(f"Starting server. Listening on port {port}") + with _reserve_port(port) as port: + bind_address = "[::]:" + port + for _ in range(64): + worker = mp.Process( + target=_run_server, + args=( + bind_address, + self.selector_manager, + self.sample_batch_size + ), + ) + worker.start() + self.workers.append(worker) + + for worker in self.workers: + worker.join() + if ( "cleanup_trigger_samples_after_shutdown" in self.modyn_config["selector"] and self.modyn_config["selector"]["cleanup_trigger_samples_after_shutdown"] diff --git a/modyn/selector/internal/selector_manager.py b/modyn/selector/internal/selector_manager.py index 51fa6bf89..d81f5d589 100644 --- a/modyn/selector/internal/selector_manager.py +++ b/modyn/selector/internal/selector_manager.py @@ -5,7 +5,7 @@ import os import shutil from pathlib import Path -from threading import Lock +from multiprocessing import Manager, Lock from modyn.metadata_database.metadata_database_connection import MetadataDatabaseConnection from modyn.selector.internal.selector_strategies.abstract_selection_strategy import AbstractSelectionStrategy @@ -18,9 +18,10 @@ class SelectorManager: def __init__(self, modyn_config: dict) -> None: self._modyn_config = modyn_config - self._selectors: dict[int, Selector] = {} - self._selector_locks: dict[int, Lock] = {} - self._next_pipeline_lock = Lock() + self._manager = Manager() + self._selectors: dict[int, Selector] = self._manager.dict() + self._selector_locks: dict[int, Lock] = self._manager.dict() + self._next_pipeline_lock = self._manager.Lock() self._selector_cache_size = self._modyn_config["selector"]["keys_in_selector_cache"] self.init_metadata_db() @@ -75,7 +76,7 @@ def register_pipeline(self, num_workers: int, selection_strategy: str) -> int: selection_strategy = self._instantiate_strategy(json.loads(selection_strategy), pipeline_id) selector = Selector(selection_strategy, pipeline_id, num_workers, self._selector_cache_size) self._selectors[pipeline_id] = selector - self._selector_locks[pipeline_id] = Lock() + self._selector_locks[pipeline_id] = self._manager.Lock() return pipeline_id def get_sample_keys_and_weights( diff --git a/modyn/storage/internal/grpc/grpc_server.py b/modyn/storage/internal/grpc/grpc_server.py index 0a76d6652..7f14520a3 100644 --- a/modyn/storage/internal/grpc/grpc_server.py +++ b/modyn/storage/internal/grpc/grpc_server.py @@ -1,7 +1,14 @@ """GRPC server context manager.""" +import contextlib +import datetime import logging +import multiprocessing as mp +import os +import socket +import time from concurrent import futures +from typing import Any import grpc from modyn.storage.internal.grpc.generated.storage_pb2_grpc import add_StorageServicer_to_server @@ -11,6 +18,49 @@ logger = logging.getLogger(__name__) +@contextlib.contextmanager +def _reserve_port(port: str): + """Find and reserve a port for all subprocesses to use.""" + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: + raise RuntimeError("Failed to set SO_REUSEPORT.") + sock.bind(("", int(port))) + try: + assert sock.getsockname()[1] == int(port) + yield port + finally: + sock.close() + + +def _wait_forever(server): + try: + while True: + time.sleep(datetime.timedelta(days=1).total_seconds()) + except KeyboardInterrupt: + server.stop(None) + + +def _run_server(bind_address, modyn_config): + """Start a server in a subprocess.""" + logging.info(f"[{os.getpid()}] Starting new server.") + + server = grpc.server( + futures.ThreadPoolExecutor( + max_workers=16, + ), + options=[ + ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE), + ("grpc.max_send_message_length", MAX_MESSAGE_SIZE), + ("grpc.so_reuseport", 1), + ], + ) + add_StorageServicer_to_server(StorageGRPCServicer(modyn_config), server) + server.add_insecure_port(bind_address) + server.start() + _wait_forever(server) + + class GRPCServer: """GRPC server context manager.""" @@ -21,28 +71,34 @@ def __init__(self, modyn_config: dict) -> None: modyn_config (dict): Configuration of the storage module. """ self.modyn_config = modyn_config - self.server = grpc.server( - futures.ThreadPoolExecutor( - max_workers=64, - ), - options=[ - ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE), - ("grpc.max_send_message_length", MAX_MESSAGE_SIZE), - ], - ) - - def __enter__(self) -> grpc.Server: + self.workers = [] + + def __enter__(self) -> Any: """Enter the context manager. Returns: grpc.Server: GRPC server """ - add_StorageServicer_to_server(StorageGRPCServicer(self.modyn_config), self.server) port = self.modyn_config["storage"]["port"] logger.info(f"Starting server. Listening on port {port}") - self.server.add_insecure_port("[::]:" + port) - self.server.start() - return self.server + with _reserve_port(port) as port: + bind_address = "[::]:" + port + for _ in range(64): + worker = mp.Process( + target=_run_server, + args=( + bind_address, + self.modyn_config, + ), + ) + worker.start() + self.workers.append(worker) + + return self + + def wait_for_termination(self) -> None: + for worker in self.workers: + worker.join() def __exit__(self, exc_type: type, exc_val: Exception, exc_tb: Exception) -> None: """Exit the context manager. @@ -52,4 +108,5 @@ def __exit__(self, exc_type: type, exc_val: Exception, exc_tb: Exception) -> Non exc_val (Exception): exception value exc_tb (Exception): exception traceback """ - self.server.stop(0) + self.wait_for_termination() + del self.workers diff --git a/modyn/storage/internal/grpc/storage_grpc_servicer.py b/modyn/storage/internal/grpc/storage_grpc_servicer.py index 219eb5c65..f3c8c8936 100644 --- a/modyn/storage/internal/grpc/storage_grpc_servicer.py +++ b/modyn/storage/internal/grpc/storage_grpc_servicer.py @@ -1,9 +1,12 @@ """Storage GRPC servicer.""" import logging +import os +import threading from typing import Iterable, Tuple import grpc +from modyn.common.benchmark.stopwatch import Stopwatch from modyn.storage.internal.database.models import Dataset, File, Sample from modyn.storage.internal.database.storage_database_connection import StorageDatabaseConnection from modyn.storage.internal.database.storage_database_utils import get_file_wrapper, get_filesystem_wrapper @@ -64,6 +67,9 @@ def Get(self, request: GetRequest, context: grpc.ServicerContext) -> Iterable[Ge Yields: Iterator[Iterable[GetResponse]]: Response containing the data for the given keys. """ + tid = threading.get_native_id() + pid = os.getpid() + logger.info(f"[{pid}][{tid}] Received request for {len(request.keys)} items.") with StorageDatabaseConnection(self.modyn_config) as database: session = database.session @@ -73,12 +79,16 @@ def Get(self, request: GetRequest, context: grpc.ServicerContext) -> Iterable[Ge yield GetResponse() return + stopw = Stopwatch() + stopw.start("GetSamples") samples: list[Sample] = ( session.query(Sample) .filter(and_(Sample.sample_id.in_(request.keys), Sample.dataset_id == dataset.dataset_id)) .order_by(Sample.file_id) .all() ) + samples_time = stopw.stop() + logger.info(f"[{pid}][{tid}] Getting samples took {samples_time / 1000}s.") if len(samples) == 0: logger.error("No samples found in the database.")