Skip to content

Commit

Permalink
subprocess with sigint for greaceful shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed Oct 7, 2023
1 parent 0849aec commit 52cbba9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 34 deletions.
64 changes: 31 additions & 33 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import json
import logging
import signal
import threading
from concurrent import futures
from google.protobuf import json_format

Expand Down Expand Up @@ -44,7 +43,7 @@ def sigchld_handler(signum, frame):
exit_code = os.waitstatus_to_exitcode(wait_status)

# GW process should exit now
raise SystemExit(f"spdk subprocess terminated {pid=} {exit_code=}")
raise SystemExit(f"Gateway subprocess terminated {pid=} {exit_code=}")

class GatewayServer:
"""Runs SPDK and receives client requests for the gateway service.
Expand All @@ -66,7 +65,7 @@ def __init__(self, config):
self.spdk_process = None
self.gateway_rpc = None
self.server = None
self.discovery_thread = None
self.discovery_pid = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand All @@ -82,34 +81,32 @@ def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
self.logger.exception("GatewayServer exception occurred:")

signal.signal(signal.SIGCHLD, signal.SIG_IGN)
if self.spdk_process is not None:
self._stop_spdk()

if self.server is not None:
self.logger.info("Stopping the server...")
self.server.stop(None)

if self.discovery_thread:
self.logger.info("Terminating discovery service...")
# discovery service selector loop should exit due to KeyboardInterrupt exception
try:
os.kill(os.getpid(), signal.SIGINT)
self.discovery_thread.join()
except KeyboardInterrupt:
self.logger.info("Ignore KeyboardInterrupt in the main thread")

self.discovery_thread = None
self.logger.info("Discovery service terminated")
if self.discovery_pid:
self._stop_discovery()

self.logger.info("Exiting the gateway process.")

def serve(self):
"""Starts gateway server."""
self.logger.debug("Starting serve")

# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# Start SPDK
self._start_spdk()

# Start discovery service
self._start_discovery_service()

# Register service implementation with server
omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()
Expand All @@ -129,8 +126,6 @@ def serve(self):
# Start server
self.server.start()

# Start discovery service
self._start_discovery_service()

def _start_discovery_service(self):
"""Runs either SPDK on CEPH NVMEOF Discovery Service."""
Expand All @@ -145,19 +140,13 @@ def _start_discovery_service(self):
self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}")
raise

def discovery_thread():
while True:
try:
DiscoveryService(self.config).start_service()
except:
self.logger.exception("Discovery service exception:")
self.logger.error("Discovery service restarting")

# Start discovery service thread
self.logger.info("Starting ceph nvmeof discovery service thread")
assert self.discovery_thread is None
self.discovery_thread = threading.Thread(target=discovery_thread)
self.discovery_thread.start()
# run ceph nvmeof discovery service in sub-process
assert self.discovery_pid is None
self.discovery_pid = os.fork()
if self.discovery_pid == 0:
self.logger.info("Starting ceph nvmeof discovery service")
DiscoveryService(self.config).start_service()
os._exit(0)

def _add_server_listener(self):
"""Adds listener port to server."""
Expand Down Expand Up @@ -208,9 +197,6 @@ def _start_spdk(self):
cmd += shlex.split(spdk_tgt_cmd_extra_args)
self.logger.info(f"Starting {' '.join(cmd)}")
try:
# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# start spdk process
self.spdk_process = subprocess.Popen(cmd)
except Exception as ex:
Expand Down Expand Up @@ -259,7 +245,6 @@ def _stop_spdk(self):
rpc_socket = self.config.get("spdk", "rpc_socket")

# Terminate spdk process
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
if return_code is not None:
self.logger.error(f"SPDK({self.name}) pid {self.spdk_process.pid} "
f"already terminated, exit code: {return_code}")
Expand All @@ -283,6 +268,19 @@ def _stop_spdk(self):
self.logger.exception(f"An error occurred while removing "
f"rpc socket {rpc_socket}:")

def _stop_discovery(self):
"""Stops Discovery service process."""
assert self.discovery_pid is not None # should be verified by the caller

self.logger.info("Terminating discovery service...")
# discovery service selector loop should exit due to KeyboardInterrupt exception
os.kill(self.discovery_pid, signal.SIGINT)
_, exit_status = os.waitpid(self.discovery_pid, 0)
self.logger.info(f"Discovery service pid {self.discovery_pid} terminated, "
f"exit code={os.waitstatus_to_exitcode(exit_status)}")

self.discovery_pid = None

def _create_transport(self, trtype):
"""Initializes a transport type."""
args = {'trtype': trtype}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def _config(self, config):
self.config = config

def validate_exception(self, e):
pattern = r'spdk subprocess terminated pid=(\d+) exit_code=(\d+)'
pattern = r'Gateway subprocess terminated pid=(\d+) exit_code=(\d+)'
m = re.match(pattern, e.code)
assert(m)
pid = int(m.group(1))
Expand Down

0 comments on commit 52cbba9

Please sign in to comment.