Skip to content

Commit

Permalink
Make sure to unlock OMAP file when we're done writing to it.
Browse files Browse the repository at this point in the history
Fixes #689

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman authored and Alexander Indenbaum committed May 30, 2024
1 parent 9cd2088 commit 922697b
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 49 deletions.
96 changes: 58 additions & 38 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def _put_cluster(self, name: str) -> None:
self.logger.info(f"put_cluster {name=} number bdevs: {self.clusters[anagrp][name]}")
return

assert False # we should find the cluster in our state
assert False, f"Cluster {name} is not found" # we should find the cluster in our state

def _alloc_cluster_name(self, anagrp: int) -> str:
"""Allocates a new cluster name for ana group"""
Expand All @@ -253,7 +253,10 @@ def _alloc_cluster(self, anagrp: int) -> str:

def _grpc_function_with_lock(self, func, request, context):
with self.rpc_lock:
return func(request, context)
rc = func(request, context)
if not self.omap_lock.omap_file_disable_unlock:
assert not self.omap_lock.locked(), f"OMAP is still locked when we're out of function {func}"
return rc

def execute_grpc_function(self, func, request, context):
"""This functions handles RPC lock by wrapping 'func' with
Expand All @@ -263,7 +266,7 @@ def execute_grpc_function(self, func, request, context):
"""
return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)

def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, peer_msg = ""):
def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, context, peer_msg = ""):
"""Creates a bdev from an RBD image."""

if create_image:
Expand All @@ -273,7 +276,7 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl

self.logger.info(f"Received request to create bdev {name} from"
f" {rbd_pool_name}/{rbd_image_name} (size {rbd_image_size} bytes)"
f" with block size {block_size}, {cr_img_msg}{peer_msg}")
f" with block size {block_size}, {cr_img_msg}, context={context}{peer_msg}")

if block_size == 0:
return BdevStatus(status=errno.EINVAL,
Expand Down Expand Up @@ -411,7 +414,7 @@ def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""):

if not self.rpc_lock.locked():
self.logger.error(f"A call to delete_bdev() without holding the RPC lock")
assert self.rpc_lock.locked()
assert self.rpc_lock.locked(), "RPC is unlocked when calling delete_bdev()"

self.logger.info(f"Received request to delete bdev {bdev_name}{peer_msg}")
try:
Expand Down Expand Up @@ -541,7 +544,8 @@ def create_subsystem_safe(self, request, context):
self.logger.info(f"No serial number specified for {request.subsystem_nqn}, will use {request.serial_number}")

ret = False
with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
errmsg = ""
try:
subsys_using_serial = None
Expand Down Expand Up @@ -654,7 +658,8 @@ def delete_subsystem_safe(self, request, context):
delete_subsystem_error_prefix = f"Failure deleting subsystem {request.subsystem_nqn}"

ret = False
with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
ret = rpc_nvmf.nvmf_delete_subsystem(
self.spdk_rpc_client,
Expand Down Expand Up @@ -745,7 +750,7 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte
"""Adds a namespace to a subsystem."""

if context:
assert self.omap_lock.locked()
assert self.omap_lock.locked(), "OMAP is unlocked when calling create_namespace()"
nsid_msg = ""
if nsid and uuid:
nsid_msg = f" using NSID {nsid} and UUID {uuid}"
Expand All @@ -757,7 +762,7 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte
add_namespace_error_prefix = f"Failure adding namespace{nsid_msg}to {subsystem_nqn}"

peer_msg = self.get_peer_message(context)
self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}{peer_msg}")
self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}, context: {context}{peer_msg}")

if anagrpid > self.subsys_max_ns[subsystem_nqn]:
errmsg = f"{add_namespace_error_prefix}: Group ID {anagrpid} is bigger than configured maximum {self.subsys_max_ns[subsystem_nqn]}"
Expand Down Expand Up @@ -909,14 +914,24 @@ def choose_anagrpid_for_namespace(self, nsid) ->int:
def namespace_add_safe(self, request, context):
"""Adds a namespace to a subsystem."""

grps_list = []
anagrp = 0
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")

if not request.uuid:
request.uuid = str(uuid.uuid4())

with self.omap_lock(context=context):
if context:
if request.anagrpid != 0:
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
else:
anagrp = self.choose_anagrpid_for_namespace(request.nsid)
assert anagrp != 0, "Chosen ANA group is 0"

omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
if context:
errmsg, ns_nqn = self.check_if_image_used(request.rbd_pool_name, request.rbd_image_name)
if errmsg and ns_nqn:
Expand All @@ -935,24 +950,17 @@ def namespace_add_safe(self, request, context):
else: # new namespace
# If an explicit load balancing group was passed, make sure it exists
if request.anagrpid != 0:
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
else:
anagrp = self.choose_anagrpid_for_namespace(request.nsid)
assert anagrp != 0
# if anagrp == 0:
# errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}"
# self.logger.error(errmsg)
# return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
request.anagrpid = anagrp

anagrp = request.anagrpid
ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
request.rbd_image_name, request.block_size, create_image, request.size, peer_msg)
request.rbd_image_name, request.block_size, create_image, request.size, context, peer_msg)
if ret_bdev.status != 0:
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
self.logger.error(errmsg)
Expand Down Expand Up @@ -1016,12 +1024,14 @@ def namespace_add(self, request, context=None):
def namespace_change_load_balancing_group_safe(self, request, context):
"""Changes a namespace load balancing group."""

grps_list = []
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

with self.omap_lock(context=context):
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
Expand Down Expand Up @@ -1140,7 +1150,7 @@ def remove_namespace_from_state(self, nqn, nsid, context):
return pb2.req_status(status=0, error_message=os.strerror(0))

# If we got here context is not None, so we must hold the OMAP lock
assert self.omap_lock.locked()
assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_namespace_from_state()"

# Update gateway state
try:
Expand All @@ -1161,7 +1171,7 @@ def remove_namespace(self, subsystem_nqn, nsid, context):
"""Removes a namespace from a subsystem."""

if context:
assert self.omap_lock.locked()
assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_namespace()"
peer_msg = self.get_peer_message(context)
namespace_failure_prefix = f"Failure removing namespace {nsid} from {subsystem_nqn}"
self.logger.info(f"Received request to remove namespace {nsid} from {subsystem_nqn}{peer_msg}")
Expand Down Expand Up @@ -1470,7 +1480,8 @@ def namespace_set_qos_limits_safe(self, request, context):
limits_to_set = self.get_qos_limits_string(request)
self.logger.debug(f"After merging current QOS limits with previous ones for namespace {nsid_msg}on {request.subsystem_nqn},{limits_to_set}")

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
ret = rpc_bdev.bdev_set_qos_limit(
self.spdk_rpc_client,
Expand Down Expand Up @@ -1521,7 +1532,7 @@ def find_namespace_and_bdev_name(self, nqn, nsid, uuid, needs_lock, err_prefix):
else:
if not self.rpc_lock.locked():
self.logger.error(f"A call to find_namespace_and_bdev_name() with 'needs_lock' set to False and without holding the RPC lock")
assert self.rpc_lock.locked()
assert self.rpc_lock.locked(), "RPC is unlocked when calling find_namespace_and_bdev_name()"
lock_to_use = contextlib.suppress()

with lock_to_use:
Expand Down Expand Up @@ -1612,7 +1623,7 @@ def namespace_recycle_safe(self, ana_id, peer_msg = "") ->int:
self.logger.info(f"nsid {ns_key} for nqn {subsys} to recycle:")
nsid = ns_key
bdev_name = self.subsystem_nsid_bdev[subsys][nsid]
assert bdev_name
assert bdev_name, f"Can't find bdev for subsystem {subsys}, namespace {nsid}"
ns_params = {'nsid':nsid, 'bdev_name':bdev_name, 'subsys':subsys}
list_ns_params.append(ns_params)
self.logger.info(f"nsid :{nsid}, pool_name: {self.bdev_params[bdev_name]['pool_name']}, rbd_name: {self.bdev_params[bdev_name]['image_name']}, block_size: {self.bdev_params[bdev_name]['block_size']}, uuid:{self.bdev_params[bdev_name]['uuid']}, anagrpid:{ana_id}")
Expand All @@ -1632,7 +1643,7 @@ def namespace_recycle_safe(self, ana_id, peer_msg = "") ->int:
self.logger.info(f"ns params: {ns_params} ")
ret_bdev = self.create_bdev( ana_id, bdev_name, self.bdev_params[bdev_name]['uuid'], self.bdev_params[bdev_name]['pool_name'],
self.bdev_params[bdev_name]['image_name'], self.bdev_params[bdev_name]['block_size'], False,
self.bdev_params[bdev_name]['image_size'], peer_msg)
self.bdev_params[bdev_name]['image_size'], None, peer_msg)
self.logger.info(f"bdev_rbd_create: {bdev_name}")
if ret_bdev.status != 0:
errmsg = f"Failure adding bdev {bdev_name} "
Expand Down Expand Up @@ -1660,7 +1671,8 @@ def namespace_delete_safe(self, request, context):
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn}, context: {context}{peer_msg}")

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False,
"Failure deleting namespace")
if not find_ret[0]:
Expand Down Expand Up @@ -1738,7 +1750,8 @@ def add_host_safe(self, request, context):
self.logger.error(f"{errmsg}")
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn)
if host_already_exist:
Expand Down Expand Up @@ -1817,7 +1830,7 @@ def remove_host_from_state(self, subsystem_nqn, host_nqn, context):
return pb2.req_status(status=0, error_message=os.strerror(0))

if context:
assert self.omap_lock.locked()
assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_host_from_state()"
# Update gateway state
try:
self.gateway_state.remove_host(subsystem_nqn, host_nqn)
Expand Down Expand Up @@ -1851,7 +1864,8 @@ def remove_host_safe(self, request, context):
self.logger.error(f"{errmsg}")
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
if request.host_nqn == "*": # Disable allow any host access
self.logger.info(
Expand Down Expand Up @@ -2110,7 +2124,8 @@ def create_listener_safe(self, request, context):
self.logger.error(f"{errmsg}")
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
if request.host_name == self.host_name:
if (adrfam, request.traddr, request.trsvcid) in self.subsystem_listeners[request.nqn]:
Expand Down Expand Up @@ -2222,7 +2237,7 @@ def remove_listener_from_state(self, nqn, host_name, traddr, port, context):
return pb2.req_status(status=0, error_message=os.strerror(0))

if context:
assert self.omap_lock.locked()
assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_listener_from_state()"

host_name = host_name.strip()
listener_hosts = []
Expand Down Expand Up @@ -2314,7 +2329,8 @@ def delete_listener_safe(self, request, context):
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOTEMPTY, error_message=errmsg)

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
if request.host_name == self.host_name or request.force:
ret = rpc_nvmf.nvmf_subsystem_remove_listener(
Expand Down Expand Up @@ -2366,7 +2382,8 @@ def list_listeners_safe(self, request, context):
self.logger.info(f"Received request to list listeners for {request.subsystem}, context: {context}{peer_msg}")

listeners = []
with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
state = self.gateway_state.local.get_state()
listener_prefix = GatewayState.build_partial_listener_key(request.subsystem)
for key, val in state.items():
Expand Down Expand Up @@ -2492,7 +2509,8 @@ def get_spdk_nvmf_log_flags_and_level_safe(self, request, context):
peer_msg = self.get_peer_message(context)
self.logger.info(f"Received request to get SPDK nvmf log flags and level{peer_msg}")
log_flags = []
with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
nvmf_log_flags = {key: value for key, value in rpc_log.log_get_flags(
self.spdk_rpc_client).items() if key.startswith('nvmf')}
Expand Down Expand Up @@ -2549,7 +2567,8 @@ def set_spdk_nvmf_logs_safe(self, request, context):

self.logger.info(f"Received request to set SPDK nvmf logs: log_level: {log_level}, print_level: {print_level}{peer_msg}")

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
nvmf_log_flags = [key for key in rpc_log.log_get_flags(self.spdk_rpc_client).keys() \
if key.startswith('nvmf')]
Expand Down Expand Up @@ -2597,7 +2616,8 @@ def disable_spdk_nvmf_logs_safe(self, request, context):
peer_msg = self.get_peer_message(context)
self.logger.info(f"Received request to disable SPDK nvmf logs{peer_msg}")

with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
nvmf_log_flags = [key for key in rpc_log.log_get_flags(self.spdk_rpc_client).keys() \
if key.startswith('nvmf')]
Expand Down
Loading

0 comments on commit 922697b

Please sign in to comment.