From bf23372767cf8611233b8771b87424f60e1706ce Mon Sep 17 00:00:00 2001 From: Alexander Indenbaum Date: Fri, 8 Sep 2023 01:52:16 +0300 Subject: [PATCH] Extract OmapObject, refactor state, remove localstate Signed-off-by: Alexander Indenbaum --- control/grpc.py | 26 ++- control/omap.py | 134 ++++++++++++ control/server.py | 16 +- control/state.py | 395 ++++++++---------------------------- tests/test_cli.py | 2 +- tests/test_multi_gateway.py | 4 +- tests/test_state.py | 72 ++++--- 7 files changed, 276 insertions(+), 373 deletions(-) create mode 100644 control/omap.py diff --git a/control/grpc.py b/control/grpc.py index 465395b7..becd0c83 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -16,10 +16,13 @@ import spdk.rpc.bdev as rpc_bdev import spdk.rpc.nvmf as rpc_nvmf +import spdk.rpc.client as rpc_client from google.protobuf import json_format from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc +from .state import GatewayStateHandler +from .config import GatewayConfig class GatewayService(pb2_grpc.GatewayServicer): @@ -35,7 +38,8 @@ class GatewayService(pb2_grpc.GatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ - def __init__(self, config, gateway_state, spdk_rpc_client): + def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, + spdk_rpc_client: rpc_client.JSONRPCClient): self.logger = logging.getLogger(__name__) self.config = config @@ -82,7 +86,7 @@ def create_bdev(self, request, context=None): try: json_req = json_format.MessageToJson( request, preserving_proto_field_name=True) - self.gateway_state.add_bdev(bdev_name, json_req) + self.gateway_state.omap.add_bdev(bdev_name, json_req) except Exception as ex: self.logger.error( f"Error persisting create_bdev {bdev_name}: {ex}") @@ -141,7 +145,7 @@ def delete_bdev(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.remove_bdev(request.bdev_name) + self.gateway_state.omap.remove_bdev(request.bdev_name) except Exception as ex: self.logger.error( f"Error persisting delete_bdev {request.bdev_name}: {ex}") @@ -182,7 +186,7 @@ def create_subsystem(self, request, context=None): try: json_req = json_format.MessageToJson( request, preserving_proto_field_name=True) - self.gateway_state.add_subsystem(request.subsystem_nqn, + self.gateway_state.omap.add_subsystem(request.subsystem_nqn, json_req) except Exception as ex: self.logger.error(f"Error persisting create_subsystem" @@ -212,7 +216,7 @@ def delete_subsystem(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.remove_subsystem(request.subsystem_nqn) + self.gateway_state.omap.remove_subsystem(request.subsystem_nqn) except Exception as ex: self.logger.error(f"Error persisting delete_subsystem" f" {request.subsystem_nqn}: {ex}") @@ -247,7 +251,7 @@ def add_namespace(self, request, context=None): request.nsid = nsid json_req = json_format.MessageToJson( request, preserving_proto_field_name=True) - self.gateway_state.add_namespace(request.subsystem_nqn, + self.gateway_state.omap.add_namespace(request.subsystem_nqn, str(nsid), json_req) except Exception as ex: self.logger.error( @@ -278,7 +282,7 @@ def remove_namespace(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.remove_namespace(request.subsystem_nqn, + self.gateway_state.omap.remove_namespace(request.subsystem_nqn, str(request.nsid)) except Exception as ex: self.logger.error( @@ -322,7 +326,7 @@ def add_host(self, request, context=None): try: json_req = json_format.MessageToJson( request, preserving_proto_field_name=True) - self.gateway_state.add_host(request.subsystem_nqn, + self.gateway_state.omap.add_host(request.subsystem_nqn, request.host_nqn, json_req) except Exception as ex: self.logger.error( @@ -365,7 +369,7 @@ def remove_host(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.remove_host(request.subsystem_nqn, + self.gateway_state.omap.remove_host(request.subsystem_nqn, request.host_nqn) except Exception as ex: self.logger.error(f"Error persisting remove_host: {ex}") @@ -406,7 +410,7 @@ def create_listener(self, request, context=None): try: json_req = json_format.MessageToJson( request, preserving_proto_field_name=True) - self.gateway_state.add_listener(request.nqn, + self.gateway_state.omap.add_listener(request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid, json_req) @@ -448,7 +452,7 @@ def delete_listener(self, request, context=None): if context: # Update gateway state try: - self.gateway_state.remove_listener(request.nqn, + self.gateway_state.omap.remove_listener(request.nqn, request.gateway_name, request.trtype, request.traddr, diff --git a/control/omap.py b/control/omap.py new file mode 100644 index 00000000..d950ac5e --- /dev/null +++ b/control/omap.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2021 International Business Machines +# All rights reserved. +# +# SPDX-License-Identifier: LGPL-3.0-or-later +# +# Authors: anita.shekar@ibm.com, sandy.kaur@ibm.com +# +import logging +import rados +from typing import Dict +from collections import defaultdict + + +class OmapObject: + """Class representing versioned omap object""" + OMAP_VERSION_KEY = "omap_version" + + def __init__(self, name, ioctx) -> None: + self.version = 1 + self.watch = None + self.cached_object = defaultdict(dict) + self.name = name + self.logger = logging.getLogger(__name__) + self.ioctx = ioctx + self.create() + + def create(self) -> None: + """Create OMAP object if does not exist already""" + try: + # Create a new persistence OMAP object + with rados.WriteOpCtx() as write_op: + # Set exclusive parameter to fail write_op if object exists + write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE) + self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), + (str(self.version),)) + self.ioctx.operate_write_op(write_op, self.name) + self.logger.info( + f"First gateway: created object {self.name}") + except rados.ObjectExists: + self.logger.info(f"{self.name} omap object already exists.") + except Exception: + self.logger.exception(f"Unable to create omap {self.name}:") + raise + + def __exit__(self, exc_type, exc_value, traceback) -> None: + """Context destructor""" + if self.watch is not None: + self.watch.close() + self.ioctx.close() + + def get(self) -> Dict[str, str]: + """Returns dict of all OMAP keys and values.""" + with rados.ReadOpCtx() as read_op: + i, _ = self.ioctx.get_omap_vals(read_op, "", "", -1) + self.ioctx.operate_read_op(read_op, self.name) + omap_dict = dict(i) + return omap_dict + + def _notify(self) -> None: + """ Notify other gateways within the group of change """ + try: + self.ioctx.notify(self.name) + except Exception as ex: + self.logger.info(f"Failed to notify.") + + def add_key(self, key: str, val: str) -> None: + """Adds key and value to the OMAP.""" + try: + version_update = self.version + 1 + with rados.WriteOpCtx() as write_op: + # Compare operation failure will cause write failure + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), + rados.LIBRADOS_CMPXATTR_OP_EQ) + self.ioctx.set_omap(write_op, (key,), (val,)) + self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), + (str(version_update),)) + self.ioctx.operate_write_op(write_op, self.name) + self.version = version_update + self.logger.debug(f"omap_key generated: {key}") + except Exception as ex: + self.logger.error(f"Unable to add key to omap: {ex}. Exiting!") + raise + + self._notify() + + def remove_key(self, key: str) -> None: + """Removes key from the OMAP.""" + try: + version_update = self.version + 1 + with rados.WriteOpCtx() as write_op: + # Compare operation failure will cause remove failure + write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), + rados.LIBRADOS_CMPXATTR_OP_EQ) + self.ioctx.remove_omap_keys(write_op, (key,)) + self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), + (str(version_update),)) + self.ioctx.operate_write_op(write_op, self.name) + self.version = version_update + self.logger.debug(f"omap_key removed: {key}") + except Exception: + self.logger.exception(f"Unable to remove key from omap:") + raise + + self._notify() + + def delete(self) -> None: + """Deletes OMAP object contents.""" + try: + with rados.WriteOpCtx() as write_op: + self.ioctx.clear_omap(write_op) + self.ioctx.operate_write_op(write_op, self.name) + self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), + (str(1),)) + self.ioctx.operate_write_op(write_op, self.name) + self.logger.info(f"Deleted OMAP {self.name} contents.") + except Exception: + self.logger.exception(f"Error deleting OMAP {self.name} contents:") + raise + + def register_watch(self, notify_event) -> None: + """Sets a watch on the OMAP object for changes.""" + + def _watcher_callback(notify_id, notifier_id, watch_id, data): + notify_event.set() + + if self.watch is None: + try: + self.watch = self.ioctx.watch(self.name, _watcher_callback) + except Exception: + self.logger.exception(f"Unable to initiate watch {self.name}:") + raise + else: + self.logger.info(f"Watch {self.name} already exists.") diff --git a/control/server.py b/control/server.py index bc740fc7..651fd90e 100644 --- a/control/server.py +++ b/control/server.py @@ -16,8 +16,8 @@ import json import logging import signal -import traceback from concurrent import futures +from typing import Dict from google.protobuf import json_format import spdk.rpc @@ -26,7 +26,7 @@ from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc -from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler +from .state import GatewayState, OmapGatewayState, GatewayStateHandler from .grpc import GatewayService def sigchld_handler(signum, frame): @@ -95,17 +95,17 @@ def serve(self): # Start SPDK self._start_spdk() - # Register service implementation with server + # Init OMAP state omap_state = OmapGatewayState(self.config) - local_state = LocalGatewayState() - gateway_state = GatewayStateHandler(self.config, local_state, - omap_state, self.gateway_rpc_caller) + gateway_state = GatewayStateHandler(self.config, omap_state, self.gateway_rpc_caller) + + # Register GRPC service implementation with server self.gateway_rpc = GatewayService(self.config, gateway_state, self.spdk_rpc_client) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) - # Add listener port + # Add GRPC listener port self._add_server_listener() # Check for existing NVMeoF target state @@ -288,7 +288,7 @@ def _ping(self): self.logger.error(f"spdk_get_version failed with: \n {ex}") return False - def gateway_rpc_caller(self, requests, is_add_req): + def gateway_rpc_caller(self, requests: Dict[str, str] , is_add_req: bool) -> None: """Passes RPC requests to gateway service.""" for key, val in requests.items(): if key.startswith(GatewayState.BDEV_PREFIX): diff --git a/control/state.py b/control/state.py index b99664c3..0d549cce 100644 --- a/control/state.py +++ b/control/state.py @@ -11,139 +11,42 @@ import threading import rados import logging -from typing import Dict from collections import defaultdict -from abc import ABC, abstractmethod +from typing import DefaultDict, Dict, List, Callable +from .omap import OmapObject +from .config import GatewayConfig -class GatewayState(ABC): - """Persists gateway NVMeoF target state. +# Declare a callback function called when the gateway state changes +StateUpdate = Callable[[Dict[str, str], bool], None] - Class attributes: +class GatewayState: + """ X_PREFIX: Key prefix for key of type "X" """ - BDEV_PREFIX = "bdev_" NAMESPACE_PREFIX = "namespace_" SUBSYSTEM_PREFIX = "subsystem_" HOST_PREFIX = "host_" LISTENER_PREFIX = "listener_" - @abstractmethod - def get_state(self) -> Dict[str, str]: - """Returns the state dictionary.""" - pass - - @abstractmethod - def _add_key(self, key: str, val: str): - """Adds key to state data store.""" - pass - - @abstractmethod - def _remove_key(self, key: str): - """Removes key from state data store.""" - pass - - def add_bdev(self, bdev_name: str, val: str): - """Adds a bdev to the state data store.""" - key = self.BDEV_PREFIX + bdev_name - self._add_key(key, val) - - def remove_bdev(self, bdev_name: str): - """Removes a bdev from the state data store.""" - key = self.BDEV_PREFIX + bdev_name - self._remove_key(key) - - def add_namespace(self, subsystem_nqn: str, nsid: str, val: str): - """Adds a namespace to the state data store.""" - key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + nsid - self._add_key(key, val) - - def remove_namespace(self, subsystem_nqn: str, nsid: str): - """Removes a namespace from the state data store.""" - key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + nsid - self._remove_key(key) - - def add_subsystem(self, subsystem_nqn: str, val: str): - """Adds a subsystem to the state data store.""" - key = self.SUBSYSTEM_PREFIX + subsystem_nqn - self._add_key(key, val) - - def remove_subsystem(self, subsystem_nqn: str): - """Removes a subsystem from the state data store.""" - key = self.SUBSYSTEM_PREFIX + subsystem_nqn - self._remove_key(key) - - # Delete all keys related to subsystem - state = self.get_state() - for key in state.keys(): - if (key.startswith(self.NAMESPACE_PREFIX + subsystem_nqn) or - key.startswith(self.HOST_PREFIX + subsystem_nqn) or - key.startswith(self.LISTENER_PREFIX + subsystem_nqn)): - self._remove_key(key) - - def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): - """Adds a host to the state data store.""" - key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) - self._add_key(key, val) - - def remove_host(self, subsystem_nqn: str, host_nqn: str): - """Removes a host from the state data store.""" - key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) - self._remove_key(key) - - def add_listener(self, subsystem_nqn: str, gateway: str, trtype: str, - traddr: str, trsvcid: str, val: str): - """Adds a listener to the state data store.""" - key = "{}{}_{}_{}_{}_{}".format(self.LISTENER_PREFIX, subsystem_nqn, - gateway, trtype, traddr, trsvcid) - self._add_key(key, val) - - def remove_listener(self, subsystem_nqn: str, gateway: str, trtype: str, - traddr: str, trsvcid: str): - """Removes a listener from the state data store.""" - key = "{}{}_{}_{}_{}_{}".format(self.LISTENER_PREFIX, subsystem_nqn, - gateway, trtype, traddr, trsvcid) - self._remove_key(key) - - @abstractmethod - def delete_state(self): - """Deletes state data store.""" - pass - - -class LocalGatewayState(GatewayState): - """Records gateway NVMeoF target state in a dictionary. +def bdev_key(bdev_name: str) -> str: + return f"{GatewayState.BDEV_PREFIX}{bdev_name}" - Instance attributes: - state: Local gateway NVMeoF target state - """ - - def __init__(self): - self.state = {} - - def get_state(self) -> Dict[str, str]: - """Returns local state dictionary.""" - return self.state.copy() - - def _add_key(self, key: str, val: str): - """Adds key and value to the local state dictionary.""" - self.state[key] = val - - def _remove_key(self, key: str): - """Removes key from the local state dictionary.""" - self.state.pop(key) +def namespace_key(subsystem_nqn: str, nsid: str) -> str: + return f"{GatewayState.NAMESPACE_PREFIX}{subsystem_nqn}_{nsid}" - def delete_state(self): - """Deletes contents of local state dictionary.""" - self.state.clear() +def subsystem_key(subsystem_nqn: str) -> str: + return f"{GatewayState.SUBSYSTEM_PREFIX}{subsystem_nqn}" - def reset(self, omap_state): - """Resets dictionary with OMAP state.""" - self.state = omap_state +def host_key(subsystem_nqn: str, host_nqn: str) -> str: + return f"{GatewayState.HOST_PREFIX}{subsystem_nqn}_{host_nqn}" +def listener_key(subsystem_nqn: str, gateway: str, trtype: str, + traddr: str, trsvcid: str) -> str: + return f"{GatewayState.LISTENER_PREFIX}{subsystem_nqn}_{gateway}_{trtype}_{traddr}_{trsvcid}" -class OmapGatewayState(GatewayState): +class OmapGatewayState: """Persists gateway NVMeoF target state to an OMAP object. Handles reads/writes of persistent NVMeoF target state data in key/value @@ -161,15 +64,11 @@ class OmapGatewayState(GatewayState): watch: Watcher for the OMAP object """ - OMAP_VERSION_KEY = "omap_version" - - def __init__(self, config): + def __init__(self, config: GatewayConfig) -> None: self.config = config - self.version = 1 self.logger = logging.getLogger(__name__) - self.watch = None gateway_group = self.config.get("gateway", "group") - self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state" + ceph_pool = self.config.get("ceph", "pool") ceph_conf = self.config.get("ceph", "config_file") rados_id = self.config.get_with_default("ceph", "id", "") @@ -178,134 +77,62 @@ def __init__(self, config): conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id) conn.connect() self.ioctx = conn.open_ioctx(ceph_pool) - # Create a new gateway persistence OMAP object - with rados.WriteOpCtx() as write_op: - # Set exclusive parameter to fail write_op if object exists - write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE) - self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), - (str(self.version),)) - self.ioctx.operate_write_op(write_op, self.omap_name) - self.logger.info( - f"First gateway: created object {self.omap_name}") - except rados.ObjectExists: - self.logger.info(f"{self.omap_name} omap object already exists.") - except Exception as ex: - self.logger.error(f"Unable to create omap: {ex}. Exiting!") + omap_state_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state" + self.state = OmapObject(omap_state_name, self.ioctx) + except Exception: + self.logger.exception(f"Unable to create omap:") raise - def __exit__(self, exc_type, exc_value, traceback): - if self.watch is not None: - self.watch.close() - self.ioctx.close() - - def get_local_version(self) -> int: - """Returns local version.""" - return self.version - - def set_local_version(self, version_update: int): - """Sets local version.""" - self.version = version_update - - def get_omap_version(self) -> int: - """Returns OMAP version.""" - with rados.ReadOpCtx() as read_op: - i, _ = self.ioctx.get_omap_vals_by_keys(read_op, - (self.OMAP_VERSION_KEY,)) - self.ioctx.operate_read_op(read_op, self.omap_name) - value_list = list(dict(i).values()) - if len(value_list) == 1: - val = int(value_list[0]) - return val - else: - self.logger.error( - f"Read of OMAP version key ({self.OMAP_VERSION_KEY}) returns" - f" invalid number of values ({value_list}).") - raise - def get_state(self) -> Dict[str, str]: - """Returns dict of all OMAP keys and values.""" - with rados.ReadOpCtx() as read_op: - i, _ = self.ioctx.get_omap_vals(read_op, "", "", -1) - self.ioctx.operate_read_op(read_op, self.omap_name) - omap_dict = dict(i) - return omap_dict + def add_bdev(self, bdev_name: str, val: str) -> None: + """Adds a bdev to the state data store.""" + self.state.add_key(bdev_key(bdev_name), val) + + def remove_bdev(self, bdev_name: str) -> None: + """Removes a bdev from the state data store.""" + self.state.remove_key(bdev_key(bdev_name)) - def _add_key(self, key: str, val: str): - """Adds key and value to the OMAP.""" - try: - version_update = self.version + 1 - with rados.WriteOpCtx() as write_op: - # Compare operation failure will cause write failure - write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), - rados.LIBRADOS_CMPXATTR_OP_EQ) - self.ioctx.set_omap(write_op, (key,), (val,)) - self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), - (str(version_update),)) - self.ioctx.operate_write_op(write_op, self.omap_name) - self.version = version_update - self.logger.debug(f"omap_key generated: {key}") - except Exception as ex: - self.logger.error(f"Unable to add key to omap: {ex}. Exiting!") - raise + def add_namespace(self, subsystem_nqn: str, nsid: str, val: str) -> None: + """Adds a namespace to the state data store.""" + self.state.add_key(namespace_key(subsystem_nqn, nsid), val) - # Notify other gateways within the group of change - try: - self.ioctx.notify(self.omap_name) - except Exception as ex: - self.logger.info(f"Failed to notify.") + def remove_namespace(self, subsystem_nqn: str, nsid: str) -> None: + """Removes a namespace from the state data store.""" + self.state.remove_key(namespace_key(subsystem_nqn, nsid)) - def _remove_key(self, key: str): - """Removes key from the OMAP.""" - try: - version_update = self.version + 1 - with rados.WriteOpCtx() as write_op: - # Compare operation failure will cause remove failure - write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version), - rados.LIBRADOS_CMPXATTR_OP_EQ) - self.ioctx.remove_omap_keys(write_op, (key,)) - self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), - (str(version_update),)) - self.ioctx.operate_write_op(write_op, self.omap_name) - self.version = version_update - self.logger.debug(f"omap_key removed: {key}") - except Exception as ex: - self.logger.error(f"Unable to remove key from omap: {ex}. Exiting!") - raise + def add_subsystem(self, subsystem_nqn: str, val: str) -> None: + """Adds a subsystem to the state data store.""" + self.state.add_key(subsystem_key(subsystem_nqn), val) - # Notify other gateways within the group of change - try: - self.ioctx.notify(self.omap_name) - except Exception as ex: - self.logger.info(f"Failed to notify.") + def remove_subsystem(self, subsystem_nqn: str) -> None: + """Removes a subsystem from the state data store.""" + self.state.remove_key(subsystem_key(subsystem_nqn)) - def delete_state(self): - """Deletes OMAP object contents.""" - try: - with rados.WriteOpCtx() as write_op: - self.ioctx.clear_omap(write_op) - self.ioctx.operate_write_op(write_op, self.omap_name) - self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,), - (str(1),)) - self.ioctx.operate_write_op(write_op, self.omap_name) - self.logger.info(f"Deleted OMAP contents.") - except Exception as ex: - self.logger.error(f"Error deleting OMAP contents: {ex}. Exiting!") - raise + # Delete all keys related to subsystem + state = self.state.get() + for key in state.keys(): + if (key.startswith(GatewayState.NAMESPACE_PREFIX + subsystem_nqn) or + key.startswith(GatewayState.HOST_PREFIX + subsystem_nqn) or + key.startswith(GatewayState.LISTENER_PREFIX + subsystem_nqn)): + self.state.remove_key(key) - def register_watch(self, notify_event): - """Sets a watch on the OMAP object for changes.""" + def add_host(self, subsystem_nqn: str, host_nqn: str, val: str) -> None: + """Adds a host to the state data store.""" + self.state.add_key(host_key(subsystem_nqn, host_nqn), val) - def _watcher_callback(notify_id, notifier_id, watch_id, data): - notify_event.set() + def remove_host(self, subsystem_nqn: str, host_nqn: str) -> None: + """Removes a host from the state data store.""" + self.state.remove_key(host_key(subsystem_nqn, host_nqn)) - if self.watch is None: - try: - self.watch = self.ioctx.watch(self.omap_name, _watcher_callback) - except Exception as ex: - self.logger.error(f"Unable to initiate watch: {ex}") - else: - self.logger.info(f"Watch already exists.") + def add_listener(self, subsystem_nqn: str, gateway: str, trtype: str, + traddr: str, trsvcid: str, val: str) -> None: + """Adds a listener to the state data store.""" + self.state.add_key(listener_key(subsystem_nqn, gateway, trtype, traddr, trsvcid), val) + def remove_listener(self, subsystem_nqn: str, gateway: str, trtype: str, + traddr: str, trsvcid: str) -> None: + """Removes a listener from the state data store.""" + self.state.remove_key(listener_key(subsystem_nqn, gateway, trtype, traddr, trsvcid)) class GatewayStateHandler: """Maintains consistency in NVMeoF target state store instances. @@ -314,16 +141,16 @@ class GatewayStateHandler: config: Basic gateway parameters logger: Logger instance to track events local: Local GatewayState instance - gateway_rpc_caller: Callback to GatewayServer.gateway_rpc_caller + gateway_rpc_caller: StateUpdate callback, implemented by GatewayServer omap: OMAP GatewayState instance update_interval: Interval to periodically poll for updates update_timer: Timer to check for gateway state updates use_notify: Flag to indicate use of OMAP watch/notify """ - def __init__(self, config, local, omap, gateway_rpc_caller): + def __init__(self, config: GatewayConfig, omap: OmapGatewayState, + gateway_rpc_caller: StateUpdate) -> None: self.config = config - self.local = local self.omap = omap self.gateway_rpc_caller = gateway_rpc_caller self.update_timer = None @@ -336,73 +163,12 @@ def __init__(self, config, local, omap, gateway_rpc_caller): self.use_notify = self.config.getboolean("gateway", "state_update_notify") - def add_bdev(self, bdev_name: str, val: str): - """Adds a bdev to the state data stores.""" - self.omap.add_bdev(bdev_name, val) - self.local.add_bdev(bdev_name, val) - - def remove_bdev(self, bdev_name: str): - """Removes a bdev from the state data stores.""" - self.omap.remove_bdev(bdev_name) - self.local.remove_bdev(bdev_name) - - def add_namespace(self, subsystem_nqn: str, nsid: str, val: str): - """Adds a namespace to the state data store.""" - self.omap.add_namespace(subsystem_nqn, nsid, val) - self.local.add_namespace(subsystem_nqn, nsid, val) - - def remove_namespace(self, subsystem_nqn: str, nsid: str): - """Removes a namespace from the state data store.""" - self.omap.remove_namespace(subsystem_nqn, nsid) - self.local.remove_namespace(subsystem_nqn, nsid) - - def add_subsystem(self, subsystem_nqn: str, val: str): - """Adds a subsystem to the state data store.""" - self.omap.add_subsystem(subsystem_nqn, val) - self.local.add_subsystem(subsystem_nqn, val) - - def remove_subsystem(self, subsystem_nqn: str): - """Removes a subsystem from the state data store.""" - self.omap.remove_subsystem(subsystem_nqn) - self.local.remove_subsystem(subsystem_nqn) - - def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): - """Adds a host to the state data store.""" - self.omap.add_host(subsystem_nqn, host_nqn, val) - self.local.add_host(subsystem_nqn, host_nqn, val) - - def remove_host(self, subsystem_nqn: str, host_nqn: str): - """Removes a host from the state data store.""" - self.omap.remove_host(subsystem_nqn, host_nqn) - self.local.remove_host(subsystem_nqn, host_nqn) - - def add_listener(self, subsystem_nqn: str, gateway: str, trtype: str, - traddr: str, trsvcid: str, val: str): - """Adds a listener to the state data store.""" - self.omap.add_listener(subsystem_nqn, gateway, trtype, traddr, trsvcid, - val) - self.local.add_listener(subsystem_nqn, gateway, trtype, traddr, trsvcid, - val) - - def remove_listener(self, subsystem_nqn: str, gateway: str, trtype: str, - traddr: str, trsvcid: str): - """Removes a listener from the state data store.""" - self.omap.remove_listener(subsystem_nqn, gateway, trtype, traddr, - trsvcid) - self.local.remove_listener(subsystem_nqn, gateway, trtype, traddr, - trsvcid) - - def delete_state(self): - """Deletes state data stores.""" - self.omap.delete_state() - self.local.delete_state() - - def start_update(self): + def start_update(self) -> None: """Initiates periodic polling and watch/notify for updates.""" notify_event = threading.Event() if self.use_notify: # Register a watch on omap state - self.omap.register_watch(notify_event) + self.omap.state.register_watch(notify_event) # Start polling for state updates if self.update_timer is None: @@ -413,7 +179,7 @@ def start_update(self): else: self.logger.info("Update timer already set.") - def _update_caller(self, notify_event): + def _update_caller(self, notify_event: threading.Event) -> None: """Periodically calls for update.""" while True: update_time = time.time() + self.update_interval @@ -421,7 +187,7 @@ def _update_caller(self, notify_event): notify_event.wait(max(update_time - time.time(), 0)) notify_event.clear() - def update(self): + def update(self) -> None: """Checks for updated omap state and initiates local update.""" prefix_list = [ GatewayState.BDEV_PREFIX, GatewayState.SUBSYSTEM_PREFIX, @@ -430,11 +196,11 @@ def update(self): ] # Get version and state from OMAP - omap_state_dict = self.omap.get_state() - omap_version = int(omap_state_dict[self.omap.OMAP_VERSION_KEY]) + omap_state_dict = self.omap.state.get() + omap_version = int(omap_state_dict[OmapObject.OMAP_VERSION_KEY]) - if self.omap.get_local_version() < omap_version: - local_state_dict = self.local.get_state() + if self.omap.state.version < omap_version: + local_state_dict = self.omap.state.cached_object local_state_keys = local_state_dict.keys() omap_state_keys = omap_state_dict.keys() @@ -465,11 +231,11 @@ def update(self): self._update_call_rpc(grouped_added, True, prefix_list) # Update local state and version - self.local.reset(omap_state_dict) - self.omap.set_local_version(omap_version) + self.omap.state.cached_object = omap_state_dict.copy() + self.omap.state.version = omap_version self.logger.debug("Update complete.") - def _group_by_prefix(self, state_update, prefix_list): + def _group_by_prefix(self, state_update: Dict[str, str], prefix_list: List[str]) -> DefaultDict[str, Dict[str, str]] : """Groups state update by key prefixes.""" grouped_state_update = defaultdict(dict) for key, val in state_update.items(): @@ -478,7 +244,8 @@ def _group_by_prefix(self, state_update, prefix_list): grouped_state_update[prefix][key] = val return grouped_state_update - def _update_call_rpc(self, grouped_state_update, is_add_req, prefix_list): + def _update_call_rpc(self, grouped_state_update: DefaultDict[str, Dict[str, str]], + is_add_req: bool, prefix_list: List[str]) -> None: """Calls to initiate gateway RPCs in necessary component order.""" if is_add_req: for prefix in prefix_list: diff --git a/tests/test_cli.py b/tests/test_cli.py index 853d75a6..7234da6e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -30,7 +30,7 @@ def gateway(config): # Stop gateway gateway.server.stop(grace=1) - gateway.gateway_rpc.gateway_state.delete_state() + gateway.gateway_rpc.gateway_state.omap.state.delete() class TestGet: def test_get_subsystems(self, caplog, gateway): diff --git a/tests/test_multi_gateway.py b/tests/test_multi_gateway.py index 8b57d6ec..23e02181 100644 --- a/tests/test_multi_gateway.py +++ b/tests/test_multi_gateway.py @@ -40,7 +40,7 @@ def conn(config): ): gatewayA.serve() # Delete existing OMAP state - gatewayA.gateway_rpc.gateway_state.delete_state() + gatewayA.gateway_rpc.gateway_state.omap.state.delete() # Create new gatewayB.serve() @@ -54,7 +54,7 @@ def conn(config): # Stop gateways gatewayA.server.stop(grace=1) gatewayB.server.stop(grace=1) - gatewayB.gateway_rpc.gateway_state.delete_state() + gatewayB.gateway_rpc.gateway_state.omap.state.delete() def test_multi_gateway_coordination(config, image, conn): """Tests state coordination in a gateway group. diff --git a/tests/test_state.py b/tests/test_state.py index 9f7daf3e..2d8ec87a 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -1,11 +1,12 @@ import pytest import time import rados -from control.state import LocalGatewayState, OmapGatewayState, GatewayStateHandler +from control.state import OmapGatewayState, GatewayStateHandler +from control.config import GatewayConfig @pytest.fixture(scope="module") -def ioctx(config): +def ioctx(config: GatewayConfig): """Opens IO context to ceph pool.""" ceph_pool = config.get("ceph", "pool") ceph_conf = config.get("ceph", "config_file") @@ -16,22 +17,17 @@ def ioctx(config): ioctx.close() -@pytest.fixture -def local_state(): - """Returns local state object.""" - return LocalGatewayState() - @pytest.fixture -def omap_state(config): +def omap_state(config: GatewayConfig): """Sets up and tears down OMAP state object.""" omap = OmapGatewayState(config) - omap.delete_state() + omap.state.delete() yield omap - omap.delete_state() + omap.state.delete() -def add_key(ioctx, key, value, version, omap_name, omap_version_key): +def add_key(ioctx: rados.Ioctx, key: str, value: str, version: int, omap_name: str, omap_version_key: str): """Adds key to the specified OMAP and sets version number.""" with rados.WriteOpCtx() as write_op: ioctx.set_omap(write_op, (key,), (value,)) @@ -39,7 +35,7 @@ def add_key(ioctx, key, value, version, omap_name, omap_version_key): ioctx.operate_write_op(write_op, omap_name) -def remove_key(ioctx, key, version, omap_name, omap_version_key): +def remove_key(ioctx: rados.Ioctx, key: str, version: int, omap_name: str, omap_version_key: str): """Removes key from the specified OMAP.""" with rados.WriteOpCtx() as write_op: ioctx.remove_omap_keys(write_op, (key,)) @@ -47,7 +43,7 @@ def remove_key(ioctx, key, version, omap_name, omap_version_key): ioctx.operate_write_op(write_op, omap_name) -def test_state_polling_update(config, ioctx, local_state, omap_state): +def test_state_polling_update(config: GatewayConfig, ioctx: rados.Ioctx, omap_state: OmapGatewayState): """Confirms periodic polling of the OMAP for updates.""" update_counter = 0 @@ -78,35 +74,36 @@ def _state_polling_update(update, is_add_req): version = 1 update_interval_sec = 1 - state = GatewayStateHandler(config, local_state, omap_state, + state_handler = GatewayStateHandler(config, omap_state, _state_polling_update) - state.update_interval = update_interval_sec - state.use_notify = False + state_handler.update_interval = update_interval_sec + state_handler.use_notify = False key = "bdev_test" - state.start_update() + state_handler.start_update() + omap_obj = omap_state.state # Add bdev key to OMAP and update version number version += 1 - add_key(ioctx, key, "add", version, omap_state.omap_name, - omap_state.OMAP_VERSION_KEY) + add_key(ioctx, key, "add", version, omap_obj.name, + omap_obj.OMAP_VERSION_KEY) time.sleep(update_interval_sec + 1) # Allow time for polling # Change bdev key and update version number version += 1 - add_key(ioctx, key, "changed", version, omap_state.omap_name, - omap_state.OMAP_VERSION_KEY) + add_key(ioctx, key, "changed", version, omap_obj.name, + omap_obj.OMAP_VERSION_KEY) time.sleep(update_interval_sec + 1) # Allow time for polling # Remove bdev key and update version number version += 1 - remove_key(ioctx, key, version, omap_state.omap_name, - omap_state.OMAP_VERSION_KEY) + remove_key(ioctx, key, version, omap_obj.name, + omap_obj.OMAP_VERSION_KEY) time.sleep(update_interval_sec + 1) # Allow time for polling assert update_counter == 4 -def test_state_notify_update(config, ioctx, local_state, omap_state): +def test_state_notify_update(config: GatewayConfig, ioctx: rados.Ioctx, omap_state: OmapGatewayState): """Confirms use of OMAP watch/notify for updates.""" update_counter = 0 @@ -139,31 +136,32 @@ def _state_notify_update(update, is_add_req): version = 1 update_interval_sec = 10 - state = GatewayStateHandler(config, local_state, omap_state, + state_handler = GatewayStateHandler(config, omap_state, _state_notify_update) key = "bdev_test" - state.update_interval = update_interval_sec - state.use_notify = True + state_handler.update_interval = update_interval_sec + state_handler.use_notify = True start = time.time() - state.start_update() + state_handler.start_update() + omap_obj = omap_state.state # Add bdev key to OMAP and update version number version += 1 - add_key(ioctx, key, "add", version, omap_state.omap_name, - omap_state.OMAP_VERSION_KEY) - assert (ioctx.notify(omap_state.omap_name)) # Send notify signal + add_key(ioctx, key, "add", version, omap_obj.name, + omap_obj.OMAP_VERSION_KEY) + assert (ioctx.notify(omap_obj.name)) # Send notify signal # Change bdev key and update version number version += 1 - add_key(ioctx, key, "changed", version, omap_state.omap_name, - omap_state.OMAP_VERSION_KEY) - assert (ioctx.notify(omap_state.omap_name)) # Send notify signal + add_key(ioctx, key, "changed", version, omap_obj.name, + omap_obj.OMAP_VERSION_KEY) + assert (ioctx.notify(omap_obj.name)) # Send notify signal # Remove bdev key and update version number version += 1 - remove_key(ioctx, key, version, omap_state.omap_name, - omap_state.OMAP_VERSION_KEY) - assert (ioctx.notify(omap_state.omap_name)) # Send notify signal + remove_key(ioctx, key, version, omap_obj.name, + omap_obj.OMAP_VERSION_KEY) + assert (ioctx.notify(omap_obj.name)) # Send notify signal # any wait interval smaller than update_interval_sec = 10 should be good # to test notify capability