Skip to content

Commit

Permalink
Extract OmapObject, refactor state, remove localstate
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed Sep 10, 2023
1 parent d28c111 commit bf23372
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 373 deletions.
26 changes: 15 additions & 11 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

import spdk.rpc.bdev as rpc_bdev
import spdk.rpc.nvmf as rpc_nvmf
import spdk.rpc.client as rpc_client

from google.protobuf import json_format
from .proto import gateway_pb2 as pb2
from .proto import gateway_pb2_grpc as pb2_grpc
from .state import GatewayStateHandler
from .config import GatewayConfig


class GatewayService(pb2_grpc.GatewayServicer):
Expand All @@ -35,7 +38,8 @@ class GatewayService(pb2_grpc.GatewayServicer):
spdk_rpc_client: Client of SPDK RPC server
"""

def __init__(self, config, gateway_state, spdk_rpc_client):
def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler,
spdk_rpc_client: rpc_client.JSONRPCClient):

self.logger = logging.getLogger(__name__)
self.config = config
Expand Down Expand Up @@ -82,7 +86,7 @@ def create_bdev(self, request, context=None):
try:
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True)
self.gateway_state.add_bdev(bdev_name, json_req)
self.gateway_state.omap.add_bdev(bdev_name, json_req)
except Exception as ex:
self.logger.error(
f"Error persisting create_bdev {bdev_name}: {ex}")
Expand Down Expand Up @@ -141,7 +145,7 @@ def delete_bdev(self, request, context=None):
if context:
# Update gateway state
try:
self.gateway_state.remove_bdev(request.bdev_name)
self.gateway_state.omap.remove_bdev(request.bdev_name)
except Exception as ex:
self.logger.error(
f"Error persisting delete_bdev {request.bdev_name}: {ex}")
Expand Down Expand Up @@ -182,7 +186,7 @@ def create_subsystem(self, request, context=None):
try:
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True)
self.gateway_state.add_subsystem(request.subsystem_nqn,
self.gateway_state.omap.add_subsystem(request.subsystem_nqn,
json_req)
except Exception as ex:
self.logger.error(f"Error persisting create_subsystem"
Expand Down Expand Up @@ -212,7 +216,7 @@ def delete_subsystem(self, request, context=None):
if context:
# Update gateway state
try:
self.gateway_state.remove_subsystem(request.subsystem_nqn)
self.gateway_state.omap.remove_subsystem(request.subsystem_nqn)
except Exception as ex:
self.logger.error(f"Error persisting delete_subsystem"
f" {request.subsystem_nqn}: {ex}")
Expand Down Expand Up @@ -247,7 +251,7 @@ def add_namespace(self, request, context=None):
request.nsid = nsid
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True)
self.gateway_state.add_namespace(request.subsystem_nqn,
self.gateway_state.omap.add_namespace(request.subsystem_nqn,
str(nsid), json_req)
except Exception as ex:
self.logger.error(
Expand Down Expand Up @@ -278,7 +282,7 @@ def remove_namespace(self, request, context=None):
if context:
# Update gateway state
try:
self.gateway_state.remove_namespace(request.subsystem_nqn,
self.gateway_state.omap.remove_namespace(request.subsystem_nqn,
str(request.nsid))
except Exception as ex:
self.logger.error(
Expand Down Expand Up @@ -322,7 +326,7 @@ def add_host(self, request, context=None):
try:
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True)
self.gateway_state.add_host(request.subsystem_nqn,
self.gateway_state.omap.add_host(request.subsystem_nqn,
request.host_nqn, json_req)
except Exception as ex:
self.logger.error(
Expand Down Expand Up @@ -365,7 +369,7 @@ def remove_host(self, request, context=None):
if context:
# Update gateway state
try:
self.gateway_state.remove_host(request.subsystem_nqn,
self.gateway_state.omap.remove_host(request.subsystem_nqn,
request.host_nqn)
except Exception as ex:
self.logger.error(f"Error persisting remove_host: {ex}")
Expand Down Expand Up @@ -406,7 +410,7 @@ def create_listener(self, request, context=None):
try:
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True)
self.gateway_state.add_listener(request.nqn,
self.gateway_state.omap.add_listener(request.nqn,
request.gateway_name,
request.trtype, request.traddr,
request.trsvcid, json_req)
Expand Down Expand Up @@ -448,7 +452,7 @@ def delete_listener(self, request, context=None):
if context:
# Update gateway state
try:
self.gateway_state.remove_listener(request.nqn,
self.gateway_state.omap.remove_listener(request.nqn,
request.gateway_name,
request.trtype,
request.traddr,
Expand Down
134 changes: 134 additions & 0 deletions control/omap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#
# Copyright (c) 2021 International Business Machines
# All rights reserved.
#
# SPDX-License-Identifier: LGPL-3.0-or-later
#
# Authors: [email protected], [email protected]
#
import logging
import rados
from typing import Dict
from collections import defaultdict


class OmapObject:
"""Class representing versioned omap object"""
OMAP_VERSION_KEY = "omap_version"

def __init__(self, name, ioctx) -> None:
self.version = 1
self.watch = None
self.cached_object = defaultdict(dict)
self.name = name
self.logger = logging.getLogger(__name__)
self.ioctx = ioctx
self.create()

def create(self) -> None:
"""Create OMAP object if does not exist already"""
try:
# Create a new persistence OMAP object
with rados.WriteOpCtx() as write_op:
# Set exclusive parameter to fail write_op if object exists
write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE)
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(self.version),))
self.ioctx.operate_write_op(write_op, self.name)
self.logger.info(
f"First gateway: created object {self.name}")
except rados.ObjectExists:
self.logger.info(f"{self.name} omap object already exists.")
except Exception:
self.logger.exception(f"Unable to create omap {self.name}:")
raise

def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Context destructor"""
if self.watch is not None:
self.watch.close()
self.ioctx.close()

def get(self) -> Dict[str, str]:
"""Returns dict of all OMAP keys and values."""
with rados.ReadOpCtx() as read_op:
i, _ = self.ioctx.get_omap_vals(read_op, "", "", -1)
self.ioctx.operate_read_op(read_op, self.name)
omap_dict = dict(i)
return omap_dict

def _notify(self) -> None:
""" Notify other gateways within the group of change """
try:
self.ioctx.notify(self.name)
except Exception as ex:
self.logger.info(f"Failed to notify.")

def add_key(self, key: str, val: str) -> None:
"""Adds key and value to the OMAP."""
try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause write failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.set_omap(write_op, (key,), (val,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.name)
self.version = version_update
self.logger.debug(f"omap_key generated: {key}")
except Exception as ex:
self.logger.error(f"Unable to add key to omap: {ex}. Exiting!")
raise

self._notify()

def remove_key(self, key: str) -> None:
"""Removes key from the OMAP."""
try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
# Compare operation failure will cause remove failure
write_op.omap_cmp(self.OMAP_VERSION_KEY, str(self.version),
rados.LIBRADOS_CMPXATTR_OP_EQ)
self.ioctx.remove_omap_keys(write_op, (key,))
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(version_update),))
self.ioctx.operate_write_op(write_op, self.name)
self.version = version_update
self.logger.debug(f"omap_key removed: {key}")
except Exception:
self.logger.exception(f"Unable to remove key from omap:")
raise

self._notify()

def delete(self) -> None:
"""Deletes OMAP object contents."""
try:
with rados.WriteOpCtx() as write_op:
self.ioctx.clear_omap(write_op)
self.ioctx.operate_write_op(write_op, self.name)
self.ioctx.set_omap(write_op, (self.OMAP_VERSION_KEY,),
(str(1),))
self.ioctx.operate_write_op(write_op, self.name)
self.logger.info(f"Deleted OMAP {self.name} contents.")
except Exception:
self.logger.exception(f"Error deleting OMAP {self.name} contents:")
raise

def register_watch(self, notify_event) -> None:
"""Sets a watch on the OMAP object for changes."""

def _watcher_callback(notify_id, notifier_id, watch_id, data):
notify_event.set()

if self.watch is None:
try:
self.watch = self.ioctx.watch(self.name, _watcher_callback)
except Exception:
self.logger.exception(f"Unable to initiate watch {self.name}:")
raise
else:
self.logger.info(f"Watch {self.name} already exists.")
16 changes: 8 additions & 8 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import json
import logging
import signal
import traceback
from concurrent import futures
from typing import Dict
from google.protobuf import json_format

import spdk.rpc
Expand All @@ -26,7 +26,7 @@

from .proto import gateway_pb2 as pb2
from .proto import gateway_pb2_grpc as pb2_grpc
from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler
from .state import GatewayState, OmapGatewayState, GatewayStateHandler
from .grpc import GatewayService

def sigchld_handler(signum, frame):
Expand Down Expand Up @@ -95,17 +95,17 @@ def serve(self):
# Start SPDK
self._start_spdk()

# Register service implementation with server
# Init OMAP state
omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()
gateway_state = GatewayStateHandler(self.config, local_state,
omap_state, self.gateway_rpc_caller)
gateway_state = GatewayStateHandler(self.config, omap_state, self.gateway_rpc_caller)

# Register GRPC service implementation with server
self.gateway_rpc = GatewayService(self.config, gateway_state,
self.spdk_rpc_client)
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)

# Add listener port
# Add GRPC listener port
self._add_server_listener()

# Check for existing NVMeoF target state
Expand Down Expand Up @@ -288,7 +288,7 @@ def _ping(self):
self.logger.error(f"spdk_get_version failed with: \n {ex}")
return False

def gateway_rpc_caller(self, requests, is_add_req):
def gateway_rpc_caller(self, requests: Dict[str, str] , is_add_req: bool) -> None:
"""Passes RPC requests to gateway service."""
for key, val in requests.items():
if key.startswith(GatewayState.BDEV_PREFIX):
Expand Down
Loading

0 comments on commit bf23372

Please sign in to comment.