Skip to content

Commit

Permalink
Merge pull request #445 from leonidc/leo_devel
Browse files Browse the repository at this point in the history
recycling of namespaces, bdevs and clusters for blocklisted ana group
  • Loading branch information
leonidc authored Mar 4, 2024
2 parents 5125d1b + 805c6eb commit bfea178
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9"

# Ceph Cluster
CEPH_CLUSTER_VERSION="${CEPH_VERSION}"
CEPH_BRANCH=wip-leonidc-integration-fix-ack-map1
CEPH_SHA=2138cd88784605a17e5797e3bfd3a46f7f8b19db
CEPH_BRANCH=ceph-nvmeof-mon
CEPH_SHA=8fe25aced9ea078a1e4dcd3d2ca8866ce00d3028
CEPH_VSTART_ARGS="--memstore"
CEPH_DEVEL_MGR_PATH=../ceph

Expand Down
78 changes: 73 additions & 5 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import threading
import errno
import contextlib
import time
from typing import Callable
from collections import defaultdict

Expand Down Expand Up @@ -157,6 +158,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, om
self.ana_map = defaultdict(dict)
self.cluster_nonce = {}
self.bdev_cluster = {}
self.bdev_params = {}
self.subsystem_nsid_bdev = defaultdict(dict)
self.subsystem_nsid_anagrp = defaultdict(dict)
self.gateway_group = self.config.get("gateway", "group")
Expand Down Expand Up @@ -263,7 +265,7 @@ def _alloc_cluster(self, anagrp: int) -> str:
user = self.rados_id,
core_mask = self.librbd_core_mask,
)
self.logger.info(f"Allocated cluster {name=} {nonce=}")
self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}")
self.cluster_nonce[name] = nonce
return name

Expand Down Expand Up @@ -323,7 +325,9 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl
uuid=uuid,
)
self.bdev_cluster[name] = cluster_name
self.logger.info(f"bdev_rbd_create: {bdev_name}")
self.bdev_params[name] = {'uuid':uuid, 'pool_name':rbd_pool_name, 'image_name':rbd_image_name, 'image_size':rbd_image_size, 'block_size': block_size}

self.logger.info(f"bdev_rbd_create: {bdev_name}, cluster_name {cluster_name}")
except Exception as ex:
errmsg = f"bdev_rbd_create {name} failed"
self.logger.exception(errmsg)
Expand Down Expand Up @@ -376,7 +380,7 @@ def resize_bdev(self, bdev_name, new_size):

return pb2.req_status(status=0, error_message=os.strerror(0))

def delete_bdev(self, bdev_name):
def delete_bdev(self, bdev_name, recycling_mode=False):
"""Deletes a bdev."""

if not self.rpc_lock.locked():
Expand All @@ -389,6 +393,8 @@ def delete_bdev(self, bdev_name):
self.spdk_rpc_client,
bdev_name,
)
if not recycling_mode:
del self.bdev_params[bdev_name]
self._put_cluster(self.bdev_cluster[bdev_name])
self.logger.info(f"delete_bdev {bdev_name}: {ret}")
except Exception as ex:
Expand Down Expand Up @@ -750,7 +756,7 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
self.logger.info(f"Received request to set ana states {ana_info.states}")

state = self.gateway_state.local.get_state()

ana_dict = {}
# Iterate over nqn_ana_states in ana_info
for nas in ana_info.states:
nqn = nas.nqn
Expand Down Expand Up @@ -785,6 +791,8 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
adrfam=listener['adrfam'],
ana_state=ana_state,
anagrpid=grp_id)
if ana_state == "inaccessible" :
ana_dict[grp_id] = True;
self.logger.info(f"set_ana_state nvmf_subsystem_listener_set_ana_state response {ret=}")
if not ret:
raise Exception(f"nvmf_subsystem_listener_set_ana_state({nqn=}, {listener=}, {ana_state=}, {grp_id=}) error")
Expand All @@ -794,7 +802,12 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"{ex}")
return pb2.req_status()

for ana_key in ana_dict :
ret_recycle = self.namespace_recycle_safe(ana_key);
if ret_recycle != 0:
errmsg = f"Failure recycle namespaces of ana group {ana_key} "
self.logger.error(errmsg)
return pb2.req_status(status=ret_recycle , error_message=errmsg)
return pb2.req_status(status=True)

def namespace_add_safe(self, request, context):
Expand Down Expand Up @@ -1442,6 +1455,61 @@ def namespace_resize(self, request, context=None):

return pb2.req_status(status=ret.status, error_message=errmsg)

def namespace_recycle_safe(self, ana_id) ->int:
"""Recycle namespaces."""
now = time.time()
self.logger.info(f"== recycle_safe started == for anagrp{ana_id} time {now} ")

self.logger.info(f"Doing loop on {ana_id} map; subsystem_nsid_anagrp:")
list_ns_params = []
for subsys, inner_dict in self.subsystem_nsid_anagrp.items():
for ns_key, ana_value in inner_dict.items():
self.logger.info(f"nsid: {ns_key} ana_val: {ana_value}")
if ana_value == ana_id :
self.logger.info(f"nsid {ns_key} for nqn {subsys} to recycle:")
nsid = ns_key
bdev_name = self.subsystem_nsid_bdev[subsys][nsid]
assert bdev_name
ns_params = {'nsid':nsid, 'bdev_name':bdev_name, 'subsys':subsys}
list_ns_params.append(ns_params)
self.logger.info(f"nsid :{nsid}, pool_name: {self.bdev_params[bdev_name]['pool_name']}, rbd_name: {self.bdev_params[bdev_name]['image_name']}, block_size: {self.bdev_params[bdev_name]['block_size']}, uuid:{self.bdev_params[bdev_name]['uuid']}, anagrpid:{ana_id}")

ret = self.remove_namespace(subsys, nsid, None)
if ret.status != 0:
return -1
ret_del = self.delete_bdev(bdev_name, True)
if ret_del.status != 0:
errmsg = f"Failure deleting namespace {nsid} from {subsys}: {ret_del.error_message}"
self.logger.error(errmsg)
return -1
# recreate: loop on the list of dict 'list_ns_params'
for ns_params in list_ns_params:
bdev_name = ns_params['bdev_name']
self.logger.info(f" Recreate nsid: {ns_params['nsid']} ")
self.logger.info(f"ns params: {ns_params} ")
ret_bdev = self.create_bdev( ana_id, bdev_name, self.bdev_params[bdev_name]['uuid'], self.bdev_params[bdev_name]['pool_name'],
self.bdev_params[bdev_name]['image_name'], self.bdev_params[bdev_name]['block_size'], False,
self.bdev_params[bdev_name]['image_size'])
self.logger.info(f"bdev_rbd_create: {bdev_name}")
if ret_bdev.status != 0:
errmsg = f"Failure adding bdev {bdev_name} "
self.logger.error(errmsg)
return -1
# recreate namespace
ret_ns = self.create_namespace(ns_params['subsys'], bdev_name, ns_params['nsid'], ana_id, self.bdev_params[bdev_name]['uuid'], None)
if ret_ns.status != 0:
try:
ret_del = self.delete_bdev(bdev_name)
if ret_del.status != 0:
self.logger.warning(f"Failure {ret_del.status} deleting bdev {bdev_name}: {ret_del.error_message}")
except Exception:
self.logger.exception(f"Got exception while trying to delete bdev {bdev_name}:")
errmsg = f"Failure adding namespace {ns_params['nsid']} to {ns_params['subsys']}:{ret_ns.error_message}"
self.logger.error(errmsg)
return -1
self.logger.info(f"== recycle_safe completed == for anagrp{ana_id}; latency time {time.time() - now} sec")
return 0

def namespace_delete_safe(self, request, context):
"""Delete a namespace."""

Expand Down

0 comments on commit bfea178

Please sign in to comment.