From 53f94da56fab0c3ee635a6468111af444b51c6f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Thu, 16 May 2024 09:52:31 +0200 Subject: [PATCH] add queuelistener for logging (#590) * add multiprocess queuelistener for logging * add more tests for pipeline manager setup logging * Update CHANGELOG.md --------- Co-authored-by: dtrai2 <95028228+dtrai2@users.noreply.github.com> --- CHANGELOG.md | 2 + logprep/framework/pipeline_manager.py | 10 ++++ logprep/runner.py | 2 + logprep/util/configuration.py | 1 - logprep/util/defaults.py | 9 +++- logprep/util/logging.py | 20 +++++++- tests/unit/framework/test_pipeline_manager.py | 12 +++++ tests/unit/util/test_configuration.py | 5 +- tests/unit/util/test_logging.py | 49 +++++++++++++++++-- 9 files changed, 99 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 499d63aeb..520b94c49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ ### Improvements * remove logger from Components and Factory signatures +* add `LogprepMPQueueListener` to outsource logging to a separate process +* add a single `Queuehandler` to root logger to ensure all logs were handled by `LogprepMPQueueListener` ## 11.3.0 diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 30120b14d..c0c29294c 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -15,6 +15,7 @@ from logprep.metrics.exporter import PrometheusExporter from logprep.metrics.metrics import CounterMetric from logprep.util.configuration import Configuration +from logprep.util.logging import LogprepMPQueueListener, logqueue logger = logging.getLogger("Manager") @@ -52,8 +53,10 @@ class Metrics(Component.Metrics): def __init__(self, configuration: Configuration): self.metrics = self.Metrics(labels={"component": "manager"}) + self.loghandler = None if multiprocessing.current_process().name == "MainProcess": self._set_http_input_queue(configuration) + self._setup_logging() self._pipelines: list[multiprocessing.Process] = [] self._configuration = configuration @@ -64,6 +67,13 @@ def __init__(self, configuration: Configuration): else: self.prometheus_exporter = None + def _setup_logging(self): + console_logger = logging.getLogger("console") + if console_logger.handlers: + console_handler = console_logger.handlers.pop() # last handler is console + self.loghandler = LogprepMPQueueListener(logqueue, console_handler) + self.loghandler.start() + def _set_http_input_queue(self, configuration): """ this workaround has to be done because the queue size is not configurable diff --git a/logprep/runner.py b/logprep/runner.py index aa09dc42c..fd5e016dc 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -151,6 +151,8 @@ def start(self): self._logger.info("Shutting down") self._manager.stop() self._logger.info("Shutdown complete") + if self._manager.loghandler is not None: + self._manager.loghandler.stop() def _iterate(self): for _ in self._keep_iterating(): diff --git a/logprep/util/configuration.py b/logprep/util/configuration.py index 403eb17e4..6c859d861 100644 --- a/logprep/util/configuration.py +++ b/logprep/util/configuration.py @@ -204,7 +204,6 @@ import os from copy import deepcopy from itertools import chain -from logging import getLogger from logging.config import dictConfig from pathlib import Path from typing import Any, Iterable, List, Optional diff --git a/logprep/util/defaults.py b/logprep/util/defaults.py index 9b296eb1e..f01c6824b 100644 --- a/logprep/util/defaults.py +++ b/logprep/util/defaults.py @@ -20,10 +20,15 @@ "class": "logging.StreamHandler", "formatter": "logprep", "stream": "ext://sys.stdout", - } + }, + "queue": { + "class": "logging.handlers.QueueHandler", + "queue": "ext://logprep.util.logging.logqueue", + }, }, "loggers": { - "root": {"level": "INFO", "handlers": ["console"]}, + "root": {"level": "INFO", "handlers": ["queue"]}, + "console": {"handlers": ["console"]}, "filelock": {"level": "ERROR"}, "urllib3.connectionpool": {"level": "ERROR"}, "elasticsearch": {"level": "ERROR"}, diff --git a/logprep/util/logging.py b/logprep/util/logging.py index 2559c6109..aaa4d7ea8 100644 --- a/logprep/util/logging.py +++ b/logprep/util/logging.py @@ -1,9 +1,12 @@ """helper classes for logprep logging""" import logging -import logging.handlers +import multiprocessing as mp +from logging.handlers import QueueListener from socket import gethostname +logqueue = mp.Queue(-1) + class LogprepFormatter(logging.Formatter): """ @@ -31,3 +34,18 @@ class LogprepFormatter(logging.Formatter): def format(self, record): record.hostname = gethostname() return super().format(record) + + +class LogprepMPQueueListener(QueueListener): + """Logprep specific QueueListener that uses a multiprocessing instead of threading""" + + _process: mp.Process + + def start(self): + self._process = mp.Process(target=self._monitor, daemon=True) + self._process.start() + + def stop(self): + self.enqueue_sentinel() + self._process.join() + self._process = None diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 4ac63da07..c009ce1f8 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -3,6 +3,7 @@ # pylint: disable=attribute-defined-outside-init from copy import deepcopy from logging import Logger +from logging.config import dictConfig from unittest import mock from logprep.connector.http.input import HttpConnector @@ -10,6 +11,8 @@ from logprep.framework.pipeline_manager import PipelineManager from logprep.metrics.exporter import PrometheusExporter from logprep.util.configuration import Configuration, MetricsConfig +from logprep.util.defaults import DEFAULT_LOG_CONFIG +from logprep.util.logging import logqueue from tests.testdata.metadata import path_to_config @@ -208,3 +211,12 @@ def test_pipeline_manager_sets_queue_size_for_http_input(self): assert HttpConnector.messages._maxsize == 100 http_input = Factory.create(config.input) assert http_input.messages._maxsize == 100 + + def test_pipeline_manager_setups_logging(self): + dictConfig(DEFAULT_LOG_CONFIG) + manager = PipelineManager(self.config) + assert manager.loghandler is not None + assert manager.loghandler.queue == logqueue + assert manager.loghandler._thread is None + assert manager.loghandler._process.is_alive() + assert manager.loghandler._process.daemon diff --git a/tests/unit/util/test_configuration.py b/tests/unit/util/test_configuration.py index 5a1a59e57..90680a2c2 100644 --- a/tests/unit/util/test_configuration.py +++ b/tests/unit/util/test_configuration.py @@ -1288,7 +1288,8 @@ def test_logger_config_sets_global_level(self, kwargs): @pytest.mark.parametrize("kwargs", [{"loggers": {"logprep": {"level": "DEBUG"}}}]) def test_loggers_config_only_sets_level(self, kwargs): config = LoggerConfig(**kwargs) - assert config.loggers.get("logprep").get("level") == "DEBUG", "should be set" assert config.loggers.get("root").get("level") == "INFO", "should be default" - assert config.loggers.get("root").get("handlers") == ["console"], "should be default" + assert config.loggers.get("root").get("handlers") == [ + "queue", + ], "should be default" assert config.loggers.get("opensearch").get("level") == "ERROR", "should be default" diff --git a/tests/unit/util/test_logging.py b/tests/unit/util/test_logging.py index 3da497adc..fc229052b 100644 --- a/tests/unit/util/test_logging.py +++ b/tests/unit/util/test_logging.py @@ -1,19 +1,58 @@ +# pylint: disable=missing-docstring import logging import logging.config +import logging.handlers +import multiprocessing as mp +import queue from socket import gethostname import pytest +from _pytest.logging import LogCaptureHandler from logprep.util.defaults import DEFAULT_LOG_CONFIG from logprep.util.logging import LogprepFormatter -class TestLogprepFormatter: +def setup_module(): + logging.config.dictConfig(DEFAULT_LOG_CONFIG) + + +class TestLogDictConfig: + """this tests the logprep.util.defaults.DEFAULT_LOG_CONFIG dict""" + + def test_console_logger_uses_logprep_formatter(self): + logger = logging.getLogger("console") + assert isinstance(logger.handlers[0].formatter, LogprepFormatter) - def test_default_log_config(self): - logging.config.dictConfig(DEFAULT_LOG_CONFIG) - logger = logging.getLogger("test") - assert isinstance(logger.root.handlers[0].formatter, LogprepFormatter) + def test_root_logger_has_only_quehandler(self): + logger = logging.getLogger("root") + assert len(logger.handlers) == 3, "queuehandler and 2 logcapture handlers from pytest" + assert isinstance(logger.handlers[0], logging.handlers.QueueHandler) + # these handlers are pytest handlers and won't be available in production + assert isinstance(logger.handlers[1], LogCaptureHandler) + assert isinstance(logger.handlers[2], LogCaptureHandler) + + def test_queuhandler_uses_multiprocessing_queue(self): + logger = logging.getLogger("root") + assert isinstance(logger.handlers[0].queue, mp.queues.Queue) + assert not isinstance(logger.handlers[0].queue, queue.Queue) + + @pytest.mark.parametrize( + ("logger_name", "expected_level"), + [ + ("filelock", "ERROR"), + ("urllib3.connectionpool", "ERROR"), + ("elasticsearch", "ERROR"), + ("opensearch", "ERROR"), + ], + ) + def test_default_log_levels(self, logger_name, expected_level): + loglevel = logging.getLogger(logger_name).level + loglevel = logging.getLevelName(loglevel) + assert loglevel == expected_level + + +class TestLogprepFormatter: def test_formatter_init_with_default(self): default_formatter_config = DEFAULT_LOG_CONFIG["formatters"]["logprep"]