From 13ab786b473cf3b288120fdb159cf81286587a17 Mon Sep 17 00:00:00 2001 From: Gil Bregman Date: Wed, 4 Oct 2023 09:00:31 -0400 Subject: [PATCH] Lock OMAP file before changing it to avoid corruption in multi-gateway environemnt. Fixes #56 Signed-off-by: Gil Bregman --- .env | 2 + .github/workflows/build-container.yml | 2 +- ceph-nvmeof.conf | 4 + control/grpc.py | 676 +++++++++++++++----------- control/server.py | 2 +- control/state.py | 48 ++ mk/demo.mk | 2 +- 7 files changed, 440 insertions(+), 296 deletions(-) diff --git a/.env b/.env index d952581ed..757b48ac3 100644 --- a/.env +++ b/.env @@ -8,6 +8,8 @@ QUAY_CEPH="${CONTAINER_REGISTRY}/vstart-cluster" QUAY_NVMEOF="${CONTAINER_REGISTRY}/nvmeof" QUAY_NVMEOFCLI="${CONTAINER_REGISTRY}/nvmeof-cli" MAINTAINER="Ceph Developers " +COMPOSE_PROJECT_NAME="ceph-nvmeof" +NVMEOF_CONTAINER_NAME="${COMPOSE_PROJECT_NAME}-nvmeof-1" # Performance NVMEOF_NOFILE=20480 # Max number of open files (depends on number of hosts connected) diff --git a/.github/workflows/build-container.yml b/.github/workflows/build-container.yml index 51c9f457a..b40ed115f 100644 --- a/.github/workflows/build-container.yml +++ b/.github/workflows/build-container.yml @@ -197,7 +197,7 @@ jobs: - name: Test run: | - make demo OPTS=-T + make demo OPTS=-T NVMEOF_CONTAINER_NAME="ceph-nvmeof_nvmeof_1" - name: Get subsystems run: | diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index f05cd0658..f9c152ad9 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -18,6 +18,10 @@ state_update_interval_sec = 5 #min_controller_id = 1 #max_controller_id = 65519 enable_discovery_controller = false +#omap_file_lock_duration = 60 +#omap_file_lock_retries = 15 +#omap_file_lock_retry_sleep_interval = 5 +#omap_file_update_retries = 10 [discovery] addr = 0.0.0.0 diff --git a/control/grpc.py b/control/grpc.py index 871ab56fc..9a8d7faa3 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -13,6 +13,7 @@ import uuid import random import logging +import errno import spdk.rpc.bdev as rpc_bdev import spdk.rpc.nvmf as rpc_nvmf @@ -35,17 +36,29 @@ class GatewayService(pb2_grpc.GatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ - def __init__(self, config, gateway_state, spdk_rpc_client) -> None: + def __init__(self, config, gateway_state, omap_state, spdk_rpc_client) -> None: """Constructor""" self.logger = logging.getLogger(__name__) self.config = config self.gateway_state = gateway_state + self.omap_state = omap_state self.spdk_rpc_client = spdk_rpc_client self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: self.gateway_name = socket.gethostname() + self.omap_file_lock_duration = self.config.getint_with_default("gateway", "omap_file_lock_duration", 60) + self.omap_file_update_retries = self.config.getint_with_default("gateway", "omap_file_update_retries", 10) + self.context = None self._init_cluster_context() + def __enter__(self): + if self.context: + self.lock_and_update_omap_file() + + def __exit__(self, typ, value, traceback): + if self.context: + self.omap_state.unlock_omap() + def _init_cluster_context(self) -> None: """Init cluster context management variables""" self.clusters = {} @@ -86,9 +99,43 @@ def _alloc_cluster(self) -> str: ) return name + def lock_and_update_omap_file(self): + if self.omap_file_lock_duration <= 0: + self.logger.warning(f"Will not lock OMAP file, lock duration is not positive") + return + + need_to_update = False + for i in range(1, self.omap_file_update_retries): + try: + if need_to_update: + self.logger.warning(f"An update is 
required before locking the OMAP file") + save_original_context = self.context + try: + self.gateway_state.update() + need_to_update = False + except Exception as ex: + self.logger.warning(f"Got an exception while updating: {ex}") + self.context = save_original_context + self.omap_state.lock_omap(self.omap_file_lock_duration) + break + except OSError as err: + if err.errno == errno.EAGAIN: + self.logger.warning(f"Error locking OMAP file. The file is not current, will read the file and try again") + need_to_update = True + else: + self.logger.error(f"Error locking OMAP file, got exception: {err}") + raise + except Exception as ex: + self.logger.error(f"Error locking OMAP file, exception: {ex}") + raise + + if need_to_update: + raise Exception(f"Unable to lock OMAP file after updating {self.omap_file_update_retries} times, exiting") + def create_bdev(self, request, context=None): """Creates a bdev from an RBD image.""" + self.context = context if not request.uuid: request.uuid = str(uuid.uuid4()) @@ -96,34 +143,35 @@ def create_bdev(self, request, context=None): self.logger.info(f"Received request to create bdev {name} from" f" {request.rbd_pool_name}/{request.rbd_image_name}" f" with block size {request.block_size}") - try: - bdev_name = rpc_bdev.bdev_rbd_create( - self.spdk_rpc_client, - name=name, - cluster_name=self._get_cluster(), - pool_name=request.rbd_pool_name, - rbd_name=request.rbd_image_name, - block_size=request.block_size, - uuid=request.uuid, - ) - self.logger.info(f"create_bdev: {bdev_name}") - except Exception as ex: - self.logger.error(f"create_bdev failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.bdev() - - if context: - # Update gateway state + with self: try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_bdev(bdev_name, json_req) + bdev_name = rpc_bdev.bdev_rbd_create( + self.spdk_rpc_client, + name=name, + cluster_name=self._get_cluster(), + pool_name=request.rbd_pool_name, + rbd_name=request.rbd_image_name, + block_size=request.block_size, + uuid=request.uuid, + ) + self.logger.info(f"create_bdev: {bdev_name}") except Exception as ex: - self.logger.error( - f"Error persisting create_bdev {bdev_name}: {ex}") - raise + self.logger.error(f"create_bdev failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.bdev() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_bdev(bdev_name, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting create_bdev {bdev_name}: {ex}") + raise return pb2.bdev(bdev_name=bdev_name, status=True) @@ -132,57 +180,69 @@ def delete_bdev(self, request, context=None): self.logger.info(f"Received request to delete bdev {request.bdev_name}") use_excep = None - req_get_subsystems = pb2.get_subsystems_req() - ret = self.get_subsystems(req_get_subsystems, context) - subsystems = json.loads(ret.subsystems) - for subsystem in subsystems: - for namespace in subsystem['namespaces']: - if namespace['bdev_name'] == request.bdev_name: - # We found a namespace still using this bdev. If --force was used we will try to remove this namespace. 
- # Otherwise fail with EBUSY - if request.force: - self.logger.info(f"Will remove namespace {namespace['nsid']} from {subsystem['nqn']} as it is using bdev {request.bdev_name}") - try: - req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=subsystem['nqn'], nsid=namespace['nsid']) - ret = self.remove_namespace(req_rm_ns, context) - self.logger.info( - f"Removed namespace {namespace['nsid']} from {subsystem['nqn']}: {ret.status}") - except Exception as ex: - self.logger.error(f"Error removing namespace {namespace['nsid']} from {subsystem['nqn']}, will delete bdev {request.bdev_name} anyway: {ex}") - pass - else: - self.logger.error(f"Namespace {namespace['nsid']} from {subsystem['nqn']} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option") - req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} - ret = {"code": -16, "message": "Device or resource busy"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - use_excep = Exception(msg) - - try: - if use_excep: - raise use_excep - ret = rpc_bdev.bdev_rbd_delete( - self.spdk_rpc_client, - request.bdev_name, - ) - self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") - except Exception as ex: - self.logger.error(f"delete_bdev failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() if context: - # Update gateway state + req_get_subsystems = pb2.get_subsystems_req() + ret = self.get_subsystems(req_get_subsystems, context) + subsystems = json.loads(ret.subsystems) + for subsystem in subsystems: + if use_excep is not None: + break + for namespace in subsystem['namespaces']: + if use_excep is not None: + break + if namespace['bdev_name'] == request.bdev_name: + # We found a namespace still using this bdev. If --force was used we will try to remove this namespace. + # Otherwise fail with EBUSY + if request.force: + self.logger.info(f"Will remove namespace {namespace['nsid']} from {subsystem['nqn']} as it is using bdev {request.bdev_name}") + try: + req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=subsystem['nqn'], nsid=namespace['nsid']) + ret = self.remove_namespace(req_rm_ns, context) + self.logger.info( + f"Removed namespace {namespace['nsid']} from {subsystem['nqn']}: {ret.status}") + except Exception as ex: + self.logger.error(f"Error removing namespace {namespace['nsid']} from {subsystem['nqn']}, will delete bdev {request.bdev_name} anyway: {ex}") + pass + else: + self.logger.error(f"Namespace {namespace['nsid']} from {subsystem['nqn']} is still using bdev {request.bdev_name}. 
You need to either remove it or use the '--force' command line option") + req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} + ret = {"code": -errno.EBUSY, "message": "Device or resource busy"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + use_excep = Exception(msg) + + # If we're about to just throw an exception there is no need to lock and unlock so simulate the case context is None + if use_excep: + self.context = None + else: + self.context = context + with self: try: - self.gateway_state.remove_bdev(request.bdev_name) + if use_excep: + raise use_excep + ret = rpc_bdev.bdev_rbd_delete( + self.spdk_rpc_client, + request.bdev_name, + ) + self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting delete_bdev {request.bdev_name}: {ex}") - raise + self.logger.error(f"delete_bdev failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_bdev(request.bdev_name) + except Exception as ex: + self.logger.error( + f"Error persisting delete_bdev {request.bdev_name}: {ex}") + raise return pb2.req_status(status=ret) @@ -197,34 +257,37 @@ def create_subsystem(self, request, context=None): random.seed() randser = random.randint(2, 99999999999999) request.serial_number = f"SPDK{randser}" - try: - ret = rpc_nvmf.nvmf_create_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - serial_number=request.serial_number, - max_namespaces=request.max_namespaces, - min_cntlid=min_cntlid, - max_cntlid=max_cntlid, - ) - self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"create_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - if context: - # Update gateway state + self.context = context + with self: try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_subsystem(request.subsystem_nqn, - json_req) + ret = rpc_nvmf.nvmf_create_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + serial_number=request.serial_number, + max_namespaces=request.max_namespaces, + min_cntlid=min_cntlid, + max_cntlid=max_cntlid, + ) + self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting create_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"create_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_subsystem(request.subsystem_nqn, + json_req) + except Exception as ex: + self.logger.error(f"Error persisting create_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) @@ -233,27 +296,30 @@ def delete_subsystem(self, request, context=None): self.logger.info( f"Received request to delete subsystem {request.subsystem_nqn}") - try: - ret = rpc_nvmf.nvmf_delete_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - ) - self.logger.info(f"delete_subsystem 
{request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"delete_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - if context: - # Update gateway state + self.context = context + with self: try: - self.gateway_state.remove_subsystem(request.subsystem_nqn) + ret = rpc_nvmf.nvmf_delete_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + ) + self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting delete_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"delete_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_subsystem(request.subsystem_nqn) + except Exception as ex: + self.logger.error(f"Error persisting delete_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) @@ -262,34 +328,37 @@ def add_namespace(self, request, context=None): self.logger.info(f"Received request to add {request.bdev_name} to" f" {request.subsystem_nqn}") - try: - nsid = rpc_nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - bdev_name=request.bdev_name, - nsid=request.nsid, - ) - self.logger.info(f"add_namespace: {nsid}") - except Exception as ex: - self.logger.error(f"add_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.nsid() - if context: - # Update gateway state + self.context = context + with self: try: - if not request.nsid: - request.nsid = nsid - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_namespace(request.subsystem_nqn, - str(nsid), json_req) + nsid = rpc_nvmf.nvmf_subsystem_add_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + bdev_name=request.bdev_name, + nsid=request.nsid, + ) + self.logger.info(f"add_namespace: {nsid}") except Exception as ex: - self.logger.error( - f"Error persisting add_namespace {nsid}: {ex}") - raise + self.logger.error(f"add_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.nsid() + + if context: + # Update gateway state + try: + if not request.nsid: + request.nsid = nsid + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_namespace(request.subsystem_nqn, + str(nsid), json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_namespace {nsid}: {ex}") + raise return pb2.nsid(nsid=nsid, status=True) @@ -298,115 +367,122 @@ def remove_namespace(self, request, context=None): self.logger.info(f"Received request to remove {request.nsid} from" f" {request.subsystem_nqn}") - try: - ret = rpc_nvmf.nvmf_subsystem_remove_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - nsid=request.nsid, - ) - self.logger.info(f"remove_namespace {request.nsid}: {ret}") - except Exception as ex: - self.logger.error(f"remove_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - if context: - # Update gateway state + self.context = context + with self: try: - 
self.gateway_state.remove_namespace(request.subsystem_nqn, - str(request.nsid)) + ret = rpc_nvmf.nvmf_subsystem_remove_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + nsid=request.nsid, + ) + self.logger.info(f"remove_namespace {request.nsid}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting remove_namespace {request.nsid}: {ex}") - raise + self.logger.error(f"remove_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_namespace(request.subsystem_nqn, + str(request.nsid)) + except Exception as ex: + self.logger.error( + f"Error persisting remove_namespace {request.nsid}: {ex}") + raise return pb2.req_status(status=ret) def add_host(self, request, context=None): """Adds a host to a subsystem.""" - try: - if request.host_nqn == "*": # Allow any host access to subsystem - self.logger.info(f"Received request to allow any host to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=False, - ) - self.logger.info(f"add_host *: {ret}") - else: # Allow single host access to subsystem - self.logger.info( - f"Received request to add host {request.host_nqn} to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_add_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"add_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"add_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + self.context = context + with self: try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_host(request.subsystem_nqn, - request.host_nqn, json_req) + if request.host_nqn == "*": # Allow any host access to subsystem + self.logger.info(f"Received request to allow any host to" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=False, + ) + self.logger.info(f"add_host *: {ret}") + else: # Allow single host access to subsystem + self.logger.info( + f"Received request to add host {request.host_nqn} to" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_add_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"add_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting add_host {request.host_nqn}: {ex}") - raise + self.logger.error(f"add_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_host(request.subsystem_nqn, + request.host_nqn, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_host {request.host_nqn}: {ex}") + raise return pb2.req_status(status=ret) def remove_host(self, request, context=None): """Removes a host from a subsystem.""" - try: - if request.host_nqn == "*": # Disable allow any host access - self.logger.info( - f"Received request to disable any host access to" - 
f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=True, - ) - self.logger.info(f"remove_host *: {ret}") - else: # Remove single host access to subsystem - self.logger.info( - f"Received request to remove host_{request.host_nqn} from" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_remove_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"remove_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"remove_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + self.context = context + with self: try: - self.gateway_state.remove_host(request.subsystem_nqn, - request.host_nqn) + if request.host_nqn == "*": # Disable allow any host access + self.logger.info( + f"Received request to disable any host access to" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=True, + ) + self.logger.info(f"remove_host *: {ret}") + else: # Remove single host access to subsystem + self.logger.info( + f"Received request to remove host_{request.host_nqn} from" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_remove_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"remove_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting remove_host: {ex}") - raise + self.logger.error(f"remove_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_host(request.subsystem_nqn, + request.host_nqn) + except Exception as ex: + self.logger.error(f"Error persisting remove_host: {ex}") + raise return pb2.req_status(status=ret) @@ -417,40 +493,47 @@ def create_listener(self, request, context=None): self.logger.info(f"Received request to create {request.gateway_name}" f" {request.trtype} listener for {request.nqn} at" f" {request.traddr}:{request.trsvcid}.") - try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_add_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"create_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"create_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - if context: - # Update gateway state + # If the gateway name is wrong we will just throw an exception, so there is no point locking OMAP file + if request.gateway_name == self.gateway_name: + self.context = context + else: + self.context = None + with self: try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_listener(request.nqn, - request.gateway_name, - request.trtype, request.traddr, - request.trsvcid, json_req) + if request.gateway_name == self.gateway_name: + ret = rpc_nvmf.nvmf_subsystem_add_listener( + self.spdk_rpc_client, + nqn=request.nqn, + 
trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"create_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - f"Error persisting add_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"create_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_listener(request.nqn, + request.gateway_name, + request.trtype, request.traddr, + request.trsvcid, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_listener {request.trsvcid}: {ex}") + raise return pb2.req_status(status=ret) @@ -461,39 +544,46 @@ def delete_listener(self, request, context=None): self.logger.info(f"Received request to delete {request.gateway_name}" f" {request.trtype} listener for {request.nqn} at" f" {request.traddr}:{request.trsvcid}.") - try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_remove_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"delete_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"delete_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - if context: - # Update gateway state + # If the gateway name is wrong we will just throw an exception, so there is no point locking OMAP file + if request.gateway_name == self.gateway_name: + self.context = context + else: + self.context = None + with self: try: - self.gateway_state.remove_listener(request.nqn, - request.gateway_name, - request.trtype, - request.traddr, - request.trsvcid) + if request.gateway_name == self.gateway_name: + ret = rpc_nvmf.nvmf_subsystem_remove_listener( + self.spdk_rpc_client, + nqn=request.nqn, + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"delete_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - f"Error persisting delete_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"delete_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_listener(request.nqn, + request.gateway_name, + request.trtype, + request.traddr, + request.trsvcid) + except Exception as ex: + self.logger.error( + f"Error persisting delete_listener {request.trsvcid}: {ex}") + raise return pb2.req_status(status=ret) diff --git a/control/server.py b/control/server.py index bc740fc7e..f25ef5f3e 100644 --- a/control/server.py +++ b/control/server.py @@ -100,7 +100,7 @@ def serve(self): local_state = LocalGatewayState() gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller) - self.gateway_rpc = GatewayService(self.config, 
gateway_state, + self.gateway_rpc = GatewayService(self.config, gateway_state, omap_state, self.spdk_rpc_client) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) diff --git a/control/state.py b/control/state.py index b99664c38..9229345c1 100644 --- a/control/state.py +++ b/control/state.py @@ -11,6 +11,7 @@ import threading import rados import logging +import errno from typing import Dict from collections import defaultdict from abc import ABC, abstractmethod @@ -162,6 +163,8 @@ class OmapGatewayState(GatewayState): """ OMAP_VERSION_KEY = "omap_version" + OMAP_FILE_LOCK_NAME = "omap_file_lock" + OMAP_FILE_LOCK_COOKIE = "omap_file_cookie" def __init__(self, config): self.config = config @@ -173,6 +176,8 @@ def __init__(self, config): ceph_pool = self.config.get("ceph", "pool") ceph_conf = self.config.get("ceph", "config_file") rados_id = self.config.get_with_default("ceph", "id", "") + self.omap_file_lock_retries = self.config.getint_with_default("gateway", "omap_file_lock_retries", 15) + self.omap_file_lock_retry_sleep_interval = self.config.getint_with_default("gateway", "omap_file_lock_retry_sleep_interval", 5) try: conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id) @@ -222,6 +227,49 @@ def get_omap_version(self) -> int: f" invalid number of values ({value_list}).") raise + def lock_omap(self, duration): + got_lock = False + + for i in range(1, self.omap_file_lock_retries): + try: + with rados.WriteOpCtx() as write_op: + self.ioctx.lock_exclusive(self.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE, "OMAP file changes lock", duration, 0) + got_lock = True + break + except rados.ObjectExists as ex: + self.logger.info(f"We already locked the OMAP file") + got_lock = True + break + except rados.ObjectBusy as ex: + self.logger.warning(f"Someone else locked the OMAP file, will try again in {self.omap_file_lock_retry_sleep_interval} seconds") + time.sleep(self.omap_file_lock_retry_sleep_interval) + except Exception as ex: + self.logger.error(f"Unable to lock OMAP file: {ex}. Exiting!") + raise + + if not got_lock: + self.logger.error(f"Unable to lock OMAP file after {self.omap_file_lock_retries} retries. 
Exiting!") + raise Exception("Unable to lock OMAP file") + + omap_version = self.get_omap_version() + + if omap_version > self.version: + self.logger.warning(f"Local version {self.version} differs from OMAP file version {omap_version}, need to read the OMAP file") + self.unlock_omap() + raise OSError(errno.EAGAIN, "Unable to lock OMAP file, file not current", self.omap_name) + + return True + + def unlock_omap(self): + try: + with rados.WriteOpCtx() as write_op: + self.ioctx.unlock(self.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE) + except rados.ObjectNotFound as ex: + self.logger.warning(f"No such lock, the lock duration might have passed") + except Exception as ex: + self.logger.error(f"Unable to unlock OMAP file: {ex}.") + pass + def get_state(self) -> Dict[str, str]: """Returns dict of all OMAP keys and values.""" with rados.ReadOpCtx() as read_op: diff --git a/mk/demo.mk b/mk/demo.mk index 4f2f9c809..f8bcbd63d 100644 --- a/mk/demo.mk +++ b/mk/demo.mk @@ -7,7 +7,7 @@ rbd: CMD = bash -c "rbd -p $(RBD_POOL) info $(RBD_IMAGE_NAME) || rbd -p $(RBD_PO # demo # the fist gateway in docker enviroment, hostname defaults to container id -demo: export NVMEOF_HOSTNAME != docker ps -q -f name=ceph-nvmeof_nvmeof_1 +demo: export NVMEOF_HOSTNAME != docker ps -q -f name=$(NVMEOF_CONTAINER_NAME) demo: rbd ## Expose RBD_IMAGE_NAME as NVMe-oF target $(NVMEOF_CLI) create_bdev --pool $(RBD_POOL) --image $(RBD_IMAGE_NAME) --bdev $(BDEV_NAME) $(NVMEOF_CLI) create_subsystem --subnqn $(NQN)