Skip to content

Commit

Permalink
Merge branch 'ceph:devel' into push_nvmeof_images_also_latest_tag
Browse files Browse the repository at this point in the history
  • Loading branch information
barakda authored Nov 5, 2023
2 parents 5fe0e04 + a360078 commit 5a2d1fd
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 53 deletions.
11 changes: 6 additions & 5 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ NVMEOF_DESCRIPTION="Service to provide block storage on top of Ceph for platform
NVMEOF_URL="https://github.com/ceph/ceph-nvmeof"
NVMEOF_TAGS="ceph,nvme-of,nvme-of gateway,rbd,block storage"
NVMEOF_WANTS="ceph,rbd"
NVMEOF_IP_ADDRESS="192.168.13.3"
NVMEOF_IP_ADDRESS=192.168.13.3
NVMEOF_IPV6_ADDRESS=2001:db8::3
NVMEOF_IO_PORT=4420
NVMEOF_GW_PORT=5500
NVMEOF_DISC_PORT=8009
Expand Down Expand Up @@ -57,10 +58,10 @@ CEPH_CLUSTER_VERSION="${CEPH_VERSION}"
CEPH_VSTART_ARGS="--without-dashboard --memstore"

# Demo settings
RBD_POOL="rbd"
RBD_IMAGE_NAME="demo_image"
RBD_IMAGE_SIZE="10M"
BDEV_NAME="demo_bdev"
RBD_POOL=rbd
RBD_IMAGE_NAME=demo_image
RBD_IMAGE_SIZE=10M
BDEV_NAME=demo_bdev
NQN="nqn.2016-06.io.spdk:cnode1"
SERIAL="SPDK00000000000001"

Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build-container.yml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ jobs:
shopt -s expand_aliases
eval $(make alias)
nvmeof-cli get_subsystems
nvmeof-cli-ipv6 get_subsystems
- name: Run bdevperf
run: |
Expand Down
6 changes: 4 additions & 2 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from .proto import gateway_pb2_grpc as pb2_grpc
from .proto import gateway_pb2 as pb2

from .config import GatewayConfig

def argument(*name_or_flags, **kwargs):
"""Helper function to format arguments for argparse command decorator."""
Expand Down Expand Up @@ -124,7 +124,9 @@ def stub(self):

def connect(self, host, port, client_key, client_cert, server_cert):
"""Connects to server and sets stub."""
server = "{}:{}".format(host, port)
# We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it
host = GatewayConfig.escape_address_if_ipv6(host)
server = f"{host}:{port}"

if client_key and client_cert:
# Create credentials for mutual TLS and a secure channel
Expand Down
7 changes: 7 additions & 0 deletions control/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,10 @@ def dump_config_file(self, logger):
self.conffile_logged = True
except Exception:
pass

# We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it
def escape_address_if_ipv6(addr) -> str:
ret_addr = addr
if ":" in addr and not addr.strip().startswith("["):
ret_addr = f"[{addr}]"
return ret_addr
6 changes: 4 additions & 2 deletions control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ class DiscoveryService:
discovery_port: Discovery controller's listening port
"""

DISCOVERY_NQN = "nqn.2014-08.org.nvmexpress.discovery"

def __init__(self, config):
self.version = 1
self.config = config
Expand Down Expand Up @@ -731,12 +733,12 @@ def reply_get_log_page(self, conn, data, cmd_id):
log_entry_counter = 0
while log_entry_counter < len(allow_listeners):
log_entry = DiscoveryLogEntry()
trtype = TRANSPORT_TYPES[allow_listeners[log_entry_counter]["trtype"]]
trtype = TRANSPORT_TYPES[allow_listeners[log_entry_counter]["trtype"].upper()]
if trtype is None:
self.logger.error("unsupported transport type")
else:
log_entry.trtype = trtype
adrfam = ADRFAM_TYPES[allow_listeners[log_entry_counter]["adrfam"]]
adrfam = ADRFAM_TYPES[allow_listeners[log_entry_counter]["adrfam"].lower()]
if adrfam is None:
self.logger.error("unsupported adress family")
else:
Expand Down
137 changes: 110 additions & 27 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
from google.protobuf import json_format
from .proto import gateway_pb2 as pb2
from .proto import gateway_pb2_grpc as pb2_grpc
from .config import GatewayConfig
from .discovery import DiscoveryService
from .state import GatewayState

MAX_ANA_GROUPS = 4

Expand Down Expand Up @@ -165,17 +168,17 @@ def get_bdev_namespaces(self, bdev_name) -> list:
ns_list = []
local_state_dict = self.gateway_state.local.get_state()
for key, val in local_state_dict.items():
if key.startswith(self.gateway_state.local.NAMESPACE_PREFIX):
try:
req = json_format.Parse(val, pb2.add_namespace_req(), ignore_unknown_fields = True)
ns_bdev_name = req.bdev_name
if ns_bdev_name == bdev_name:
nsid = req.nsid
nqn = req.subsystem_nqn
ns_list.insert(0, {"nqn" : nqn, "nsid" : nsid})
except Exception as ex:
self.logger.error(f"Got exception trying to get bdev {bdev_name} namespaces: {ex}")
pass
if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX):
continue
try:
ns = json.loads(val)
if ns["bdev_name"] == bdev_name:
nsid = ns["nsid"]
nqn = ns["subsystem_nqn"]
ns_list.insert(0, {"nqn" : nqn, "nsid" : nsid})
except Exception as ex:
self.logger.error(f"Got exception trying to get bdev {bdev_name} namespaces: {ex}")
pass

return ns_list

Expand Down Expand Up @@ -243,18 +246,59 @@ def delete_bdev(self, request, context=None):
with self.rpc_lock:
return self.delete_bdev_safe(request, context)

def is_discovery_nqn(self, nqn) -> bool:
return nqn == DiscoveryService.DISCOVERY_NQN

def serial_number_already_used(self, context, serial) -> str:
if not context:
return None
state = self.gateway_state.local.get_state()
for key, val in state.items():
if not key.startswith(self.gateway_state.local.SUBSYSTEM_PREFIX):
continue
try:
subsys = json.loads(val)
sn = subsys["serial_number"]
if serial == sn:
return subsys["subsystem_nqn"]
except Exception:
self.logger.warning("Got exception while parsing {val}: {ex}")
continue
return None

def create_subsystem_safe(self, request, context=None):
"""Creates a subsystem."""

self.logger.info(
f"Received request to create subsystem {request.subsystem_nqn}, ana reporting: {request.ana_reporting} ")

if self.is_discovery_nqn(request.subsystem_nqn):
raise Exception(f"Can't create a discovery subsystem")

min_cntlid = self.config.getint_with_default("gateway", "min_controller_id", 1)
max_cntlid = self.config.getint_with_default("gateway", "max_controller_id", 65519)
if not request.serial_number:
random.seed()
randser = random.randint(2, 99999999999999)
request.serial_number = f"SPDK{randser}"
self.logger.info(f"No serial number specified, will use {request.serial_number}")

try:
subsys_using_serial = self.serial_number_already_used(context, request.serial_number)
if subsys_using_serial:
self.logger.error(f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}")
req = {"subsystem_nqn": request.subsystem_nqn,
"serial_number": request.serial_number,
"max_namespaces": request.max_namespaces,
"ana_reporting": request.ana_reporting,
"enable_ha": request.enable_ha,
"method": "nvmf_create_subsystem", "req_id": 0}
ret = {"code": -errno.EEXIST, "message": f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}"}
msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2),
"Got JSON-RPC error response",
"response:",
json.dumps(ret, indent=2)])
raise Exception(msg)
ret = rpc_nvmf.nvmf_create_subsystem(
self.spdk_rpc_client,
nqn=request.subsystem_nqn,
Expand Down Expand Up @@ -295,6 +339,10 @@ def delete_subsystem_safe(self, request, context=None):

self.logger.info(
f"Received request to delete subsystem {request.subsystem_nqn}")

if self.is_discovery_nqn(request.subsystem_nqn):
raise Exception(f"Can't delete a discovery subsystem")

try:
ret = rpc_nvmf.nvmf_delete_subsystem(
self.spdk_rpc_client,
Expand Down Expand Up @@ -325,11 +373,16 @@ def delete_subsystem(self, request, context=None):

def add_namespace_safe(self, request, context=None):
"""Adds a namespace to a subsystem."""
if request.anagrpid > MAX_ANA_GROUPS:
raise Exception(f"Error group ID {request.anagrpid} is more than configured maximum {MAX_ANA_GROUPS}\n")

self.logger.info(f"Received request to add {request.bdev_name} to"
f" {request.subsystem_nqn}")

if request.anagrpid > MAX_ANA_GROUPS:
raise Exception(f"Error group ID {request.anagrpid} is more than configured maximum {MAX_ANA_GROUPS}")

if self.is_discovery_nqn(request.subsystem_nqn):
raise Exception(f"Can't add a namespace to a discovery subsystem")

try:
nsid = rpc_nvmf.nvmf_subsystem_add_ns(
self.spdk_rpc_client,
Expand Down Expand Up @@ -371,6 +424,10 @@ def remove_namespace_safe(self, request, context=None):

self.logger.info(f"Received request to remove nsid {request.nsid} from"
f" {request.subsystem_nqn}")

if self.is_discovery_nqn(request.subsystem_nqn):
raise Exception(f"Can't remove a namespace from a discovery subsystem")

try:
ret = rpc_nvmf.nvmf_subsystem_remove_ns(
self.spdk_rpc_client,
Expand Down Expand Up @@ -404,7 +461,7 @@ def remove_namespace(self, request, context=None):
def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool:
if not context:
return False
host_key = "_".join([self.gateway_state.local.HOST_PREFIX + subsys_nqn, host_nqn])
host_key = GatewayState.build_host_key(subsys_nqn, host_nqn)
state = self.gateway_state.local.get_state()
if state.get(host_key):
return True
Expand All @@ -414,6 +471,12 @@ def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool:
def add_host_safe(self, request, context=None):
"""Adds a host to a subsystem."""

if self.is_discovery_nqn(request.subsystem_nqn):
raise Exception(f"Can't allow a host to a discovery subsystem")

if self.is_discovery_nqn(request.host_nqn):
raise Exception(f"Can't use a discovery NQN as host NQN")

try:
host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn)
if host_already_exist:
Expand Down Expand Up @@ -479,6 +542,12 @@ def add_host(self, request, context=None):
def remove_host_safe(self, request, context=None):
"""Removes a host from a subsystem."""

if self.is_discovery_nqn(request.subsystem_nqn):
raise Exception(f"Can't remove a host from a discovery subsystem")

if self.is_discovery_nqn(request.host_nqn):
raise Exception(f"Can't use a discovery NQN as host NQN")

try:
if request.host_nqn == "*": # Disable allow any host access
self.logger.info(
Expand Down Expand Up @@ -525,7 +594,7 @@ def remove_host(self, request, context=None):
def matching_listener_exists(self, context, nqn, gw_name, trtype, traddr, trsvcid) -> bool:
if not context:
return False
listener_key = "_".join([self.gateway_state.local.LISTENER_PREFIX + nqn, gw_name, trtype, traddr, trsvcid])
listener_key = GatewayState.build_listener_key(nqn, gw_name, trtype, traddr, trsvcid)
state = self.gateway_state.local.get_state()
if state.get(listener_key):
return True
Expand All @@ -535,9 +604,14 @@ def matching_listener_exists(self, context, nqn, gw_name, trtype, traddr, trsvci
def create_listener_safe(self, request, context=None):
"""Creates a listener for a subsystem at a given IP/Port."""
ret = True
traddr = GatewayConfig.escape_address_if_ipv6(request.traddr)
self.logger.info(f"Received request to create {request.gateway_name}"
f" {request.trtype} listener for {request.nqn} at"
f" {request.traddr}:{request.trsvcid}.")
f" {traddr}:{request.trsvcid}.")

if self.is_discovery_nqn(request.nqn):
raise Exception(f"Can't create a listener for a discovery subsystem")

try:
if request.gateway_name == self.gateway_name:
listener_already_exist = self.matching_listener_exists(
Expand Down Expand Up @@ -574,20 +648,24 @@ def create_listener_safe(self, request, context=None):
return pb2.req_status()

state = self.gateway_state.local.get_state()
req = None
subsys = state.get(self.gateway_state.local.SUBSYSTEM_PREFIX + request.nqn)
if subsys:
self.logger.debug(f"value of sub-system: {subsys}")
enable_ha = False
subsys_str = state.get(GatewayState.build_subsystem_key(request.nqn))
if subsys_str:
self.logger.debug(f"value of sub-system: {subsys_str}")
try:
req = json_format.Parse(subsys, pb2.create_subsystem_req())
self.logger.info(f"enable_ha: {req.enable_ha}")
except Exception:
self.logger.error(f"Got exception trying to parse subsystem: {ex}")
subsys_dict = json.loads(subsys_str)
try:
enable_ha = subsys_dict["enable_ha"]
except KeyError:
enable_ha = False
self.logger.info(f"enable_ha: {enable_ha}")
except Exception as ex:
self.logger.error(f"Got exception trying to parse subsystem {request.nqn}: {ex}")
pass
else:
self.logger.info(f"No sub-system for {request.nqn}")
self.logger.info(f"No subsystem for {request.nqn}")

if req and req.enable_ha:
if enable_ha:
for x in range (MAX_ANA_GROUPS):
try:
ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state(
Expand Down Expand Up @@ -627,9 +705,14 @@ def delete_listener_safe(self, request, context=None):
"""Deletes a listener from a subsystem at a given IP/Port."""

ret = True
traddr = GatewayConfig.escape_address_if_ipv6(request.traddr)
self.logger.info(f"Received request to delete {request.gateway_name}"
f" {request.trtype} listener for {request.nqn} at"
f" {request.traddr}:{request.trsvcid}.")
f" {traddr}:{request.trsvcid}.")

if self.is_discovery_nqn(request.nqn):
raise Exception(f"Can't delete a listener from a discovery subsystem")

try:
if request.gateway_name == self.gateway_name:
ret = rpc_nvmf.nvmf_subsystem_remove_listener(
Expand Down
5 changes: 4 additions & 1 deletion control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler
from .grpc import GatewayService
from .discovery import DiscoveryService
from .config import GatewayConfig

def sigchld_handler(signum, frame):
"""Handle SIGCHLD, runs when a spdk process terminates."""
Expand Down Expand Up @@ -137,7 +138,7 @@ def _start_discovery_service(self):
return

try:
rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery")
rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, DiscoveryService.DISCOVERY_NQN)
except Exception as ex:
self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}")
raise
Expand All @@ -158,6 +159,8 @@ def _add_server_listener(self):
enable_auth = self.config.getboolean("gateway", "enable_auth")
gateway_addr = self.config.get("gateway", "addr")
gateway_port = self.config.get("gateway", "port")
# We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it
gateway_addr = GatewayConfig.escape_address_if_ipv6(gateway_addr)
if enable_auth:
# Read in key and certificates for authentication
server_key = self.config.get("mtls", "server_key")
Expand Down
Loading

0 comments on commit 5a2d1fd

Please sign in to comment.