Skip to content

Commit

Permalink
Restore "OmapLock: release RPC lock while sleeping""
Browse files Browse the repository at this point in the history
This reverts commit f70cb50.

Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed May 27, 2024
1 parent f70cb50 commit ccf41fd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
8 changes: 6 additions & 2 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ def set_group_id(self, id: int):

def _wait_for_group_id(self):
"""Waits for the monitor notification of this gatway's group id"""
self.monitor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
# Python 3.8: Default value of max_workers is min(32, os.cpu_count() + 4).
# This default value preserves at least 5 workers for I/O bound tasks. It utilizes at
# most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using
# very large resources implicitly on many-core machines.
self.monitor_server = grpc.server(futures.ThreadPoolExecutor())
monitor_pb2_grpc.add_MonitorGroupServicer_to_server(MonitorGroupService(self.set_group_id), self.monitor_server)
self.monitor_server.add_insecure_port(self._monitor_address())
self.monitor_server.start()
Expand Down Expand Up @@ -176,7 +180,7 @@ def serve(self):

# Register service implementation with server
gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller)
omap_lock = OmapLock(omap_state, gateway_state)
omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils)
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)
Expand Down
18 changes: 16 additions & 2 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,27 @@ def reset(self, omap_state):
"""Resets dictionary with OMAP state."""
self.state = omap_state

class ReleasedLock:
def __init__(self, lock: threading.Lock):
self.lock = lock
assert self.lock.locked(), "Lock must be locked when creating ReleasedLock instance"

def __enter__(self):
self.lock.release()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.lock.acquire()

class OmapLock:
OMAP_FILE_LOCK_NAME = "omap_file_lock"
OMAP_FILE_LOCK_COOKIE = "omap_file_cookie"

def __init__(self, omap_state, gateway_state) -> None:
def __init__(self, omap_state, gateway_state, rpc_lock: threading.Lock) -> None:
self.logger = omap_state.logger
self.omap_state = omap_state
self.gateway_state = gateway_state
self.rpc_lock = rpc_lock
self.is_locked = False
self.omap_file_lock_duration = self.omap_state.config.getint_with_default("gateway", "omap_file_lock_duration", 20)
self.omap_file_update_reloads = self.omap_state.config.getint_with_default("gateway", "omap_file_update_reloads", 10)
Expand Down Expand Up @@ -258,6 +270,7 @@ def execute_omap_locking_function(self, grpc_func, omap_locking_func, request, c

def lock_omap(self):
got_lock = False
assert self.rpc_lock.locked(), "The RPC lock is not locked."

for i in range(0, self.omap_file_lock_retries + 1):
try:
Expand All @@ -274,7 +287,8 @@ def lock_omap(self):
except rados.ObjectBusy as ex:
self.logger.warning(
f"The OMAP file is locked, will try again in {self.omap_file_lock_retry_sleep_interval} seconds")
time.sleep(self.omap_file_lock_retry_sleep_interval)
with ReleasedLock(self.rpc_lock):
time.sleep(self.omap_file_lock_retry_sleep_interval)
except Exception:
self.logger.exception(f"Unable to lock OMAP file, exiting")
raise
Expand Down

0 comments on commit ccf41fd

Please sign in to comment.