diff --git a/.github/workflows/build-container.yml b/.github/workflows/build-container.yml index 296d2c823..4aaa19499 100644 --- a/.github/workflows/build-container.yml +++ b/.github/workflows/build-container.yml @@ -65,7 +65,7 @@ jobs: strategy: fail-fast: false matrix: - test: ["cli", "state", "multi_gateway", "server", "grpc"] + test: ["cli", "state", "multi_gateway", "server", "grpc", "omap_lock"] runs-on: ubuntu-latest env: HUGEPAGES: 512 # for multi gateway test, approx 256 per gateway instance diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index 0c54d561a..ff338b471 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -18,6 +18,10 @@ state_update_interval_sec = 5 #min_controller_id = 1 #max_controller_id = 65519 enable_spdk_discovery_controller = False +#omap_file_lock_duration = 60 +#omap_file_lock_retries = 15 +#omap_file_lock_retry_sleep_interval = 5 +#omap_file_update_retries = 10 [discovery] addr = 0.0.0.0 diff --git a/control/grpc.py b/control/grpc.py index 782089dc8..2b3653afc 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -16,6 +16,7 @@ import os import threading import errno +import contextlib import spdk.rpc.bdev as rpc_bdev import spdk.rpc.nvmf as rpc_nvmf @@ -42,7 +43,7 @@ class GatewayService(pb2_grpc.GatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ - def __init__(self, config, gateway_state, spdk_rpc_client) -> None: + def __init__(self, config, gateway_state, omap_lock, spdk_rpc_client) -> None: """Constructor""" self.logger = logging.getLogger(__name__) ver = os.getenv("NVMEOF_VERSION") @@ -73,6 +74,7 @@ def __init__(self, config, gateway_state, spdk_rpc_client) -> None: config.dump_config_file(self.logger) self.rpc_lock = threading.Lock() self.gateway_state = gateway_state + self.omap_lock = omap_lock self.spdk_rpc_client = spdk_rpc_client self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: @@ -119,6 +121,12 @@ def _alloc_cluster(self) -> str: ) return name + def _get_rpc_lock(self, 
context): + # If we're inside an update triggered by an Omap lock, no need to lock RPC again + if context is None and self.omap_lock.is_locking_update_active() and self.rpc_lock.locked(): + return contextlib.suppress(None) + return self.rpc_lock + def create_bdev_safe(self, request, context=None): """Creates a bdev from an RBD image.""" @@ -129,39 +137,40 @@ def create_bdev_safe(self, request, context=None): self.logger.info(f"Received request to create bdev {name} from" f" {request.rbd_pool_name}/{request.rbd_image_name}" f" with block size {request.block_size}") - try: - bdev_name = rpc_bdev.bdev_rbd_create( - self.spdk_rpc_client, - name=name, - cluster_name=self._get_cluster(), - pool_name=request.rbd_pool_name, - rbd_name=request.rbd_image_name, - block_size=request.block_size, - uuid=request.uuid, - ) - self.logger.info(f"create_bdev: {bdev_name}") - except Exception as ex: - self.logger.error(f"create_bdev failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.bdev() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_bdev(bdev_name, json_req) + bdev_name = rpc_bdev.bdev_rbd_create( + self.spdk_rpc_client, + name=name, + cluster_name=self._get_cluster(), + pool_name=request.rbd_pool_name, + rbd_name=request.rbd_image_name, + block_size=request.block_size, + uuid=request.uuid, + ) + self.logger.info(f"create_bdev: {bdev_name}") except Exception as ex: - self.logger.error( - f"Error persisting create_bdev {bdev_name}: {ex}") - raise + self.logger.error(f"create_bdev failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.bdev() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + 
self.gateway_state.add_bdev(bdev_name, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting create_bdev {bdev_name}: {ex}") + raise return pb2.bdev(bdev_name=bdev_name, status=True) def create_bdev(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.create_bdev_safe(request, context) def get_bdev_namespaces(self, bdev_name) -> list: @@ -194,56 +203,57 @@ def delete_bdev_safe(self, request, context=None): self.logger.info(f"Received request to delete bdev {request.bdev_name}") ns_list = [] - if context: - ns_list = self.get_bdev_namespaces(request.bdev_name) - for namespace in ns_list: - # We found a namespace still using this bdev. If --force was used we will try to remove the namespace from OMAP. - # Otherwise fail with EBUSY - try: - ns_nsid = namespace["nsid"] - ns_nqn = namespace["nqn"] - except Exception as ex: - self.logger.error(f"Got exception while trying to remove namespace: {namespace} which stil uses bdev {request.bdev_name}: {ex}") - continue - - if request.force: - self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") + with self.omap_lock(context=context): + if context: + ns_list = self.get_bdev_namespaces(request.bdev_name) + for namespace in ns_list: + # We found a namespace still using this bdev. If --force was used we will try to remove the namespace from OMAP. + # Otherwise fail with EBUSY try: - self.gateway_state.remove_namespace(ns_nqn, str(ns_nsid)) - self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}") + ns_nsid = namespace["nsid"] + ns_nqn = namespace["nqn"] except Exception as ex: - self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") - pass - else: - self.logger.error(f"Namespace {ns_nsid} from {ns_nqn} is still using bdev {request.bdev_name}. 
You need to either remove it or use the '--force' command line option") - req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} - ret = {"code": -errno.EBUSY, "message": os.strerror(errno.EBUSY)} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent = 2), - "Got JSON-RPC error response", "response:", json.dumps(ret, indent = 2)]) - return self.delete_bdev_handle_exception(context, Exception(msg)) - - try: - ret = rpc_bdev.bdev_rbd_delete( - self.spdk_rpc_client, - request.bdev_name, - ) - self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") - except Exception as ex: - return self.delete_bdev_handle_exception(context, ex) + self.logger.error(f"Got exception while trying to remove namespace: {namespace} which stil uses bdev {request.bdev_name}: {ex}") + continue + + if request.force: + self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") + try: + self.gateway_state.remove_namespace(ns_nqn, str(ns_nsid)) + self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}") + except Exception as ex: + self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") + pass + else: + self.logger.error(f"Namespace {ns_nsid} from {ns_nqn} is still using bdev {request.bdev_name}. 
You need to either remove it or use the '--force' command line option") + req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} + ret = {"code": -errno.EBUSY, "message": os.strerror(errno.EBUSY)} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent = 2), + "Got JSON-RPC error response", "response:", json.dumps(ret, indent = 2)]) + return self.delete_bdev_handle_exception(context, Exception(msg)) - if context: - # Update gateway state try: - self.gateway_state.remove_bdev(request.bdev_name) + ret = rpc_bdev.bdev_rbd_delete( + self.spdk_rpc_client, + request.bdev_name, + ) + self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting delete_bdev {request.bdev_name}: {ex}") - raise + return self.delete_bdev_handle_exception(context, ex) + + if context: + # Update gateway state + try: + self.gateway_state.remove_bdev(request.bdev_name) + except Exception as ex: + self.logger.error( + f"Error persisting delete_bdev {request.bdev_name}: {ex}") + raise return pb2.req_status(status=ret) def delete_bdev(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.delete_bdev_safe(request, context) def is_discovery_nqn(self, nqn) -> bool: @@ -283,55 +293,56 @@ def create_subsystem_safe(self, request, context=None): request.serial_number = f"SPDK{randser}" self.logger.info(f"No serial number specified, will use {request.serial_number}") - try: - subsys_using_serial = self.serial_number_already_used(context, request.serial_number) - if subsys_using_serial: - self.logger.error(f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}") - req = {"subsystem_nqn": request.subsystem_nqn, - "serial_number": request.serial_number, - "max_namespaces": request.max_namespaces, - "ana_reporting": request.ana_reporting, - "enable_ha": request.enable_ha, - "method": "nvmf_create_subsystem", "req_id": 0} - ret = {"code": 
-errno.EEXIST, "message": f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - raise Exception(msg) - ret = rpc_nvmf.nvmf_create_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - serial_number=request.serial_number, - max_namespaces=request.max_namespaces, - min_cntlid=min_cntlid, - max_cntlid=max_cntlid, - ana_reporting = request.ana_reporting, - ) - self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"create_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_subsystem(request.subsystem_nqn, - json_req) + subsys_using_serial = self.serial_number_already_used(context, request.serial_number) + if subsys_using_serial: + self.logger.error(f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}") + req = {"subsystem_nqn": request.subsystem_nqn, + "serial_number": request.serial_number, + "max_namespaces": request.max_namespaces, + "ana_reporting": request.ana_reporting, + "enable_ha": request.enable_ha, + "method": "nvmf_create_subsystem", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + ret = rpc_nvmf.nvmf_create_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + serial_number=request.serial_number, + 
max_namespaces=request.max_namespaces, + min_cntlid=min_cntlid, + max_cntlid=max_cntlid, + ana_reporting = request.ana_reporting, + ) + self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting create_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"create_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_subsystem(request.subsystem_nqn, + json_req) + except Exception as ex: + self.logger.error(f"Error persisting create_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) def create_subsystem(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.create_subsystem_safe(request, context) def delete_subsystem_safe(self, request, context=None): @@ -343,32 +354,33 @@ def delete_subsystem_safe(self, request, context=None): if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't delete a discovery subsystem") - try: - ret = rpc_nvmf.nvmf_delete_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - ) - self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"delete_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_subsystem(request.subsystem_nqn) + ret = rpc_nvmf.nvmf_delete_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + ) + self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - 
self.logger.error(f"Error persisting delete_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"delete_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_subsystem(request.subsystem_nqn) + except Exception as ex: + self.logger.error(f"Error persisting delete_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) def delete_subsystem(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.delete_subsystem_safe(request, context) def add_namespace_safe(self, request, context=None): @@ -383,40 +395,41 @@ def add_namespace_safe(self, request, context=None): if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't add a namespace to a discovery subsystem") - try: - nsid = rpc_nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - bdev_name=request.bdev_name, - nsid=request.nsid, - anagrpid=request.anagrpid, - ) - self.logger.info(f"add_namespace: {nsid}") - except Exception as ex: - self.logger.error(f"add_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.nsid() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - if not request.nsid: - request.nsid = nsid - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_namespace(request.subsystem_nqn, - str(nsid), json_req) + nsid = rpc_nvmf.nvmf_subsystem_add_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + bdev_name=request.bdev_name, + nsid=request.nsid, + anagrpid=request.anagrpid, + ) + self.logger.info(f"add_namespace: {nsid}") except Exception as ex: - self.logger.error( - f"Error persisting add_namespace {nsid}: {ex}") - 
raise + self.logger.error(f"add_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.nsid() + + if context: + # Update gateway state + try: + if not request.nsid: + request.nsid = nsid + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_namespace(request.subsystem_nqn, + str(nsid), json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_namespace {nsid}: {ex}") + raise return pb2.nsid(nsid=nsid, status=True) def add_namespace(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.add_namespace_safe(request, context) def remove_namespace_safe(self, request, context=None): @@ -428,34 +441,35 @@ def remove_namespace_safe(self, request, context=None): if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't remove a namespace from a discovery subsystem") - try: - ret = rpc_nvmf.nvmf_subsystem_remove_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - nsid=request.nsid, - ) - self.logger.info(f"remove_namespace {request.nsid}: {ret}") - except Exception as ex: - self.logger.error(f"remove_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_namespace(request.subsystem_nqn, - str(request.nsid)) + ret = rpc_nvmf.nvmf_subsystem_remove_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + nsid=request.nsid, + ) + self.logger.info(f"remove_namespace {request.nsid}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting remove_namespace {request.nsid}: {ex}") - raise + self.logger.error(f"remove_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") 
+ return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_namespace(request.subsystem_nqn, + str(request.nsid)) + except Exception as ex: + self.logger.error( + f"Error persisting remove_namespace {request.nsid}: {ex}") + raise return pb2.req_status(status=ret) def remove_namespace(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.remove_namespace_safe(request, context) def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool: @@ -477,66 +491,67 @@ def add_host_safe(self, request, context=None): if self.is_discovery_nqn(request.host_nqn): raise Exception(f"Can't use a discovery NQN as host NQN") - try: - host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn) - if host_already_exist: - if request.host_nqn == "*": - self.logger.error(f"All hosts already allowed to {request.subsystem_nqn}") - req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, - "method": "nvmf_subsystem_allow_any_host", "req_id": 0} - ret = {"code": -errno.EEXIST, "message": f"All hosts already allowed to {request.subsystem_nqn}"} - else: - self.logger.error(f"Host {request.host_nqn} already added to {request.subsystem_nqn}") - req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, - "method": "nvmf_subsystem_add_host", "req_id": 0} - ret = {"code": -errno.EEXIST, "message": f"Host {request.host_nqn} already added to {request.subsystem_nqn}"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - raise Exception(msg) - if request.host_nqn == "*": # Allow any host access to subsystem - self.logger.info(f"Received request to allow any host to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=False, - ) - 
self.logger.info(f"add_host *: {ret}") - else: # Allow single host access to subsystem - self.logger.info( - f"Received request to add host {request.host_nqn} to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_add_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"add_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"add_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_host(request.subsystem_nqn, - request.host_nqn, json_req) + host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn) + if host_already_exist: + if request.host_nqn == "*": + self.logger.error(f"All hosts already allowed to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_allow_any_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"All hosts already allowed to {request.subsystem_nqn}"} + else: + self.logger.error(f"Host {request.host_nqn} already added to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_add_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"Host {request.host_nqn} already added to {request.subsystem_nqn}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + if request.host_nqn == "*": # Allow any host access to subsystem + self.logger.info(f"Received request to allow any host to" + f" {request.subsystem_nqn}") + ret = 
rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=False, + ) + self.logger.info(f"add_host *: {ret}") + else: # Allow single host access to subsystem + self.logger.info( + f"Received request to add host {request.host_nqn} to" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_add_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"add_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting add_host {request.host_nqn}: {ex}") - raise + self.logger.error(f"add_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_host(request.subsystem_nqn, + request.host_nqn, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_host {request.host_nqn}: {ex}") + raise return pb2.req_status(status=ret) def add_host(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.add_host_safe(request, context) def remove_host_safe(self, request, context=None): @@ -548,47 +563,48 @@ def remove_host_safe(self, request, context=None): if self.is_discovery_nqn(request.host_nqn): raise Exception(f"Can't use a discovery NQN as host NQN") - try: - if request.host_nqn == "*": # Disable allow any host access - self.logger.info( - f"Received request to disable any host access to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=True, - ) - self.logger.info(f"remove_host *: {ret}") - else: # Remove single host access to subsystem - self.logger.info( - f"Received request to remove host_{request.host_nqn} from" - f" 
{request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_remove_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"remove_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"remove_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_host(request.subsystem_nqn, - request.host_nqn) + if request.host_nqn == "*": # Disable allow any host access + self.logger.info( + f"Received request to disable any host access to" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=True, + ) + self.logger.info(f"remove_host *: {ret}") + else: # Remove single host access to subsystem + self.logger.info( + f"Received request to remove host_{request.host_nqn} from" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_remove_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"remove_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting remove_host: {ex}") - raise + self.logger.error(f"remove_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_host(request.subsystem_nqn, + request.host_nqn) + except Exception as ex: + self.logger.error(f"Error persisting remove_host: {ex}") + raise return pb2.req_status(status=ret) def remove_host(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.remove_host_safe(request, context) def matching_listener_exists(self, context, nqn, gw_name, trtype, traddr, 
trsvcid) -> bool: @@ -612,93 +628,94 @@ def create_listener_safe(self, request, context=None): if self.is_discovery_nqn(request.nqn): raise Exception(f"Can't create a listener for a discovery subsystem") - try: - if request.gateway_name == self.gateway_name: - listener_already_exist = self.matching_listener_exists( - context, request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid) - if listener_already_exist: - self.logger.error(f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}") - req = {"nqn": request.nqn, "trtype": request.trtype, "traddr": request.traddr, - "gateway_name": request.gateway_name, - "trsvcid": request.trsvcid, "adrfam": request.adrfam, - "method": "nvmf_subsystem_add_listener", "req_id": 0} - ret = {"code": -errno.EEXIST, "message": f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - raise Exception(msg) - ret = rpc_nvmf.nvmf_subsystem_add_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"create_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"create_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - state = self.gateway_state.local.get_state() - enable_ha = False - subsys_str = state.get(GatewayState.build_subsystem_key(request.nqn)) - if subsys_str: - self.logger.debug(f"value of sub-system: {subsys_str}") + with self.omap_lock(context=context): try: - subsys_dict = json.loads(subsys_str) - try: - enable_ha = subsys_dict["enable_ha"] - except KeyError: - enable_ha = False 
- self.logger.info(f"enable_ha: {enable_ha}") - except Exception as ex: - self.logger.error(f"Got exception trying to parse subsystem {request.nqn}: {ex}") - pass - else: - self.logger.info(f"No subsystem for {request.nqn}") - - if enable_ha: - for x in range (MAX_ANA_GROUPS): - try: - ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( + if request.gateway_name == self.gateway_name: + listener_already_exist = self.matching_listener_exists( + context, request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid) + if listener_already_exist: + self.logger.error(f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}") + req = {"nqn": request.nqn, "trtype": request.trtype, "traddr": request.traddr, + "gateway_name": request.gateway_name, + "trsvcid": request.trsvcid, "adrfam": request.adrfam, + "method": "nvmf_subsystem_add_listener", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + ret = rpc_nvmf.nvmf_subsystem_add_listener( self.spdk_rpc_client, nqn=request.nqn, - ana_state="inaccessible", trtype=request.trtype, traddr=request.traddr, trsvcid=request.trsvcid, adrfam=request.adrfam, - anagrpid=(x+1) ) - except Exception as ex: - self.logger.error(f" set_listener_ana_state failed with: \n {ex}") - raise - - if context: - # Update gateway state - try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_listener(request.nqn, - request.gateway_name, - request.trtype, request.traddr, - request.trsvcid, json_req) + ) + self.logger.info(f"create_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - 
f"Error persisting add_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"create_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + state = self.gateway_state.local.get_state() + enable_ha = False + subsys_str = state.get(GatewayState.build_subsystem_key(request.nqn)) + if subsys_str: + self.logger.debug(f"value of sub-system: {subsys_str}") + try: + subsys_dict = json.loads(subsys_str) + try: + enable_ha = subsys_dict["enable_ha"] + except KeyError: + enable_ha = False + self.logger.info(f"enable_ha: {enable_ha}") + except Exception as ex: + self.logger.error(f"Got exception trying to parse subsystem {request.nqn}: {ex}") + pass + else: + self.logger.info(f"No subsystem for {request.nqn}") + + if enable_ha: + for x in range (MAX_ANA_GROUPS): + try: + ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( + self.spdk_rpc_client, + nqn=request.nqn, + ana_state="inaccessible", + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + anagrpid=(x+1) ) + except Exception as ex: + self.logger.error(f" set_listener_ana_state failed with: \n {ex}") + raise + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_listener(request.nqn, + request.gateway_name, + request.trtype, request.traddr, + request.trsvcid, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_listener {request.trsvcid}: {ex}") + raise return pb2.req_status(status=ret) def create_listener(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.create_listener_safe(request, context) def delete_listener_safe(self, request, context=None): @@ -713,44 +730,45 @@ def delete_listener_safe(self, request, context=None): if self.is_discovery_nqn(request.nqn): raise Exception(f"Can't 
delete a listener from a discovery subsystem") - try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_remove_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"delete_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"delete_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_listener(request.nqn, - request.gateway_name, - request.trtype, - request.traddr, - request.trsvcid) + if request.gateway_name == self.gateway_name: + ret = rpc_nvmf.nvmf_subsystem_remove_listener( + self.spdk_rpc_client, + nqn=request.nqn, + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"delete_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - f"Error persisting delete_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"delete_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_listener(request.nqn, + request.gateway_name, + request.trtype, + request.traddr, + request.trsvcid) + except Exception as ex: + self.logger.error( + f"Error persisting delete_listener {request.trsvcid}: {ex}") + raise return pb2.req_status(status=ret) def delete_listener(self, request, context=None): - with self.rpc_lock: + with self._get_rpc_lock(context): return 
self.delete_listener_safe(request, context) def get_subsystems_safe(self, request, context): @@ -769,5 +787,5 @@ def get_subsystems_safe(self, request, context): return pb2.subsystems_info(subsystems=json.dumps(ret)) def get_subsystems(self, request, context): - with self.rpc_lock: + with self._get_rpc_lock(context): return self.get_subsystems_safe(request, context) diff --git a/control/server.py b/control/server.py index 3f891074c..7675c2e82 100644 --- a/control/server.py +++ b/control/server.py @@ -25,7 +25,7 @@ from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc -from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler +from .state import GatewayState, LocalGatewayState, OmapLock, OmapGatewayState, GatewayStateHandler from .grpc import GatewayService from .discovery import DiscoveryService from .config import GatewayConfig @@ -113,10 +113,9 @@ def serve(self): self._start_discovery_service() # Register service implementation with server - gateway_state = GatewayStateHandler(self.config, local_state, - omap_state, self.gateway_rpc_caller) - self.gateway_rpc = GatewayService(self.config, gateway_state, - self.spdk_rpc_client) + gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller) + omap_lock = OmapLock(omap_state, gateway_state) + self.gateway_rpc = GatewayService(self.config, gateway_state, omap_lock, self.spdk_rpc_client) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) diff --git a/control/state.py b/control/state.py index f4e575b44..3295e7998 100644 --- a/control/state.py +++ b/control/state.py @@ -11,6 +11,7 @@ import threading import rados import logging +import errno from typing import Dict from collections import defaultdict from abc import ABC, abstractmethod @@ -165,6 +166,123 @@ def reset(self, omap_state): self.state = omap_state +class OmapLock: + 
class OmapLock:
    """Guards changes to the shared OMAP state file across gateways.

    Used as a reusable context manager:

        with omap_lock(context=context):
            ...

    A real gRPC ``context`` means the request came from the CLI and may
    modify the OMAP file, so the file is locked first (re-reading it when
    it is newer than our local state).  ``context=None`` means we are in
    the middle of an automatic local-state update that only issues local
    SPDK calls, so no OMAP locking is performed.  A configured lock
    duration of 0 disables OMAP locking entirely.
    """

    OMAP_FILE_LOCK_NAME = "omap_file_lock"
    OMAP_FILE_LOCK_COOKIE = "omap_file_cookie"

    def __init__(self, omap_state, gateway_state) -> None:
        self.logger = omap_state.logger
        self.omap_state = omap_state
        self.gateway_state = gateway_state
        # Lock lease length in seconds; 0 turns OMAP locking off.
        self.omap_file_lock_duration = self.omap_state.config.getint_with_default(
            "gateway", "omap_file_lock_duration", 60)
        # How many lock attempts to make when the file keeps turning out stale.
        self.omap_file_update_retries = self.omap_state.config.getint_with_default(
            "gateway", "omap_file_update_retries", 10)
        # How many lock attempts to make while another gateway holds the lock.
        self.omap_file_lock_retries = self.omap_state.config.getint_with_default(
            "gateway", "omap_file_lock_retries", 15)
        self.omap_file_lock_retry_sleep_interval = self.omap_state.config.getint_with_default(
            "gateway", "omap_file_lock_retry_sleep_interval", 5)
        # Keyword args passed via __call__, consumed by __enter__/__exit__.
        # NOTE(review): shared instance state — not safe for concurrent
        # re-entrant use; presumably serialized by the gRPC server's single
        # worker and the RPC lock — confirm before adding workers.
        self.enter_args = {}
        self.locking_update_is_active = False

    def __call__(self, **kwargs):
        # Stash the per-use arguments (currently just "context") for the
        # upcoming with-statement, then hand back self as the manager.
        self.enter_args.clear()
        self.enter_args.update(kwargs)
        return self

    #
    # We pass the context from the different functions here. It should point to a real object in case we come from a real
    # resource changing function, resulting from a CLI command. It will be None in case we come from an automatic update
    # which is done because the local state is out of date. In case context is None, that is we're in the middle of an update
    # we should not try to lock the OMAP file as the code will not try to make changes there, only the local spdk calls
    # are done in such a case.
    #
    def __enter__(self):
        context = self.enter_args.get("context")
        if context and self.omap_file_lock_duration > 0:
            self.lock_and_update_omap_file()
        return self

    def __exit__(self, typ, value, traceback):
        context = self.enter_args.get("context")
        self.enter_args.clear()
        if context and self.omap_file_lock_duration > 0:
            self.unlock_omap()

    def lock_and_update_omap_file(self):
        """Locks the OMAP file, re-reading our state first whenever the file
        is newer than the local copy.

        Raises:
            OSError: on unexpected locking errors (EAGAIN is retried here).
            Exception: when the file still can't be locked after
                ``omap_file_update_retries`` attempts.
        """
        need_to_update = False
        # Fix: the original range(1, n) only made n - 1 attempts, one fewer
        # than the configured retry count (and the message below claims).
        for _ in range(self.omap_file_update_retries):
            try:
                if need_to_update:
                    self.logger.warning(f"An update is required before locking the OMAP file")
                    self.locking_update_is_active = True
                    try:
                        self.gateway_state.update()
                        need_to_update = False
                    except Exception as ex:
                        self.logger.warning(f"Got an exception while updating: {ex}")
                    finally:
                        # Fix: always clear the flag, even when update()
                        # raises; otherwise is_locking_update_active() stays
                        # True forever and the RPC-lock bypass misfires.
                        self.locking_update_is_active = False
                self.lock_omap()
                break
            except OSError as err:
                if err.errno == errno.EAGAIN:
                    # lock_omap() found the file newer than local state.
                    self.logger.warning(f"Error locking OMAP file. The file is not current, will read the file and try again")
                    need_to_update = True
                else:
                    self.logger.error(f"Error locking OMAP file, got exception: {err}")
                    raise
            except Exception as ex:
                self.logger.error(f"Error locking OMAP file, exception: {ex}")
                raise

        if need_to_update:
            raise Exception(f"Unable to lock OMAP file after updating {self.omap_file_update_retries} times, exiting")

    def lock_omap(self):
        """Takes the exclusive RADOS lock on the OMAP object, then verifies
        the local state is as new as the file.

        Returns:
            True once the lock is held and the versions match.

        Raises:
            OSError(EAGAIN): after unlocking, when the file is newer than the
                local state — the caller should re-read and retry.
            Exception: when the lock can't be obtained after
                ``omap_file_lock_retries`` attempts.
        """
        got_lock = False

        # Fix: range(1, n) made one attempt fewer than configured/logged.
        for _ in range(self.omap_file_lock_retries):
            try:
                self.omap_state.ioctx.lock_exclusive(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME,
                                                     self.OMAP_FILE_LOCK_COOKIE, "OMAP file changes lock",
                                                     self.omap_file_lock_duration, 0)
                got_lock = True
                break
            except rados.ObjectExists:
                # Same name + cookie: we are already the lock holder.
                self.logger.info(f"We already locked the OMAP file")
                got_lock = True
                break
            except rados.ObjectBusy:
                self.logger.warning(
                    f"Someone else locked the OMAP file, will try again in {self.omap_file_lock_retry_sleep_interval} seconds")
                time.sleep(self.omap_file_lock_retry_sleep_interval)
            except Exception as ex:
                self.logger.error(f"Unable to lock OMAP file, exiting: {ex}")
                raise

        if not got_lock:
            self.logger.error(f"Unable to lock OMAP file after {self.omap_file_lock_retries} retries. Exiting!")
            raise Exception("Unable to lock OMAP file")

        omap_version = self.omap_state.get_omap_version()
        local_version = self.omap_state.get_local_version()

        if omap_version > local_version:
            # Holding a lock on a stale view is useless: release it and tell
            # the caller (via EAGAIN) to re-read the file first.
            self.logger.warning(
                f"Local version {local_version} differs from OMAP file version {omap_version}, need to read the OMAP file")
            self.unlock_omap()
            raise OSError(errno.EAGAIN, "Unable to lock OMAP file, file not current", self.omap_state.omap_name)

        return True

    def unlock_omap(self):
        """Releases the RADOS lock; failures are logged only, since the lease
        auto-expires after ``omap_file_lock_duration`` seconds anyway."""
        try:
            self.omap_state.ioctx.unlock(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE)
        except rados.ObjectNotFound:
            self.logger.warning(f"No such lock, the lock duration might have passed")
        except Exception as ex:
            self.logger.error(f"Unable to unlock OMAP file: {ex}")

    def is_locking_update_active(self) -> bool:
        # True only while gateway_state.update() runs under the lock path.
        return self.locking_update_is_active
import pytest
import copy
import grpc
import json
from control.server import GatewayServer
from control.proto import gateway_pb2 as pb2
from control.proto import gateway_pb2_grpc as pb2_grpc
import spdk.rpc.bdev as rpc_bdev

# Disable update notifications and use a long polling interval so GW-B only
# learns about GW-A's changes through the OMAP lock path exercised below.
update_notify = False
update_interval_sec = 300

@pytest.fixture(scope="module")
def conn(config):
    """Sets up and tears down Gateways A and B.

    Yields (stubA, stubB, gateway_rpc_A, gateway_rpc_B): gRPC client stubs
    for each gateway plus their in-process service objects.
    """
    # Setup GatewayA and GatewayB configs
    configA = copy.deepcopy(config)
    configA.config["gateway"]["name"] = "GatewayA"
    configA.config["gateway"]["group"] = "Group1"
    configA.config["gateway"]["state_update_notify"] = str(update_notify)
    configA.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec)
    configA.config["gateway"]["min_controller_id"] = "1"
    configA.config["gateway"]["max_controller_id"] = "20000"
    configA.config["gateway"]["enable_spdk_discovery_controller"] = "True"
    configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock"
    configB = copy.deepcopy(configA)
    addr = configA.get("gateway", "addr")
    portA = configA.getint("gateway", "port")
    portB = portA + 1
    configB.config["gateway"]["name"] = "GatewayB"
    configB.config["gateway"]["port"] = str(portB)
    # Disjoint controller-id ranges so the two gateways never collide.
    configB.config["gateway"]["min_controller_id"] = "20001"
    configB.config["gateway"]["max_controller_id"] = "40000"
    configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
    # Pin GW-B's SPDK to a different core mask than GW-A's default.
    configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02"

    # Start servers
    with (
        GatewayServer(configA) as gatewayA,
        GatewayServer(configB) as gatewayB,
    ):
        gatewayA.serve()
        # Delete existing OMAP state
        gatewayA.gateway_rpc.gateway_state.delete_state()
        # Create new
        gatewayB.serve()

        # Bind the client and Gateways A & B
        channelA = grpc.insecure_channel(f"{addr}:{portA}")
        stubA = pb2_grpc.GatewayStub(channelA)
        channelB = grpc.insecure_channel(f"{addr}:{portB}")
        stubB = pb2_grpc.GatewayStub(channelB)
        yield stubA, stubB, gatewayA.gateway_rpc, gatewayB.gateway_rpc

        # Stop gateways
        gatewayA.server.stop(grace=1)
        gatewayB.server.stop(grace=1)
        gatewayB.gateway_rpc.gateway_state.delete_state()

def test_multi_gateway_omap_reread(config, image, conn, caplog):
    """Tests reading out of date OMAP file

    Creates resources on GW-A only, then triggers a change on GW-B and
    verifies GW-B detected its stale local state via the OMAP lock and
    re-read the file before applying the change.
    """
    stubA, stubB, gatewayA, gatewayB = conn
    bdev = "Ceph0"
    bdev2 = "Ceph1"
    nqn = "nqn.2016-06.io.spdk:cnode1"
    serial = "SPDK00000000000001"
    nsid = 10
    # One user subsystem plus the discovery subsystem.
    num_subsystems = 2

    pool = config.get("ceph", "pool")

    # Send requests to create a subsystem with one namespace to GatewayA
    bdev_req = pb2.create_bdev_req(bdev_name=bdev,
                                   rbd_pool_name=pool,
                                   rbd_image_name=image,
                                   block_size=4096)
    subsystem_req = pb2.create_subsystem_req(subsystem_nqn=nqn,
                                             serial_number=serial)
    namespace_req = pb2.add_namespace_req(subsystem_nqn=nqn,
                                          bdev_name=bdev,
                                          nsid=nsid)
    get_subsystems_req = pb2.get_subsystems_req()
    ret_bdev = stubA.create_bdev(bdev_req)
    ret_subsystem = stubA.create_subsystem(subsystem_req)
    ret_namespace = stubA.add_namespace(namespace_req)
    assert ret_bdev.status is True
    assert ret_subsystem.status is True
    assert ret_namespace.status is True

    # Until we create some resource on GW-B it still shouldn't have the resources created on GW-A, only the discovery subsystem
    watchB = stubB.get_subsystems(get_subsystems_req)
    listB = json.loads(watchB.subsystems)
    assert len(listB) == 1

    watchA = stubA.get_subsystems(get_subsystems_req)
    listA = json.loads(watchA.subsystems)
    assert len(listA) == num_subsystems

    bdev2_req = pb2.create_bdev_req(bdev_name=bdev2,
                                    rbd_pool_name=pool,
                                    rbd_image_name=image,
                                    block_size=4096)
    ret_bdev2 = stubB.create_bdev(bdev2_req)
    assert ret_bdev2.status is True
    # The OMAP lock on GW-B must have detected the stale local state and
    # logged that it re-read the file before applying the change.
    assert "need to read the OMAP file" in caplog.text

    # Make sure that after reading the OMAP file GW-B has the subsystem and namespace created on GW-A
    watchB = stubB.get_subsystems(get_subsystems_req)
    listB = json.loads(watchB.subsystems)
    assert len(listB) == num_subsystems
    assert listB[num_subsystems-1]["nqn"] == nqn
    assert listB[num_subsystems-1]["serial_number"] == serial
    assert listB[num_subsystems-1]["namespaces"][0]["nsid"] == nsid
    assert listB[num_subsystems-1]["namespaces"][0]["bdev_name"] == bdev

    bdevsA = rpc_bdev.bdev_get_bdevs(gatewayA.spdk_rpc_client)
    bdevsB = rpc_bdev.bdev_get_bdevs(gatewayB.spdk_rpc_client)
    # GW-B should have the bdev created on GW-A after reading the OMAP file plus the one we created there
    # GW-A should only have the bdev created on it as we didn't update it after creating the bdev on GW-B
    assert len(bdevsA) == 1
    assert len(bdevsB) == 2
    assert bdevsA[0]["name"] == bdev
    assert bdevsB[0]["name"] == bdev
    assert bdevsB[1]["name"] == bdev2