diff --git a/control/server.py b/control/server.py index 59b8f55b..2e1b4a3d 100644 --- a/control/server.py +++ b/control/server.py @@ -139,7 +139,11 @@ def set_group_id(self, id: int): def _wait_for_group_id(self): """Waits for the monitor notification of this gatway's group id""" - self.monitor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + # Python 3.8: Default value of max_workers is min(32, os.cpu_count() + 4). + # This default value preserves at least 5 workers for I/O bound tasks. It utilizes at + # most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using + # very large resources implicitly on many-core machines. + self.monitor_server = grpc.server(futures.ThreadPoolExecutor()) monitor_pb2_grpc.add_MonitorGroupServicer_to_server(MonitorGroupService(self.set_group_id), self.monitor_server) self.monitor_server.add_insecure_port(self._monitor_address()) self.monitor_server.start() @@ -176,7 +180,7 @@ def serve(self): # Register service implementation with server gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller) - omap_lock = OmapLock(omap_state, gateway_state) + omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock) self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) diff --git a/control/state.py b/control/state.py index a2233b43..a9cac6e8 100644 --- a/control/state.py +++ b/control/state.py @@ -185,15 +185,27 @@ def reset(self, omap_state): """Resets dictionary with OMAP state.""" self.state = omap_state +class ReleasedLock: + def __init__(self, lock: threading.Lock): + self.lock = lock + assert self.lock.locked(), "Lock must be locked when creating ReleasedLock instance" + + def __enter__(self): + self.lock.release() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.lock.acquire() class OmapLock: OMAP_FILE_LOCK_NAME = "omap_file_lock" OMAP_FILE_LOCK_COOKIE = "omap_file_cookie" - def __init__(self, omap_state, gateway_state) -> None: + def __init__(self, omap_state, gateway_state, rpc_lock: threading.Lock) -> None: self.logger = omap_state.logger self.omap_state = omap_state self.gateway_state = gateway_state + self.rpc_lock = rpc_lock self.is_locked = False self.omap_file_lock_duration = self.omap_state.config.getint_with_default("gateway", "omap_file_lock_duration", 20) self.omap_file_update_reloads = self.omap_state.config.getint_with_default("gateway", "omap_file_update_reloads", 10) @@ -258,6 +270,7 @@ def execute_omap_locking_function(self, grpc_func, omap_locking_func, request, c def lock_omap(self): got_lock = False + assert self.rpc_lock.locked(), "The RPC lock is not locked." for i in range(0, self.omap_file_lock_retries + 1): try: @@ -274,7 +287,8 @@ def lock_omap(self): except rados.ObjectBusy as ex: self.logger.warning( f"The OMAP file is locked, will try again in {self.omap_file_lock_retry_sleep_interval} seconds") - time.sleep(self.omap_file_lock_retry_sleep_interval) + with ReleasedLock(self.rpc_lock): + time.sleep(self.omap_file_lock_retry_sleep_interval) except Exception: self.logger.exception(f"Unable to lock OMAP file, exiting") raise