Skip to content

Commit

Permalink
Do not process gRPC calls when gateway is going down.
Browse files Browse the repository at this point in the history
Fixes #992

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Dec 22, 2024
1 parent 719a6ee commit b1cc0af
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,5 @@ DHCHAP_KEY6="DHHC-1:01:Bu4tZd7X2oW7XxmVH5tGCdoS30pDX6bZvexHYoudeVlJW9yz:"
DHCHAP_KEY7="DHHC-1:01:JPJkDQ2po2FfLmKYlTF/sJ2HzVO/FKWxgXKE/H6XfL8ogQ1T:"
DHCHAP_KEY8="DHHC-1:01:e0B0vDxKleDzYVtG42xqFvoWZfiufkoywmfRKrETzayRdf1j:"
DHCHAP_KEY9="DHHC-1:01:KD+sfH3/o2bRQoV0ESjBUywQlMnSaYpZISUbVa0k0nsWpNST:"
DHCHAP_KEY10="DHHC-1:00:rWf0ZFYO7IgWGttM8w6jUrAY4cTQyqyXPdmxHeOSve3w5QU9:"
DHCHAP_KEY11="DHHC-1:02:j3uUz05r5aQy42vX4tDXqVf9HgUPPdEp3kXTgUWl9EphsG7jwpr9KSIt3bmRLXBijPTIDQ==:"
2 changes: 1 addition & 1 deletion ceph-nvmeof.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ max_ns_to_change_lb_grp = 8
#verify_nqns = True
#allowed_consecutive_spdk_ping_failures = 1
#spdk_ping_interval_in_seconds = 2.0
#max_hosts_per_namespace = 1
#max_hosts_per_namespace = 8
#max_namespaces_with_netmask = 1000
#max_subsystems = 128
#max_namespaces = 1024
Expand Down
14 changes: 11 additions & 3 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.host_name = socket.gethostname()
self.verify_nqns = self.config.getboolean_with_default("gateway", "verify_nqns", True)
self.gateway_group = self.config.get_with_default("gateway", "group", "")
self.max_hosts_per_namespace = self.config.getint_with_default("gateway", "max_hosts_per_namespace", 1)
self.max_hosts_per_namespace = self.config.getint_with_default("gateway", "max_hosts_per_namespace", 8)
self.max_namespaces_with_netmask = self.config.getint_with_default("gateway", "max_namespaces_with_netmask", 1000)
self.max_subsystems = self.config.getint_with_default("gateway", "max_subsystems", GatewayService.MAX_SUBSYSTEMS_DEFAULT)
self.max_namespaces = self.config.getint_with_default("gateway", "max_namespaces", GatewayService.MAX_NAMESPACES_DEFAULT)
Expand All @@ -386,6 +386,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self._init_cluster_context()
self.subsys_max_ns = {}
self.host_info = SubsystemHostAuth()
self.up_and_running = True
self.rebalance = Rebalance(self)

def get_directories_for_key_file(self, key_type : str, subsysnqn : str, create_dir : bool = False) -> []:
Expand Down Expand Up @@ -668,6 +669,12 @@ def execute_grpc_function(self, func, request, context):
called might take OMAP lock internally, however does NOT ensure
taking OMAP lock in any way.
"""

if not self.up_and_running:
errmsg = "Gateway is going down"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ESHUTDOWN, error_message=errmsg)

return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)

def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, context, peer_msg = ""):
Expand Down Expand Up @@ -988,7 +995,7 @@ def create_subsystem_safe(self, request, context):
else:
subsys_using_serial = self.serial_number_already_used(context, request.serial_number)
if subsys_using_serial:
errmsg = f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}"
errmsg = f"Serial number {request.serial_number} is already used by subsystem {subsys_using_serial}"
if subsys_already_exists or subsys_using_serial:
errmsg = f"{create_subsystem_error_prefix}: {errmsg}"
self.logger.error(f"{errmsg}")
Expand Down Expand Up @@ -1527,7 +1534,8 @@ def namespace_change_load_balancing_group_safe(self, request, context):
grps_list = []
peer_msg = self.get_peer_message(context)
change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.info(f"Received auto {request.auto_lb_logic} request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")
auto_lb_msg = "auto" if request.auto_lb_logic else "manual"
self.logger.info(f"Received {auto_lb_msg} request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

if not request.subsystem_nqn:
errmsg = f"Failure changing load balancing group for namespace, missing subsystem NQN"
Expand Down
2 changes: 1 addition & 1 deletion control/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __init__(self, spdk_rpc_client, config, gateway_rpc):
self.gw_config = config
_bdev_pools = config.get_with_default('gateway', 'prometheus_bdev_pools', '')
self.bdev_pools = _bdev_pools.split(',') if _bdev_pools else []
self.interval = config.getint_with_default('gateway', 'prometheus_stats_inteval', 10)
self.interval = config.getint_with_default('gateway', 'prometheus_stats_interval', 10)
self.lock = threading.Lock()
self.hostname = os.getenv('NODE_NAME') or os.getenv('HOSTNAME')

Expand Down
2 changes: 2 additions & 0 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ def __enter__(self):

def __exit__(self, exc_type, exc_value, traceback):
"""Cleans up SPDK and server instances."""
if self.gateway_rpc:
self.gateway_rpc.up_and_running = False
if exc_type is not None:
self.logger.exception("GatewayServer exception occurred:")
else:
Expand Down
2 changes: 1 addition & 1 deletion control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ def cleanup_omap(self, omap_lock = None):
if omap_lock and omap_lock.omap_file_lock_duration > 0:
try:
omap_lock.unlock_omap()
except Exceprion:
except Exception:
pass
if self.ioctx:
try:
Expand Down
5 changes: 4 additions & 1 deletion tests/ceph-nvmeof.no-huge.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ state_update_notify = True
state_update_timeout_in_msec = 2000
state_update_interval_sec = 5
enable_spdk_discovery_controller = False
rebalance_period_sec = 7
max_gws_in_grp = 16
max_ns_to_change_lb_grp = 8
#omap_file_lock_duration = 20
#omap_file_lock_retries = 30
#omap_file_lock_retry_sleep_interval = 1.0
Expand All @@ -29,7 +32,7 @@ enable_spdk_discovery_controller = False
#verify_nqns = True
#allowed_consecutive_spdk_ping_failures = 1
#spdk_ping_interval_in_seconds = 2.0
#max_hosts_per_namespace = 1
#max_hosts_per_namespace = 8
#max_namespaces_with_netmask = 1000
#max_subsystems = 128
#max_namespaces = 1024
Expand Down
5 changes: 4 additions & 1 deletion tests/ceph-nvmeof.tls.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ enable_auth = True
state_update_notify = True
state_update_interval_sec = 5
enable_spdk_discovery_controller = False
rebalance_period_sec = 7
max_gws_in_grp = 16
max_ns_to_change_lb_grp = 8
#omap_file_lock_duration = 20
#omap_file_lock_retries = 30
#omap_file_lock_retry_sleep_interval = 1.0
Expand All @@ -28,7 +31,7 @@ enable_spdk_discovery_controller = False
#verify_nqns = True
#allowed_consecutive_spdk_ping_failures = 1
#spdk_ping_interval_in_seconds = 2.0
#max_hosts_per_namespace = 1
#max_hosts_per_namespace = 8
#max_namespaces_with_netmask = 1000
#max_subsystems = 128
#max_namespaces = 1024
Expand Down
8 changes: 8 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
subsystem5 = "nqn.2016-06.io.spdk:cnode5"
subsystem6 = "nqn.2016-06.io.spdk:cnode6"
subsystem7 = "nqn.2016-06.io.spdk:cnode7"
subsystem8 = "nqn.2016-06.io.spdk:cnode8"
discovery_nqn = "nqn.2014-08.org.nvmexpress.discovery"
serial = "Ceph00000000000001"
uuid = "948878ee-c3b2-4d58-a29b-2cff713fc02d"
Expand Down Expand Up @@ -70,6 +71,7 @@ def gateway(config):
port = config.getint("gateway", "port")
config.config["gateway"]["group"] = group_name
config.config["gateway"]["max_namespaces_with_netmask"] = "3"
config.config["gateway"]["max_hosts_per_namespace"] = "1"
config.config["gateway"]["max_subsystems"] = "3"
config.config["gateway"]["max_namespaces"] = "12"
config.config["gateway"]["max_namespaces_per_subsystem"] = "11"
Expand Down Expand Up @@ -212,6 +214,9 @@ def test_create_subsystem(self, caplog, gateway):
assert f'"nqn": "{subsystem}"' in caplog.text
assert f'"max_namespaces": 2049' in caplog.text
caplog.clear()
cli(["subsystem", "add", "--subsystem", subsystem, "--max-namespaces", "2049", "--no-group-append"])
assert f"Failure creating subsystem {subsystem}: Subsystem already exists" in caplog.text
caplog.clear()
cli(["subsystem", "add", "--subsystem", subsystem2, "--serial-number", serial, "--no-group-append"])
assert f"Adding subsystem {subsystem2}: Successful" in caplog.text
caplog.clear()
Expand Down Expand Up @@ -251,6 +256,9 @@ def test_create_subsystem(self, caplog, gateway):
assert subs_list.subsystems[0].nqn == subsystem
assert subs_list.subsystems[1].nqn == subsystem2
caplog.clear()
cli(["subsystem", "add", "--subsystem", subsystem8, "--serial-number", serial, "--no-group-append"])
assert f"Failure creating subsystem {subsystem8}: Serial number {serial} is already used by subsystem {subsystem2}" in caplog.text
caplog.clear()
subs_list = cli_test(["subsystem", "list"])
assert subs_list != None
assert subs_list.status == 0
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cli_change_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ def change_one_namespace_lb_group(caplog, subsys, nsid_to_change, new_group):
time.sleep(8)

assert f"Changing load balancing group of namespace {nsid_to_change} in {subsys} to {new_group}: Successful" in caplog.text
assert f"Received auto False request to change load balancing group for namespace with NSID {nsid_to_change} in {subsys} to {new_group}, context: <grpc._server" in caplog.text
assert f"Received manual request to change load balancing group for namespace with NSID {nsid_to_change} in {subsys} to {new_group}, context: <grpc._server" in caplog.text
assert f"Received request to delete namespace" not in caplog.text
assert f"Received request to add a namespace" not in caplog.text
assert f"Received auto False request to change load balancing group for namespace with NSID {nsid_to_change} in {subsys} to {new_group}, context: None" in caplog.text
assert f"Received manual request to change load balancing group for namespace with NSID {nsid_to_change} in {subsys} to {new_group}, context: None" in caplog.text

def switch_namespaces_lb_group(caplog, ns_count, subsys):
for i in range(1, 1 + (ns_count // 2)):
Expand Down

0 comments on commit b1cc0af

Please sign in to comment.