Skip to content

Commit

Permalink
Reland "Piggyback-Hub: use processes instead of threads"
Browse files Browse the repository at this point in the history
This relands commit 600cbb276726878a4dd86c28f3af26645291927f.

Change-Id: I84a5998066c657ecdb2fd182b4552d7988688321
  • Loading branch information
mo-ki committed Sep 30, 2024
1 parent d04efec commit 135581b
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 71 deletions.
78 changes: 36 additions & 42 deletions cmk/piggyback_hub/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@
import sys
import time
from dataclasses import dataclass
from itertools import cycle
from logging import getLogger
from logging.handlers import WatchedFileHandler
from pathlib import Path
from types import FrameType

from cmk.ccc.daemon import daemonize, pid_file_lock

from cmk.piggyback_hub.config import PiggybackHubConfig, save_config_on_message
from cmk.piggyback_hub.payload import (
PiggybackPayload,
save_payload_on_message,
SendingPayloadThread,
SendingPayloadProcess,
)
from cmk.piggyback_hub.utils import ReceivingThread, SignalException

from .utils import APP_NAME, ReceivingProcess

VERBOSITY_MAP = {
0: logging.INFO,
Expand Down Expand Up @@ -80,41 +81,47 @@ def _setup_logging(args: Arguments) -> logging.Logger:
if args.foreground
else WatchedFileHandler(Path(args.log_file))
)
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelno)s] [%(name)s] %(message)s"))
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s"))
logger.addHandler(handler)

logger.setLevel(VERBOSITY_MAP.get(args.verbosity, logging.INFO))

return logger


def signal_handler(_signum: int, _stack_frame: FrameType | None) -> None:
raise SignalException()


def _register_signal_handler() -> None:
signal.signal(signal.SIGTERM, signal_handler)


def run_piggyback_hub(logger: logging.Logger, omd_root: Path) -> None:
def run_piggyback_hub(logger: logging.Logger, omd_root: Path) -> int:
# TODO: remove this loop when rabbitmq available in site
for _ in range(1_000_000):
time.sleep(1_000)

receiving_thread = ReceivingThread(
logger, omd_root, PiggybackPayload, save_payload_on_message(logger, omd_root), "payload"
)
sending_thread = SendingPayloadThread(logger, omd_root)
receive_config_thread = ReceivingThread(
logger, omd_root, PiggybackHubConfig, save_config_on_message(logger, omd_root), "config"
processes = (
ReceivingProcess(
logger, omd_root, PiggybackPayload, save_payload_on_message(logger, omd_root), "payload"
),
SendingPayloadProcess(logger, omd_root),
ReceivingProcess(
logger, omd_root, PiggybackHubConfig, save_config_on_message(logger, omd_root), "config"
),
)

receiving_thread.start()
sending_thread.start()
receive_config_thread.start()
receiving_thread.join()
sending_thread.join()
receive_config_thread.join()
for p in processes:
p.start()

def terminate_all_processes() -> int:
logger.info("Stopping: %s", APP_NAME)
for p in processes:
p.terminate()
return 0

signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(terminate_all_processes()))

# All processes should run forever. Die if any of them finishes.
for proc in cycle(processes):
proc.join(timeout=5)
if not proc.is_alive():
return terminate_all_processes()

raise RuntimeError("Unreachable code reached")


def main(argv: list[str] | None = None) -> int:
Expand All @@ -124,30 +131,17 @@ def main(argv: list[str] | None = None) -> int:
args = _parse_arguments(argv)
logger = _setup_logging(args)

logger.info("Starting Piggyback Hub daemon.")

try:
_register_signal_handler()
except Exception as e:
if args.debug:
raise
logger.exception("Unhandled exception: %s.", e)
return 1
logger.info("Starting: %s", APP_NAME)

if not args.foreground:
daemonize()
logger.info("Daemonized with PID %d.", os.getpid())

try:
with pid_file_lock(Path(args.pid_file)):
run_piggyback_hub(logger, Path(args.omd_root))
except SignalException:
logger.info("Stopping Piggyback Hub daemon.")
except Exception as e:
return run_piggyback_hub(logger, Path(args.omd_root))
except Exception as exc:
if args.debug:
raise
logger.exception("Unhandled exception: %s.", e)
logger.exception("Exception: %s: %s", APP_NAME, exc)
return 1

logger.info("Shutting down.")
return 0
37 changes: 22 additions & 15 deletions cmk/piggyback_hub/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
# conditions defined in the file COPYING, which is part of this source code package.

import logging
import threading
import multiprocessing
import signal
import time
from collections.abc import Sequence
from pathlib import Path
Expand All @@ -23,9 +24,10 @@
store_last_distribution_time,
store_piggyback_raw_data,
)
from cmk.piggyback_hub.config import load_config, PiggybackHubConfig, Target
from cmk.piggyback_hub.paths import create_paths
from cmk.piggyback_hub.utils import SignalException

from .config import load_config, PiggybackHubConfig, Target
from .paths import create_paths
from .utils import APP_NAME, make_log_and_exit

SENDING_PAUSE = 60 # [s]

Expand Down Expand Up @@ -110,16 +112,22 @@ def _send_message(
)


class SendingPayloadThread(threading.Thread):
class SendingPayloadProcess(multiprocessing.Process):
def __init__(self, logger: logging.Logger, omd_root: Path):
super().__init__()
self.logger = logger
self.omd_root = omd_root
self.config_path = create_paths(omd_root).config
self.task_name = "publishing on queue 'payload'"

def run(self):
self.logger.info("Starting: %s", self.task_name)
signal.signal(
signal.SIGTERM,
make_log_and_exit(self.logger.debug, f"Stopping: {self.task_name}"),
)
try:
with Connection("piggyback-hub", self.omd_root) as conn:
with Connection(APP_NAME, self.omd_root) as conn:
channel = conn.channel(PiggybackPayload)

while True:
Expand All @@ -130,20 +138,19 @@ def run(self):
target.host_name, self.omd_root
):
self.logger.debug(
"Sending payload for piggybacked host '%s' from source host '%s' to site '%s'",
piggyback_message.meta.piggybacked,
"%s: from host '%s' to host '%s' on site '%s'",
self.task_name.title(),
piggyback_message.meta.source,
piggyback_message.meta.piggybacked,
target.site_id,
)
_send_message(
channel, piggyback_message, target.site_id, self.omd_root, "payload"
)

time.sleep(SENDING_PAUSE)
except CMKConnectionError:
self.logger.error("RabbitMQ is not running. Stopping thread.")
except SignalException:
self.logger.debug("Stopping distributing messages")
return
except Exception as e:
self.logger.exception("Unhandled exception: %s.", e)
except CMKConnectionError as exc:
self.logger.error("Stopping: %s: %s", self.task_name, exc)
except Exception as exc:
self.logger.exception("Exception: %s: %s", self.task_name, exc)
raise
38 changes: 25 additions & 13 deletions cmk/piggyback_hub/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
# conditions defined in the file COPYING, which is part of this source code package.

import logging
import threading
import multiprocessing
import signal
import sys
from collections.abc import Mapping
from pathlib import Path
from typing import Callable, Generic, TypeVar
Expand All @@ -15,14 +17,20 @@

from .config import PiggybackHubConfig

APP_NAME = "piggyback-hub"

_ModelT = TypeVar("_ModelT", bound=BaseModel)


class SignalException(Exception):
pass
# Build a signal handler that logs `message` through `log` and then exits.
# `log` is any callable accepting a single string (the callers shown here pass
# a logger method); the returned callable takes the (signum, frame) pair that
# a handler registered via signal.signal receives and ignores both values.
def make_log_and_exit(log: Callable[[str], None], message: str) -> Callable[[object, object], None]:
def log_and_exit(signum: object, frame: object) -> None:
log(message)
# sys.exit raises SystemExit, ending the process with exit status 0.
sys.exit(0)

return log_and_exit


class ReceivingThread(threading.Thread, Generic[_ModelT]):
class ReceivingProcess(multiprocessing.Process, Generic[_ModelT]):
def __init__(
self,
logger: logging.Logger,
Expand All @@ -37,23 +45,27 @@ def __init__(
self.model = model
self.callback = callback
self.queue = queue
self.task_name = f"receiving on queue '{self.queue}'"

def run(self) -> None:
self.logger.info("Starting: %s", self.task_name)
signal.signal(
signal.SIGTERM,
make_log_and_exit(self.logger.debug, f"Stopping: {self.task_name}"),
)
try:
with Connection("piggyback-hub", self.omd_root) as conn:
with Connection(APP_NAME, self.omd_root) as conn:
channel: Channel[_ModelT] = conn.channel(self.model)
channel.queue_declare(queue=self.queue, bindings=(self.queue,))

self.logger.debug("Waiting for messages in queue %s", self.queue)
self.logger.debug("Consuming: %s", self.task_name)
channel.consume(self.callback, queue=self.queue)

except CMKConnectionError:
self.logger.error("RabbitMQ is not running. Stopping thread.")
except SignalException:
self.logger.debug("Stopping receiving messages")
return
except Exception as e:
self.logger.exception("Unhandled exception: %s.", e)
except CMKConnectionError as exc:
self.logger.error("Stopping: %s: %s", self.task_name, exc)
except Exception as exc:
self.logger.exception("Exception: %s: %s", self.task_name, exc)
raise


def distribute(configs: Mapping[str, PiggybackHubConfig], omd_root: Path) -> None:
Expand Down
2 changes: 1 addition & 1 deletion packages/cmk-messaging/cmk/messaging/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ._constants import APP_PREFIX, INTERSITE_EXCHANGE, LOCAL_EXCHANGE


class CMKConnectionError(Exception):
class CMKConnectionError(RuntimeError):
pass


Expand Down

0 comments on commit 135581b

Please sign in to comment.