From 566bd446973c9cd289abc65bdfa9628690fd4249 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 05:52:26 +0000 Subject: [PATCH 1/8] WIP --- logprep/framework/pipeline.py | 15 +- logprep/framework/pipeline_manager.py | 26 +- logprep/metrics/metric_exposer.py | 41 +- logprep/metrics/metric_targets.py | 146 ----- logprep/runner.py | 6 +- logprep/util/prometheus_exporter.py | 7 +- quickstart/exampledata/config/pipeline.yml | 4 +- tests/unit/framework/test_pipeline.py | 4 +- tests/unit/framework/test_pipeline_manager.py | 33 +- tests/unit/metrics/test_metric_exposer.py | 22 +- tests/unit/metrics/test_metric_targets.py | 503 ------------------ 11 files changed, 79 insertions(+), 728 deletions(-) delete mode 100644 logprep/metrics/metric_targets.py delete mode 100644 tests/unit/metrics/test_metric_targets.py diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 3c197c74f..807270a62 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -25,11 +25,11 @@ from logprep.abc.connector import Connector from logprep.abc.input import ( CriticalInputError, + CriticalInputParsingError, FatalInputError, Input, SourceDisconnectedError, WarningInputError, - CriticalInputParsingError, ) from logprep.abc.output import ( CriticalOutputError, @@ -44,6 +44,7 @@ from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler from logprep.util.pipeline_profiler import PipelineProfiler +from logprep.util.prometheus_exporter import PrometheusStatsExporter from logprep.util.time_measurement import TimeMeasurement @@ -223,7 +224,7 @@ def __init__( lock: Lock = None, shared_dict: dict = None, used_server_ports: dict = None, - metric_targets: MetricTargets = None, + prometheus_exporter: PrometheusStatsExporter = None, ) -> None: if log_handler and not isinstance(log_handler, Handler): raise MustProvideALogHandlerError @@ -239,7 +240,7 @@ def __init__( print_processed_period = self._logprep_config.get("print_processed_period", 300) self._processing_counter.setup(print_processed_period, log_handler, lock) self._used_server_ports = used_server_ports - self._metric_targets = metric_targets + self._prometheus_exporter = prometheus_exporter self.pipeline_index = pipeline_index self._encoder = msgspec.msgpack.Encoder() self._decoder = msgspec.msgpack.Decoder() @@ -263,7 +264,7 @@ def _metric_labels(self) -> dict: def _metrics_exposer(self) -> MetricExposer: return MetricExposer( self._logprep_config.get("metrics", {}), - self._metric_targets, + self._prometheus_exporter, self._shared_dict, self._lock, self.logger, @@ -272,7 +273,7 @@ def _metrics_exposer(self) -> MetricExposer: @cached_property def metrics(self) -> PipelineMetrics: """The pipeline metrics object""" - if self._metric_targets is None: + if self._prometheus_exporter is None: return None return self.PipelineMetrics( input=self._input.metrics, @@ -506,7 +507,7 @@ def __init__( lock: Lock, shared_dict: dict, used_server_ports: dict, - metric_targets: MetricTargets = None, + prometheus_exporter: PrometheusStatsExporter = None, ) -> None: if not isinstance(log_handler, MultiprocessingLogHandler): raise MustProvideAnMPLogHandlerError @@ -522,7 +523,7 @@ def __init__( lock=lock, shared_dict=shared_dict, used_server_ports=used_server_ports, - metric_targets=metric_targets, + prometheus_exporter=prometheus_exporter, ) self._continue_iterating = Value(c_bool) diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index d3ef4ad68..8e2fde4ce 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -1,13 +1,13 @@ """This module contains functionality to manage pipelines via multi-processing.""" -from logging import Logger, DEBUG -from multiprocessing import Manager, Lock +from logging import DEBUG, Logger +from multiprocessing import Lock, Manager from queue import Empty from logprep.framework.pipeline import MultiprocessingPipeline -from logprep.metrics.metric import MetricTargets from logprep.util.configuration import Configuration from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler +from logprep.util.prometheus_exporter import PrometheusStatsExporter class PipelineManagerError(BaseException): @@ -24,10 +24,9 @@ def __init__(self, what_failed: str): class PipelineManager: """Manage pipelines via multi-processing.""" - def __init__(self, logger: Logger, metric_targets: MetricTargets): + def __init__(self, logger: Logger): self._logger = logger - self.metric_targets = metric_targets - + self._prometheus_exporter = None self._log_handler = MultiprocessingLogHandler(self._logger.level) self._pipelines = [] @@ -44,8 +43,11 @@ def set_configuration(self, configuration: Configuration): manager = Manager() self._shared_dict = manager.dict() self._used_server_ports = manager.dict() - for idx in range(configuration["process_count"]): + for idx in range(configuration.get("process_count", 1)): self._shared_dict[idx] = None + prometheus_config = configuration.get("metrics", {}) + if prometheus_config.get("enabled", False): + self._prometheus_exporter = PrometheusStatsExporter(prometheus_config, self._logger) def get_count(self) -> int: """Get the pipeline count. @@ -91,11 +93,9 @@ def restart_failed_pipeline(self): failed_pipelines = [pipeline for pipeline in self._pipelines if not pipeline.is_alive()] for failed_pipeline in failed_pipelines: self._pipelines.remove(failed_pipeline) - - if self.metric_targets and self.metric_targets.prometheus_target: - self.metric_targets.prometheus_target.prometheus_exporter.remove_metrics_from_process( - failed_pipeline.pid - ) + if self._prometheus_exporter is None: + continue + self._prometheus_exporter.remove_metrics_from_process(failed_pipeline.pid) if failed_pipelines: self.set_count(self._configuration.get("process_count")) @@ -126,5 +126,5 @@ def _create_pipeline(self, index) -> MultiprocessingPipeline: lock=self._lock, shared_dict=self._shared_dict, used_server_ports=self._used_server_ports, - metric_targets=self.metric_targets, + prometheus_exporter=self._prometheus_exporter, ) diff --git a/logprep/metrics/metric_exposer.py b/logprep/metrics/metric_exposer.py index 280460806..3b5c5447a 100644 --- a/logprep/metrics/metric_exposer.py +++ b/logprep/metrics/metric_exposer.py @@ -5,13 +5,21 @@ import numpy as np -from logprep.metrics.metric_targets import split_key_label_string + +def split_key_label_string(key_label_string): + """Splits the key label string into separate variables""" + if ";" not in key_label_string: + return key_label_string, {} + key, labels = key_label_string.split(";") + labels = labels.split(",") + labels = [label.split(":") for label in labels] + return key, dict(labels) class MetricExposer: """The MetricExposer collects all metrics and exposes them via configured outputs""" - def __init__(self, config, metric_targets, shared_dict, lock, logger): + def __init__(self, config, prometheus_exporter, shared_dict, lock, logger): self._shared_dict = shared_dict self._print_period = config.get("period", 180) self._cumulative = config.get("cumulative", True) @@ -20,12 +28,7 @@ def __init__(self, config, metric_targets, shared_dict, lock, logger): self._logger = logger self._first_metrics_exposed = False self._timer = Value(c_double, time() + self._print_period) - - self.output_targets = [] - if metric_targets and metric_targets.file_target: - self.output_targets.append(metric_targets.file_target) - if metric_targets and metric_targets.prometheus_target: - self.output_targets.append(metric_targets.prometheus_target) + self._prometheus_exporter = prometheus_exporter def expose(self, metrics): """ @@ -35,7 +38,7 @@ def expose(self, metrics): pipeline, or in an independent form, where each multiprocessing pipeline will be exposed directly. """ - if not self.output_targets: + if not self._prometheus_exporter: return if self._time_to_expose(): @@ -117,5 +120,21 @@ def _send_to_output(self, metrics): Passes the metric object to the configured outputs such that they can transform and expose them """ - for output in self.output_targets: - output.expose(metrics) + for key_labels, value in metrics.items(): + key, labels = split_key_label_string(key_labels) + if key not in self._prometheus_exporter.metrics.keys(): + label_names = [] + if labels: + label_names = labels.keys() + self._prometheus_exporter.create_new_metric_exporter(key, label_names) + + if labels: + self._prometheus_exporter.metrics[key].labels(**labels).set(value) + else: + self._prometheus_exporter.metrics[key].set(value) + + interval = self._prometheus_exporter.configuration["period"] + labels = { + "component": "logprep", + } + self._prometheus_exporter.tracking_interval.labels(**labels).set(interval) diff --git a/logprep/metrics/metric_targets.py b/logprep/metrics/metric_targets.py deleted file mode 100644 index 2cba701de..000000000 --- a/logprep/metrics/metric_targets.py +++ /dev/null @@ -1,146 +0,0 @@ -"""This module implements different targets for the logprep metrics""" -import datetime -import json -from logging import getLogger, Logger -from logging.handlers import TimedRotatingFileHandler -from os.path import dirname -from pathlib import Path - -from logprep._version import get_versions -from logprep.metrics.metric import MetricTargets -from logprep.util.helper import add_field_to -from logprep.util.prometheus_exporter import PrometheusStatsExporter - - -def split_key_label_string(key_label_string): - """Splits the key label string into separate variables""" - if ";" not in key_label_string: - return key_label_string, {} - key, labels = key_label_string.split(";") - labels = labels.split(",") - labels = [label.split(":") for label in labels] - return key, dict(labels) - - -def get_metric_targets(config: dict, logger: Logger) -> MetricTargets: - """Checks the given configuration and creates the proper metric targets""" - metric_configs = config.get("metrics", {}) - - if not metric_configs.get("enabled", False): - logger.info("Metric tracking is disabled via config") - return MetricTargets(None, None) - - target_configs = metric_configs.get("targets", []) - file_target = None - prometheus_target = None - for target in target_configs: - if "file" in target.keys(): - file_target = MetricFileTarget.create(target.get("file"), config) - if "prometheus" in target.keys(): - prometheus_target = PrometheusMetricTarget.create(config, logger) - return MetricTargets(file_target, prometheus_target) - - -class MetricTarget: - """General MetricTarget defining the expose method""" - - def expose(self, metrics): - """Exposes the given metrics to the target""" - raise NotImplementedError # pragma: no cover - - -class MetricFileTarget(MetricTarget): - """The MetricFileTarget writes the metrics as a json to a rolling file handler""" - - def __init__(self, file_logger, config): - self._file_logger = file_logger - self._logprep_config = config - - @classmethod - def create(cls, file_config, config): - """Creates a MetricFileTarget""" - file_exporter = getLogger("Logprep-JSON-File-Logger") - file_exporter.handlers = [] - - log_path = file_config.get("path", "./logprep-metrics.jsonl") - Path(dirname(log_path)).mkdir(parents=True, exist_ok=True) - interval = file_config.get("rollover_interval", 60 * 60 * 24) - backup_count = file_config.get("backup_count", 10) - file_exporter.addHandler( - TimedRotatingFileHandler( - log_path, when="S", interval=interval, backupCount=backup_count - ) - ) - return MetricFileTarget(file_exporter, config) - - def expose(self, metrics): - metric_json = self._convert_metrics_to_pretty_json(metrics) - metric_json = self._add_meta_information(metric_json) - self._file_logger.info(json.dumps(metric_json)) - - @staticmethod - def _convert_metrics_to_pretty_json(metrics): - metric_data = {} - for key_labels, value in metrics.items(): - metric_name, labels = split_key_label_string(key_labels) - if labels: - dotted_path = ( - ".".join([f"{l[0]}.{l[1]}" for l in labels.items()]) + f".{metric_name}" - ) - else: - dotted_path = f"{metric_name}" - add_field_to(metric_data, dotted_path, value) - return metric_data - - def _add_meta_information(self, metric_json): - """Adds a timestamp to the metric data""" - if "meta" not in metric_json: - metric_json["meta"] = {} - metric_json["meta"]["timestamp"] = datetime.datetime.now().isoformat() - - if "version" not in metric_json.get("meta"): - metric_json["meta"]["version"] = { - "logprep": get_versions().get("version"), - "config": self._logprep_config.get("version", "unset"), - } - return metric_json - - -class PrometheusMetricTarget(MetricTarget): - """ - The PrometheusMetricTarget writes the metrics to the prometheus exporter, exposing them via - the webinterface. - """ - - def __init__(self, prometheus_exporter: PrometheusStatsExporter, config: dict): - self.prometheus_exporter = prometheus_exporter - self._logprep_config = config - - @classmethod - def create(cls, config, logger): - """Creates a PrometheusMetricTarget""" - prometheus_exporter = PrometheusStatsExporter(config.get("metrics", {}), logger) - prometheus_exporter.run() - return PrometheusMetricTarget(prometheus_exporter, config) - - def expose(self, metrics): - for key_labels, value in metrics.items(): - key, labels = split_key_label_string(key_labels) - if key not in self.prometheus_exporter.metrics.keys(): - label_names = [] - if labels: - label_names = labels.keys() - self.prometheus_exporter.create_new_metric_exporter(key, label_names) - - if labels: - self.prometheus_exporter.metrics[key].labels(**labels).set(value) - else: - self.prometheus_exporter.metrics[key].set(value) - - interval = self.prometheus_exporter.configuration["period"] - labels = { - "component": "logprep", - "logprep_version": get_versions().get("version"), - "config_version": self._logprep_config.get("version", "unset"), - } - self.prometheus_exporter.tracking_interval.labels(**labels).set(interval) diff --git a/logprep/runner.py b/logprep/runner.py index 697764cf8..d48026195 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -9,7 +9,7 @@ from schedule import Scheduler from logprep.framework.pipeline_manager import PipelineManager -from logprep.metrics.metric_targets import get_metric_targets +from logprep.metrics.metric import MetricTargets from logprep.util.configuration import Configuration, InvalidConfigurationError from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler @@ -93,7 +93,6 @@ def __init__(self, bypass_check_to_obtain_non_singleton_instance=False): self._configuration = None self._yaml_path = None self._logger = None - self._metric_targets = None self._log_handler = None self._config_refresh_interval = None @@ -267,8 +266,7 @@ def _schedule_config_refresh_job(self): def _create_manager(self): if self._manager is not None: raise MustNotCreateMoreThanOneManagerError - metric_targets = get_metric_targets(self._configuration, self._logger) - self._manager = PipelineManager(self._logger, metric_targets) + self._manager = PipelineManager(self._logger) def stop(self): """Stop the current process""" diff --git a/logprep/util/prometheus_exporter.py b/logprep/util/prometheus_exporter.py index c7518c512..0dd31d615 100644 --- a/logprep/util/prometheus_exporter.py +++ b/logprep/util/prometheus_exporter.py @@ -5,7 +5,7 @@ from os import listdir, path from os.path import isfile -from prometheus_client import start_http_server, multiprocess, REGISTRY, Gauge +from prometheus_client import REGISTRY, Gauge, multiprocess, start_http_server class PrometheusStatsExporter: @@ -61,10 +61,7 @@ def remove_metrics_from_process(self, pid): self._logger.debug(f"Removed stale metric files: {removed_files}") def _extract_port_from(self, configuration): - target_configs = configuration.get("targets", []) - for config in target_configs: - if "prometheus" in config: - self._port = config.get("prometheus").get("port") + self._port = configuration.get("metrics", {}).get("port") def _set_up_metrics(self): """Sets up the metrics that the prometheus exporter should expose""" diff --git a/quickstart/exampledata/config/pipeline.yml b/quickstart/exampledata/config/pipeline.yml index 5d5a02b59..3c3998230 100644 --- a/quickstart/exampledata/config/pipeline.yml +++ b/quickstart/exampledata/config/pipeline.yml @@ -12,9 +12,7 @@ metrics: measure_time: enabled: true append_to_event: false - targets: - - prometheus: - port: 8000 + port: 8000 pipeline: - labelername: diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 48abacdcf..54b76f8b6 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -75,7 +75,7 @@ def setup_method(self): lock=self.lock, shared_dict=self.shared_dict, used_server_ports=mock.MagicMock(), - metric_targets=self.metric_targets, + prometheus_exporter=mock.MagicMock(), ) def test_fails_if_log_handler_is_not_of_type_loghandler(self, _): @@ -89,7 +89,7 @@ def test_fails_if_log_handler_is_not_of_type_loghandler(self, _): lock=self.lock, shared_dict=self.shared_dict, used_server_ports=mock.MagicMock(), - metric_targets=self.metric_targets, + prometheus_exporter=mock.MagicMock(), ) def test_pipeline_property_returns_pipeline(self, mock_create): diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index b0c215a0d..db9377a54 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -1,18 +1,25 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access # pylint: disable=attribute-defined-outside-init -from logging import WARNING, Logger, INFO, ERROR -from time import time, sleep +from logging import ERROR, INFO, WARNING, Logger +from time import sleep, time from unittest import mock from pytest import raises from logprep.framework.pipeline import MultiprocessingPipeline -from logprep.framework.pipeline_manager import PipelineManager, MustSetConfigurationFirstError +from logprep.framework.pipeline_manager import ( + MustSetConfigurationFirstError, + PipelineManager, +) from logprep.metrics.metric import MetricTargets from logprep.util.configuration import Configuration from tests.testdata.metadata import path_to_config -from tests.util.testhelpers import AssertEmitsLogMessage, HandlerStub, AssertEmitsLogMessages +from tests.util.testhelpers import ( + AssertEmitsLogMessage, + AssertEmitsLogMessages, + HandlerStub, +) class MultiprocessingPipelineMock(MultiprocessingPipeline): @@ -58,11 +65,11 @@ def setup_class(self): self.metric_targets = MetricTargets(file_target=self.logger, prometheus_target=None) self.logger.addHandler(self.handler) - self.manager = PipelineManagerForTesting(self.logger, self.metric_targets) + self.manager = PipelineManagerForTesting(self.logger) self.manager.set_configuration(self.config) def test_create_pipeline_fails_if_config_is_unset(self): - manager = PipelineManager(self.logger, self.metric_targets) + manager = PipelineManager(self.logger) with raises( MustSetConfigurationFirstError, @@ -214,15 +221,13 @@ def test_restart_failed_pipelines_removes_metrics_database_if_prometheus_target_ failed_pipeline.is_alive = mock.MagicMock() # nosemgrep failed_pipeline.is_alive.return_value = False # nosemgrep failed_pipeline.pid = 42 - metric_targets = MetricTargets(None, prometheus_exporter_mock) - manager = PipelineManager(self.logger, metric_targets) + manager = PipelineManager(self.logger) + manager.set_configuration({"metrics": {"enabled": True}, "process_count": 2}) + manager._prometheus_exporter = prometheus_exporter_mock manager._pipelines = [failed_pipeline] - manager._configuration = {"process_count": 2} manager.restart_failed_pipeline() - prometheus_exporter_mock.prometheus_exporter.remove_metrics_from_process.assert_called() - prometheus_exporter_mock.prometheus_exporter.remove_metrics_from_process.assert_called_with( - 42 - ) + prometheus_exporter_mock.remove_metrics_from_process.assert_called() + prometheus_exporter_mock.remove_metrics_from_process.assert_called_with(42) def test_restart_failed_pipelines_skips_removal_of_metrics_database_if_no_metric_target_is_configured( self, @@ -231,7 +236,7 @@ def test_restart_failed_pipelines_skips_removal_of_metrics_database_if_no_metric failed_pipeline.metric_targets = None failed_pipeline.is_alive = mock.MagicMock() # nosemgrep failed_pipeline.is_alive.return_value = False # nosemgrep - manager = PipelineManager(self.logger, None) + manager = PipelineManager(self.logger) manager._pipelines = [failed_pipeline] manager._configuration = {"process_count": 2} manager.restart_failed_pipeline() diff --git a/tests/unit/metrics/test_metric_exposer.py b/tests/unit/metrics/test_metric_exposer.py index f30310ebc..6f123e358 100644 --- a/tests/unit/metrics/test_metric_exposer.py +++ b/tests/unit/metrics/test_metric_exposer.py @@ -4,7 +4,7 @@ # pylint: disable=line-too-long import logging from ctypes import c_double -from multiprocessing import Manager, Lock, Value +from multiprocessing import Lock, Manager, Value from time import time from unittest import mock @@ -24,16 +24,7 @@ def setup_method(self): "enabled": True, "cumulative": True, "aggregate_processes": True, - "targets": [ - {"prometheus": {"port": 8000}}, - { - "file": { - "path": "./logs/status.json", - "rollover_interval": 86400, - "backup_count": 10, - } - }, - ], + "port": 8000, } self.shared_dict = Manager().dict() @@ -109,15 +100,6 @@ def test_aggregate_metrics_combines_list_of_metrics_to_one(self): assert metrics == expected_metrics - def test_send_to_output_calls_expose_of_configured_targets(self): - mock_file_target = mock.MagicMock() - mock_prometheus_target = mock.MagicMock() - self.exposer.output_targets = [mock_file_target, mock_prometheus_target] - metrics = Rule.RuleMetrics(labels={"type": "generic"}) - self.exposer._send_to_output(metrics) - self.exposer.output_targets[0].expose.assert_called_with(metrics) - self.exposer.output_targets[1].expose.assert_called_with(metrics) - @mock.patch("logprep.metrics.metric_exposer.MetricExposer._store_metrics") @mock.patch( "logprep.metrics.metric_exposer.MetricExposer._expose_aggregated_metrics_from_shared_dict" diff --git a/tests/unit/metrics/test_metric_targets.py b/tests/unit/metrics/test_metric_targets.py deleted file mode 100644 index c74caa5aa..000000000 --- a/tests/unit/metrics/test_metric_targets.py +++ /dev/null @@ -1,503 +0,0 @@ -# pylint: disable=missing-docstring -# pylint: disable=protected-access -# pylint: disable=attribute-defined-outside-init -# pylint: disable=no-self-use -# pylint: disable=line-too-long -import json -import logging -import os -import shutil -import tempfile -from datetime import datetime -from logging.handlers import TimedRotatingFileHandler -from pathlib import Path -from unittest import mock -from unittest.mock import MagicMock - -import pytest -from prometheus_client import Gauge - -from logprep._version import get_versions -from logprep.abc.connector import Connector -from logprep.abc.processor import Processor -from logprep.framework.pipeline import Pipeline -from logprep.framework.rule_tree.rule_tree import RuleTree -from logprep.metrics.metric_exposer import MetricExposer -from logprep.metrics.metric_targets import ( - PrometheusMetricTarget, - MetricFileTarget, - split_key_label_string, - get_metric_targets, -) -from logprep.processor.base.rule import Rule -from logprep.util.prometheus_exporter import PrometheusStatsExporter - - -@pytest.fixture(name="pipeline_metrics") -def fixture_full_pipeline_metrics(): - rule_metrics_one = Rule.RuleMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "generic_adder", - "rule_tree": "specific", - } - ) - specific_rule_tree_metrics = RuleTree.RuleTreeMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "generic_adder", - "rule_tree": "specific", - }, - rules=[rule_metrics_one], - ) - rule_metrics_two = Rule.RuleMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "generic_adder", - "rule_tree": "generic", - } - ) - rule_metrics_two._number_of_matches = 2 - rule_metrics_three = Rule.RuleMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "generic_adder", - "rule_tree": "generic", - } - ) - rule_metrics_three._number_of_matches = 3 - generic_rule_tree_metrics = RuleTree.RuleTreeMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "generic_adder", - "rule_tree": "generic", - }, - rules=[rule_metrics_two, rule_metrics_three], - ) - generic_adder_metrics = Processor.ProcessorMetrics( - labels={"pipeline": "pipeline-01", "processor": "generic_adder"}, - generic_rule_tree=generic_rule_tree_metrics, - specific_rule_tree=specific_rule_tree_metrics, - ) - - rule_metrics_one = Rule.RuleMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "normalizer", - "tree_type": "specific", - } - ) - specific_rule_tree_metrics = RuleTree.RuleTreeMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "normalizer", - "rule_tree": "specific", - }, - rules=[rule_metrics_one], - ) - rule_metrics_two = Rule.RuleMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "normalizer", - "rule_tree": "generic", - } - ) - generic_rule_tree_metrics = RuleTree.RuleTreeMetrics( - labels={ - "pipeline": "pipeline-01", - "processor": "normalizer", - "rule_tree": "generic", - }, - rules=[rule_metrics_two], - ) - normalizer_metrics = Processor.ProcessorMetrics( - labels={"pipeline": "pipeline-01", "processor": "normalizer"}, - generic_rule_tree=generic_rule_tree_metrics, - specific_rule_tree=specific_rule_tree_metrics, - ) - - input_metrics = Connector.ConnectorMetrics( - labels={"pipeline": "pipeline-01", "connector": "input"} - ) - output_metrics = Connector.ConnectorMetrics( - labels={"pipeline": "pipeline-01", "connector": "output"} - ) - - pipeline_metrics = Pipeline.PipelineMetrics( - input=input_metrics, - output=output_metrics, - labels={"pipeline": "pipeline-01"}, - pipeline=[generic_adder_metrics, normalizer_metrics], - ) - return pipeline_metrics - - -def test_split_key_label_string(): - key_label_string = "logprep_metric_name;label_one:1,label_two:2" - key, labels = split_key_label_string(key_label_string) - expected_key = "logprep_metric_name" - expected_labels = {"label_one": "1", "label_two": "2"} - assert expected_key == key - assert expected_labels == labels - - -class TestGetMetricTargets: - def test_get_metric_targets_return_no_targets_with_empty_config(self): - empty_config = {} - targets = get_metric_targets(empty_config, logging.getLogger("test-logger")) - assert targets.file_target is None - assert targets.prometheus_target is None - - def test_get_metric_targets_returns_no_targets_if_disabled(self): - empty_config = {"metrics": {"enabled": False}} - targets = get_metric_targets(empty_config, logging.getLogger("test-logger")) - assert targets.file_target is None - assert targets.prometheus_target is None - - @mock.patch("logprep.metrics.metric_targets.MetricFileTarget.create") - def test_get_metric_target_returns_only_file_target(self, create_file_target_mock): - create_file_target_mock.return_value = mock.MagicMock() - empty_config = {"metrics": {"enabled": True, "targets": [{"file": {"some": "thing"}}]}} - targets = get_metric_targets(empty_config, logging.getLogger("test-logger")) - assert isinstance(targets.file_target, MagicMock) - assert targets.prometheus_target is None - - @mock.patch("logprep.metrics.metric_targets.PrometheusMetricTarget.create") - def test_get_metric_target_returns_only_prometheus_target(self, create_prometheus_target_mock): - create_prometheus_target_mock.return_value = mock.MagicMock() - empty_config = { - "metrics": {"enabled": True, "targets": [{"prometheus": {"some": "thing"}}]} - } - targets = get_metric_targets(empty_config, logging.getLogger("test-logger")) - assert isinstance(targets.prometheus_target, MagicMock) - assert targets.file_target is None - - @mock.patch("logprep.metrics.metric_targets.MetricFileTarget.create") - @mock.patch("logprep.metrics.metric_targets.PrometheusMetricTarget.create") - def test_get_metric_target_returns_both_targets( - self, create_file_target_mock, create_prometheus_target_mock - ): - create_file_target_mock.return_value = mock.MagicMock() - create_prometheus_target_mock.return_value = mock.MagicMock() - empty_config = { - "metrics": { - "enabled": True, - "targets": [{"prometheus": {"some": "thing"}}, {"file": {"some": "thing"}}], - } - } - targets = get_metric_targets(empty_config, logging.getLogger("test-logger")) - assert isinstance(targets.prometheus_target, MagicMock) - assert isinstance(targets.file_target, MagicMock) - - -class TestMetricFileTarget: - def setup_method(self): - logger = logging.getLogger("test-file-metric-logger") - self.logprep_config = {"version": 1, "other_fields": "are_unimportant_for_test"} - self.target = MetricFileTarget(logger, self.logprep_config) - - def test_create_method(self): - config = {"path": "./logs/status.json", "rollover_interval": 86400, "backup_count": 10} - created_target = MetricFileTarget.create(config, {}) - assert isinstance(created_target._file_logger, logging.Logger) - assert isinstance(created_target._file_logger.handlers[0], TimedRotatingFileHandler) - assert created_target._file_logger.handlers[0].interval == config["rollover_interval"] - assert created_target._file_logger.handlers[0].backupCount == config["backup_count"] - assert created_target._file_logger.handlers[0].baseFilename.endswith(config["path"][1:]) - - def test_convert_metrics_to_pretty_json(self, pipeline_metrics): - exposed_metrics = pipeline_metrics.expose() - exposed_json = self.target._convert_metrics_to_pretty_json(exposed_metrics) - expected_json = { - "pipeline": { - "pipeline-01": { - "processor": { - "generic_adder": { - "logprep_processor_number_of_processed_events": 0.0, - "logprep_processor_mean_processing_time_per_event": 0.0, - "logprep_processor_number_of_warnings": 0.0, - "logprep_processor_number_of_errors": 0.0, - "rule_tree": { - "generic": { - "logprep_number_of_rules": 0.0, - "logprep_number_of_matches": 5.0, - "logprep_mean_processing_time": 0.0, - }, - "specific": { - "logprep_number_of_rules": 0.0, - "logprep_number_of_matches": 0.0, - "logprep_mean_processing_time": 0.0, - }, - }, - }, - "normalizer": { - "logprep_processor_number_of_processed_events": 0.0, - "logprep_processor_mean_processing_time_per_event": 0.0, - "logprep_processor_number_of_warnings": 0.0, - "logprep_processor_number_of_errors": 0.0, - "rule_tree": { - "generic": { - "logprep_number_of_rules": 0.0, - "logprep_number_of_matches": 0.0, - "logprep_mean_processing_time": 0.0, - }, - "specific": { - "logprep_number_of_rules": 0.0, - "logprep_number_of_matches": 0.0, - "logprep_mean_processing_time": 0.0, - }, - }, - }, - }, - "connector": { - "input": { - "logprep_connector_mean_processing_time_per_event": 0.0, - "logprep_connector_number_of_processed_events": 0.0, - "logprep_connector_number_of_warnings": 0.0, - "logprep_connector_number_of_errors": 0.0, - }, - "output": { - "logprep_connector_mean_processing_time_per_event": 0.0, - "logprep_connector_number_of_processed_events": 0.0, - "logprep_connector_number_of_warnings": 0.0, - "logprep_connector_number_of_errors": 0.0, - }, - }, - "logprep_pipeline_kafka_offset": 0.0, - "logprep_pipeline_mean_processing_time_per_event": 0.0, - "logprep_pipeline_number_of_processed_events": 0.0, - "logprep_pipeline_sum_of_processor_warnings": 0.0, - "logprep_pipeline_sum_of_processor_errors": 0.0, - } - } - } - - assert exposed_json == expected_json - - def test_convert_metrics_to_pretty_json_with_empty_labels(self): - rule_metrics_one = Rule.RuleMetrics(labels={"foo": "bar"}) - specific_rule_tree_metrics = RuleTree.RuleTreeMetrics( - labels={"foo": "bar"}, - rules=[rule_metrics_one], - ) - exposed_metrics = specific_rule_tree_metrics.expose() - stripped_metrics = dict( - (MetricExposer._strip_key(key, label_name="foo"), value) - for key, value in exposed_metrics.items() - ) - exposed_json = self.target._convert_metrics_to_pretty_json(stripped_metrics) - expected_json = { - "logprep_number_of_rules": 0.0, - "logprep_number_of_matches": 0.0, - "logprep_mean_processing_time": 0.0, - } - - assert exposed_json == expected_json - - def test_add_meta_information_adds_meta_subfield_with_timestamp_and_versions(self): - metric_json = {"pipeline": "not important here"} - metric_json = self.target._add_meta_information(metric_json) - assert "meta" in metric_json - assert "timestamp" in metric_json["meta"].keys() - assert isinstance( - datetime.strptime(metric_json["meta"]["timestamp"], "%Y-%m-%dT%H:%M:%S.%f"), datetime - ) - expected_versions = { - "logprep": get_versions().get("version"), - "config": self.logprep_config.get("version"), - } - assert metric_json["meta"]["version"] == expected_versions - - def test_add_meta_information_adds_unset_for_config_version_if_not_found(self): - logger = logging.getLogger("test-file-metric-logger") - logprep_config = {"config": "but without a version field"} - target = MetricFileTarget(logger, logprep_config) - - metric_json = {"pipeline": "not important here"} - metric_json = target._add_meta_information(metric_json) - expected_versions = { - "logprep": get_versions().get("version"), - "config": "unset", - } - assert metric_json["meta"]["version"] == expected_versions - - def test_expose_preprocesses_metrics_and_prints_them_to_the_logger(self, caplog): - metrics = Rule.RuleMetrics(labels={"type": "generic"}) - exposed_metrics = metrics.expose() - with caplog.at_level(logging.INFO): - self.target.expose(exposed_metrics) - assert len(caplog.messages) == 1 - exposed_json = json.loads(caplog.messages[0]) - assert isinstance(exposed_json, dict) - assert "meta" in exposed_json - assert "timestamp" in exposed_json["meta"] - - -class TestPrometheusMetricTarget: - def setup_method(self): - self.logprep_config = { - "version": 1, - "metrics": { - "period": 10, - "enabled": True, - "cumulative": True, - "aggregate_processes": True, - "targets": [ - {"prometheus": {"port": 8000}}, - ], - }, - } - logger = logging.getLogger("test-file-metric-logger") - prometheus_exporter = PrometheusStatsExporter(self.logprep_config.get("metrics"), logger) - self.target = PrometheusMetricTarget(prometheus_exporter, self.logprep_config) - - def test_create_method(self, tmpdir): - with mock.patch.dict(os.environ, {"PROMETHEUS_MULTIPROC_DIR": f"{tmpdir}/some/dir"}): - config = {"port": 8000} - created_target = PrometheusMetricTarget.create(config, logging.getLogger("test-logger")) - assert isinstance(created_target, PrometheusMetricTarget) - assert isinstance(created_target.prometheus_exporter, PrometheusStatsExporter) - assert not created_target.prometheus_exporter.metrics - assert created_target.prometheus_exporter._port == config["port"] - assert created_target.prometheus_exporter._logger.name == "test-logger" - - @mock.patch("logprep.util.prometheus_exporter.PrometheusStatsExporter.run") - def test_create_method_without_env_variable(self, _): - with mock.patch.dict(os.environ, {}): - config = {"port": 8000} - created_target = PrometheusMetricTarget.create(config, logging.getLogger("test-logger")) - logprep_tmp_dir = Path(tempfile.gettempdir()) / "logprep" - expected_metric_path = logprep_tmp_dir / "prometheus_multiproc_dir" - assert created_target.prometheus_exporter.multi_processing_dir == str( - expected_metric_path - ) - shutil.rmtree(logprep_tmp_dir) - - def test_expose_creates_new_metric_exporter_if_it_does_not_exist_yet(self): - metrics = Rule.RuleMetrics(labels={"type": "generic"}) - assert self.target.prometheus_exporter.metrics == {} - exposed_metrics = metrics.expose() - self.target.expose(exposed_metrics) - assert len(self.target.prometheus_exporter.metrics) == len(exposed_metrics) - for metric in self.target.prometheus_exporter.metrics.values(): - assert isinstance(metric, Gauge) - - @mock.patch("prometheus_client.Gauge.labels") - def test_expose_calls_prometheus_exporter_with_expected_arguments( - self, mock_labels, pipeline_metrics - ): - self.target.expose(pipeline_metrics.expose()) - mock_labels.assert_has_calls( - [ - mock.call(pipeline="pipeline-01", processor="generic_adder"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder", rule_tree="generic"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder", rule_tree="generic"), - mock.call().set(5.0), - mock.call(pipeline="pipeline-01", processor="generic_adder", rule_tree="generic"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder", rule_tree="specific"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder", rule_tree="specific"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="generic_adder", rule_tree="specific"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer", rule_tree="generic"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer", rule_tree="generic"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer", rule_tree="generic"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer", rule_tree="specific"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer", rule_tree="specific"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01", processor="normalizer", rule_tree="specific"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01"), - mock.call().set(0.0), - mock.call(pipeline="pipeline-01"), - mock.call().set(0.0), - mock.call( - component="logprep", - logprep_version=get_versions().get("version"), - config_version=self.logprep_config.get("version"), - ), - mock.call().set(self.logprep_config.get("metrics").get("period")), - ] - ) - - @mock.patch("prometheus_client.Gauge.labels") - def test_expose_calls_prometheus_exporter_and_sets_config_version_to_unset_if_no_config_version_found( - self, mock_labels, pipeline_metrics - ): - logprep_config = { - "metrics": { - "period": 10, - "enabled": True, - "cumulative": True, - "aggregate_processes": True, - "targets": [ - {"prometheus": {"port": 8000}}, - ], - }, - } - logger = logging.getLogger("test-file-metric-logger") - prometheus_exporter = PrometheusStatsExporter(logprep_config.get("metrics"), logger) - target = PrometheusMetricTarget(prometheus_exporter, logprep_config) - target.expose(pipeline_metrics.expose()) - mock_labels.assert_has_calls( - [ - mock.call( - component="logprep", - logprep_version=get_versions().get("version"), - config_version="unset", - ), - mock.call().set(logprep_config.get("metrics").get("period")), - ] - ) - - @mock.patch("prometheus_client.Gauge.set") - def test_expose_calls_prometheus_exporter_without_labels(self, mock_labels): - rule_metrics_one = Rule.RuleMetrics(labels={"foo": "bar"}) - rule_metrics_one._number_of_matches = 3 - specific_rule_tree_metrics = RuleTree.RuleTreeMetrics( - labels={"foo": "bar"}, - rules=[rule_metrics_one], - ) - specific_rule_tree_metrics.number_of_rules = 3 - exposed_metrics = specific_rule_tree_metrics.expose() - stripped_metrics = dict( - (MetricExposer._strip_key(key, label_name="foo"), value) - for key, value in exposed_metrics.items() - ) - self.target.expose(stripped_metrics) - mock_labels.assert_has_calls( - [ - mock.call(3.0), - mock.call(3.0), - mock.call(0.0), - mock.call(10), - ] - ) From 40695f2f6bcefcbf56c41c2159cd66a0472718de Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 06:39:38 +0000 Subject: [PATCH 2/8] fix tests --- logprep/util/prometheus_exporter.py | 8 ++------ tests/unit/test_runner.py | 2 +- tests/unit/util/test_prometheus_exporter.py | 21 ++++++++------------- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/logprep/util/prometheus_exporter.py b/logprep/util/prometheus_exporter.py index 0dd31d615..55eec9829 100644 --- a/logprep/util/prometheus_exporter.py +++ b/logprep/util/prometheus_exporter.py @@ -12,15 +12,14 @@ class PrometheusStatsExporter: """Used to control the prometheus exporter and to manage the metrics""" metric_prefix: str = "logprep_" - multi_processing_dir = None + multi_processing_dir: str = None def __init__(self, status_logger_config, application_logger): self._logger = application_logger self.configuration = status_logger_config - self._port = 8000 + self._port = status_logger_config.get("metrics", {}).get("port", 8000) self._prepare_multiprocessing() - self._extract_port_from(self.configuration) self._set_up_metrics() def _prepare_multiprocessing(self): @@ -60,9 +59,6 @@ def remove_metrics_from_process(self, pid): removed_files.append(filename) self._logger.debug(f"Removed stale metric files: {removed_files}") - def _extract_port_from(self, configuration): - self._port = configuration.get("metrics", {}).get("port") - def _set_up_metrics(self): """Sets up the metrics that the prometheus exporter should expose""" self.metrics = {} diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py index ebe8b02e1..5b1679c8c 100644 --- a/tests/unit/test_runner.py +++ b/tests/unit/test_runner.py @@ -43,7 +43,7 @@ def __init__(self): super().__init__(bypass_check_to_obtain_non_singleton_instance=True) def _create_manager(self): - self._manager = PipelineManagerForTesting(self._logger, self._metric_targets) + self._manager = PipelineManagerForTesting(self._logger) class LogprepRunnerTest: diff --git a/tests/unit/util/test_prometheus_exporter.py b/tests/unit/util/test_prometheus_exporter.py index bccf6e3ec..676ae0477 100644 --- a/tests/unit/util/test_prometheus_exporter.py +++ b/tests/unit/util/test_prometheus_exporter.py @@ -8,7 +8,7 @@ from unittest import mock import pytest -from prometheus_client import Gauge, REGISTRY +from prometheus_client import REGISTRY, Gauge from logprep.util.prometheus_exporter import PrometheusStatsExporter @@ -17,27 +17,22 @@ class TestPrometheusStatsExporter: def setup_method(self): REGISTRY.__init__() self.metrics_config = { - "period": 10, - "enabled": True, - "cumulative": True, - "targets": [ - {"prometheus": {"port": 80}}, - {"file": {"path": "", "rollover_interval": 200, "backup_count": 10}}, - ], + "metrics": {"period": 10, "enabled": True, "cumulative": True, "port": 80} } def test_correct_setup(self): exporter = PrometheusStatsExporter(self.metrics_config, getLogger("test-logger")) assert not exporter.metrics assert isinstance(exporter.tracking_interval, Gauge) - assert exporter._port == self.metrics_config["targets"][0]["prometheus"]["port"] + assert exporter._port == self.metrics_config["metrics"]["port"] def test_default_port_if_missing_in_config(self): metrics_config = { - "period": 10, - "enabled": True, - "cumulative": True, - "targets": [{"file": {"path": "", "rollover_interval": 200, "backup_count": 10}}], + "metrics": { + "period": 10, + "enabled": True, + "cumulative": True, + } } exporter = PrometheusStatsExporter(metrics_config, getLogger("test-logger")) From 4b395e8d8488de12583cf8bf1349445695196402 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 07:03:13 +0000 Subject: [PATCH 3/8] fixed starting prometheus --- logprep/framework/pipeline_manager.py | 10 +-- logprep/runner.py | 1 + logprep/util/configuration.py | 69 +++++-------------- logprep/util/prometheus_exporter.py | 2 +- tests/unit/framework/test_pipeline_manager.py | 2 +- 5 files changed, 27 insertions(+), 57 deletions(-) diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 8e2fde4ce..317602862 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -26,7 +26,7 @@ class PipelineManager: def __init__(self, logger: Logger): self._logger = logger - self._prometheus_exporter = None + self.prometheus_exporter = None self._log_handler = MultiprocessingLogHandler(self._logger.level) self._pipelines = [] @@ -47,7 +47,7 @@ def set_configuration(self, configuration: Configuration): self._shared_dict[idx] = None prometheus_config = configuration.get("metrics", {}) if prometheus_config.get("enabled", False): - self._prometheus_exporter = PrometheusStatsExporter(prometheus_config, self._logger) + self.prometheus_exporter = PrometheusStatsExporter(prometheus_config, self._logger) def get_count(self) -> int: """Get the pipeline count. @@ -93,9 +93,9 @@ def restart_failed_pipeline(self): failed_pipelines = [pipeline for pipeline in self._pipelines if not pipeline.is_alive()] for failed_pipeline in failed_pipelines: self._pipelines.remove(failed_pipeline) - if self._prometheus_exporter is None: + if self.prometheus_exporter is None: continue - self._prometheus_exporter.remove_metrics_from_process(failed_pipeline.pid) + self.prometheus_exporter.remove_metrics_from_process(failed_pipeline.pid) if failed_pipelines: self.set_count(self._configuration.get("process_count")) @@ -126,5 +126,5 @@ def _create_pipeline(self, index) -> MultiprocessingPipeline: lock=self._lock, shared_dict=self._shared_dict, used_server_ports=self._used_server_ports, - prometheus_exporter=self._prometheus_exporter, + prometheus_exporter=self.prometheus_exporter, ) diff --git a/logprep/runner.py b/logprep/runner.py index d48026195..5f96a91ba 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -182,6 +182,7 @@ def start(self): with self._continue_iterating.get_lock(): self._continue_iterating.value = True self._schedule_config_refresh_job() + self._manager.prometheus_exporter.run() self._logger.info("Startup complete") self._logger.debug("Runner iterating") for _ in self._keep_iterating(): diff --git a/logprep/util/configuration.py b/logprep/util/configuration.py index bfe89ee6c..e2802bab3 100644 --- a/logprep/util/configuration.py +++ b/logprep/util/configuration.py @@ -15,9 +15,9 @@ from logprep.factory import Factory from logprep.factory_error import FactoryError from logprep.factory_error import ( - UnknownComponentTypeError, InvalidConfigurationError as FactoryInvalidConfigurationError, ) +from logprep.factory_error import UnknownComponentTypeError from logprep.processor.base.exceptions import InvalidRuleDefinitionError from logprep.util.getter import GetterFactory from logprep.util.helper import print_fcolor @@ -421,39 +421,26 @@ def _verify_processor_outputs(self, processor_config): ) def _verify_metrics_config(self): - if self.get("metrics"): + metrics_config = self.get("metrics") + if metrics_config: errors = [] - required_keys = [ - "enabled", - "period", - "cumulative", - "aggregate_processes", - "measure_time", - "targets", - ] - - for key in required_keys: - if key not in self["metrics"]: - errors.append(RequiredConfigurationKeyMissingError(f"metrics > {key}")) - - targets = self.get("metrics").get("targets", []) - - if not targets: - errors.append( - IncalidMetricsConfigurationError("At least one target has to be configured") - ) - for target in targets: - current_target = list(target.keys())[0] - try: - if current_target == "prometheus": - self._verify_status_logger_prometheus_target(target["prometheus"]) - elif current_target == "file": - self._verify_status_logger_file_target(target["file"]) - else: - raise IncalidMetricsConfigurationError(f"Unknown target '{current_target}'") - except InvalidConfigurationError as error: - errors.append(error) + if "enabled" not in metrics_config: + errors.append(RequiredConfigurationKeyMissingError("metrics > enabled")) + + if metrics_config.get("enabled"): + required_keys = [ + "enabled", + "period", + "cumulative", + "aggregate_processes", + "measure_time", + "port", + ] + + for key in required_keys: + if key not in self["metrics"]: + errors.append(RequiredConfigurationKeyMissingError(f"metrics > {key}")) try: self._verify_measure_time_config(self.get("metrics").get("measure_time")) @@ -463,24 +450,6 @@ def _verify_metrics_config(self): if errors: raise InvalidConfigurationErrors(errors) - @staticmethod - def _verify_status_logger_prometheus_target(target_config): - if target_config is None or not target_config.get("port"): - raise RequiredConfigurationKeyMissingError("metrics > targets > prometheus > port") - - @staticmethod - def _verify_status_logger_file_target(target_config): - required_keys = {"path", "rollover_interval", "backup_count"} - given_keys = set(target_config.keys()) - missing_keys = required_keys.difference(given_keys) - - if missing_keys: - raise RequiredConfigurationKeyMissingError( - f"The following option keys for the " - f"metrics file target are missing: " - f"{missing_keys}" - ) - @staticmethod def _verify_measure_time_config(measure_time_config): required_keys = {"enabled", "append_to_event"} diff --git a/logprep/util/prometheus_exporter.py b/logprep/util/prometheus_exporter.py index 55eec9829..a2f7588f7 100644 --- a/logprep/util/prometheus_exporter.py +++ b/logprep/util/prometheus_exporter.py @@ -65,7 +65,7 @@ def _set_up_metrics(self): self.tracking_interval = Gauge( f"{self.metric_prefix}tracking_interval_in_seconds", "Tracking interval", - labelnames=["component", "logprep_version", "config_version"], + labelnames=["component"], registry=None, ) diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index db9377a54..2828bf3c5 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -223,7 +223,7 @@ def test_restart_failed_pipelines_removes_metrics_database_if_prometheus_target_ failed_pipeline.pid = 42 manager = PipelineManager(self.logger) manager.set_configuration({"metrics": {"enabled": True}, "process_count": 2}) - manager._prometheus_exporter = prometheus_exporter_mock + manager.prometheus_exporter = prometheus_exporter_mock manager._pipelines = [failed_pipeline] manager.restart_failed_pipeline() prometheus_exporter_mock.remove_metrics_from_process.assert_called() From fda5bf9e3e40c99e24d269fd9ffc2951e624eb2f Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 07:16:08 +0000 Subject: [PATCH 4/8] fix tests --- logprep/runner.py | 3 +- tests/unit/util/test_configuration.py | 129 ++------------------------ 2 files changed, 11 insertions(+), 121 deletions(-) diff --git a/logprep/runner.py b/logprep/runner.py index 5f96a91ba..f6df93cf6 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -182,7 +182,8 @@ def start(self): with self._continue_iterating.get_lock(): self._continue_iterating.value = True self._schedule_config_refresh_job() - self._manager.prometheus_exporter.run() + if self._manager.prometheus_exporter: + self._manager.prometheus_exporter.run() self._logger.info("Startup complete") self._logger.debug("Runner iterating") for _ in self._keep_iterating(): diff --git a/tests/unit/util/test_configuration.py b/tests/unit/util/test_configuration.py index 68b0d44a3..bf4ab5ead 100644 --- a/tests/unit/util/test_configuration.py +++ b/tests/unit/util/test_configuration.py @@ -11,14 +11,14 @@ import pytest from logprep.util.configuration import ( - InvalidConfigurationError, Configuration, IncalidMetricsConfigurationError, - RequiredConfigurationKeyMissingError, + InvalidConfigurationError, InvalidConfigurationErrors, - InvalidProcessorConfigurationError, InvalidInputConnectorConfigurationError, InvalidOutputConnectorConfigurationError, + InvalidProcessorConfigurationError, + RequiredConfigurationKeyMissingError, ) from logprep.util.getter import GetterFactory from logprep.util.json_handling import dump_config_as_file @@ -139,16 +139,7 @@ def test_verify_verifies_output_config(self): "cumulative": True, "aggregate_processes": True, "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [ - {"prometheus": {"port": 8000}}, - { - "file": { - "path": "./logs/status.json", - "rollover_interval": 86400, - "backup_count": 10, - } - }, - ], + "port": 8000, } }, None, @@ -161,79 +152,7 @@ def test_verify_verifies_output_config(self): "cumulative": True, "aggregate_processes": True, "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [ - {"prometheus": {"port": 8000}}, - { - "file": { - "path": "./logs/status.json", - "rollover_interval": 86400, - "backup_count": 10, - } - }, - ], - } - }, - RequiredConfigurationKeyMissingError, - ), - ( - "empty target", - { - "metrics": { - "period": 10, - "enabled": True, - "cumulative": True, - "aggregate_processes": True, - "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [], - } - }, - IncalidMetricsConfigurationError, - ), - ( - "unkown target", - { - "metrics": { - "period": 10, - "enabled": True, - "cumulative": True, - "aggregate_processes": True, - "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [{"webserver": {"does-not": "exist"}}], - } - }, - IncalidMetricsConfigurationError, - ), - ( - "missing key in prometheus target config", - { - "metrics": { - "period": 10, - "enabled": True, - "cumulative": True, - "aggregate_processes": True, - "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [{"prometheus": {"wrong": "key"}}], - } - }, - RequiredConfigurationKeyMissingError, - ), - ( - "missing key in file target config", - { - "metrics": { - "period": 10, - "enabled": True, - "cumulative": True, - "aggregate_processes": True, - "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [ - { - "file": { - "rollover_interval": 86400, - "backup_count": 10, - } - }, - ], + "port": 8000, } }, RequiredConfigurationKeyMissingError, @@ -247,10 +166,7 @@ def test_verify_verifies_output_config(self): "cumulative": True, "aggregate_processes": True, "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [ - {"prometheus": {"port": 8000}}, - {"file": {}}, - ], + "port": 8000, } }, RequiredConfigurationKeyMissingError, @@ -264,16 +180,7 @@ def test_verify_verifies_output_config(self): "cumulative": True, "aggregate_processes": True, "measure_time": {"append_to_event": False}, - "targets": [ - {"prometheus": {"port": 8000}}, - { - "file": { - "path": "./logs/status.json", - "rollover_interval": 86400, - "backup_count": 10, - } - }, - ], + "port": 8000, } }, RequiredConfigurationKeyMissingError, @@ -611,16 +518,7 @@ def test_verify_error(self, config_dict, raised_errors, test_case): "cumulative": True, "aggregate_processes": True, "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [ - {"prometheus": {"port": 8000}}, - { - "file": { - "path": "./logs/status.json", - "rollover_interval": 86400, - "backup_count": 10, - } - }, - ], + "port": 8000, } }, [], @@ -634,16 +532,7 @@ def test_verify_error(self, config_dict, raised_errors, test_case): "cumulative": True, "aggregate_processes": True, "measure_time": {"append_to_event": False}, - "targets": [ - {"prometheus": {"port": 8000}}, - { - "file": { - "path": "./logs/status.json", - "rollover_interval": 86400, - "backup_count": 10, - } - }, - ], + "port": 8000, } }, [ From bc2d1fc88b6b1d6fbbc99463483cc8bb6d5de504 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 07:35:05 +0000 Subject: [PATCH 5/8] remove last metric target fragments --- .../user_manual/configuration/metrics.rst | 29 ++----------------- logprep/framework/pipeline.py | 2 +- logprep/metrics/metric.py | 6 +--- logprep/runner.py | 1 - logprep/util/prometheus_exporter.py | 2 +- tests/unit/framework/test_pipeline.py | 2 -- tests/unit/framework/test_pipeline_manager.py | 6 +--- tests/unit/metrics/test_metric_exposer.py | 12 +++----- tests/unit/util/test_prometheus_exporter.py | 4 ++- 9 files changed, 14 insertions(+), 50 deletions(-) diff --git a/doc/source/user_manual/configuration/metrics.rst b/doc/source/user_manual/configuration/metrics.rst index fcc80a8b8..1781db59c 100644 --- a/doc/source/user_manual/configuration/metrics.rst +++ b/doc/source/user_manual/configuration/metrics.rst @@ -90,27 +90,10 @@ Time Measurement is deactivated by default. If only the general metrics are activated then the metric for the time measurement will be 0. -targets +port ^^^^^^^ -List of targets where the statistics should be exported to. At the moment only :code:`file` and -:code:`prometheus` are allowed. Those can be further configured with the following options: - -**file** - -| **path** *(String)* -| Path to the log file. - -| **rollover_interval** *(Integer, value > 0)* -| Defines after how many seconds the log file should be rotated. - -| **backup_count** *(Integer, value > 0)* -| Defines how many rotating log files should exist simultaneously. - -**prometheus** - -| **port** *(Integer, value > 0)* -| Port which should be used to start the default prometheus exporter webservers +Port which should be used to start the default prometheus exporter webservers. (default: 8000) Example ------- @@ -126,13 +109,7 @@ Example measure_time: enabled: true append_to_event: false - targets: - - prometheus: - port: 8000 - - file: - path: ./logs/status.json - rollover_interval: 86400 - backup_count: 10 + port: 8000 Metrics Overview diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 807270a62..146a5abad 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -39,7 +39,7 @@ ) from logprep.abc.processor import Processor from logprep.factory import Factory -from logprep.metrics.metric import Metric, MetricTargets, calculate_new_average +from logprep.metrics.metric import Metric, calculate_new_average from logprep.metrics.metric_exposer import MetricExposer from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler diff --git a/logprep/metrics/metric.py b/logprep/metrics/metric.py index 038e4faaa..76bd07937 100644 --- a/logprep/metrics/metric.py +++ b/logprep/metrics/metric.py @@ -1,9 +1,5 @@ """This module tracks, calculates, exposes and resets logprep metrics""" -from collections import namedtuple - -from attr import define, asdict - -MetricTargets = namedtuple("MetricTargets", "file_target prometheus_target") +from attr import asdict, define def is_public(attribute, _): diff --git a/logprep/runner.py b/logprep/runner.py index f6df93cf6..b1064b22a 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -9,7 +9,6 @@ from schedule import Scheduler from logprep.framework.pipeline_manager import PipelineManager -from logprep.metrics.metric import MetricTargets from logprep.util.configuration import Configuration, InvalidConfigurationError from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler diff --git a/logprep/util/prometheus_exporter.py b/logprep/util/prometheus_exporter.py index a2f7588f7..4f045a935 100644 --- a/logprep/util/prometheus_exporter.py +++ b/logprep/util/prometheus_exporter.py @@ -17,7 +17,7 @@ class PrometheusStatsExporter: def __init__(self, status_logger_config, application_logger): self._logger = application_logger self.configuration = status_logger_config - self._port = status_logger_config.get("metrics", {}).get("port", 8000) + self._port = status_logger_config.get("port", 8000) self._prepare_multiprocessing() self._set_up_metrics() diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 54b76f8b6..0ce7fa04a 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -36,7 +36,6 @@ Pipeline, SharedCounter, ) -from logprep.metrics.metric import MetricTargets from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning from logprep.processor.deleter.rule import DeleterRule from logprep.util.getter import GetterFactory @@ -58,7 +57,6 @@ class ConfigurationForTests: log_handler = MultiprocessingLogHandler(WARNING) lock = Lock() shared_dict = {} - metric_targets = MetricTargets(file_target=getLogger("Mock"), prometheus_target=None) counter = SharedCounter() diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 2828bf3c5..11e209340 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -12,7 +12,6 @@ MustSetConfigurationFirstError, PipelineManager, ) -from logprep.metrics.metric import MetricTargets from logprep.util.configuration import Configuration from tests.testdata.metadata import path_to_config from tests.util.testhelpers import ( @@ -62,7 +61,6 @@ def setup_class(self): self.config = Configuration.create_from_yaml(path_to_config) self.handler = HandlerStub() self.logger = Logger("test") - self.metric_targets = MetricTargets(file_target=self.logger, prometheus_target=None) self.logger.addHandler(self.handler) self.manager = PipelineManagerForTesting(self.logger) @@ -229,15 +227,13 @@ def test_restart_failed_pipelines_removes_metrics_database_if_prometheus_target_ prometheus_exporter_mock.remove_metrics_from_process.assert_called() prometheus_exporter_mock.remove_metrics_from_process.assert_called_with(42) - def test_restart_failed_pipelines_skips_removal_of_metrics_database_if_no_metric_target_is_configured( + def test_restart_failed_pipelines_skips_removal_of_metrics_database_if_prometheus_is_not_enabled( self, ): failed_pipeline = mock.MagicMock() - failed_pipeline.metric_targets = None failed_pipeline.is_alive = mock.MagicMock() # nosemgrep failed_pipeline.is_alive.return_value = False # nosemgrep manager = PipelineManager(self.logger) manager._pipelines = [failed_pipeline] manager._configuration = {"process_count": 2} manager.restart_failed_pipeline() - assert failed_pipeline.metric_targets is None diff --git a/tests/unit/metrics/test_metric_exposer.py b/tests/unit/metrics/test_metric_exposer.py index 6f123e358..8e1e68092 100644 --- a/tests/unit/metrics/test_metric_exposer.py +++ b/tests/unit/metrics/test_metric_exposer.py @@ -11,7 +11,6 @@ import pytest from logprep.framework.rule_tree.rule_tree import RuleTree -from logprep.metrics.metric import MetricTargets from logprep.metrics.metric_exposer import MetricExposer from logprep.processor.base.rule import Rule from logprep.util.prometheus_exporter import PrometheusStatsExporter @@ -33,12 +32,9 @@ def setup_method(self): self.shared_dict[idx] = None self.logger = logging.getLogger("test-file-metric-logger") - self.metric_targets = MetricTargets( - file_target=self.logger, - prometheus_target=PrometheusStatsExporter(self.config, self.logger), - ) + self.prometheus_exporter = PrometheusStatsExporter(self.config, self.logger) self.exposer = MetricExposer( - self.config, self.metric_targets, self.shared_dict, Lock(), self.logger + self.config, self.prometheus_exporter, self.shared_dict, Lock(), self.logger ) def test_time_to_expose_returns_true_after_enough_time_has_passed(self): @@ -118,7 +114,7 @@ def test_expose_calls_send_to_output_if_no_aggregation_is_configured(self, send_ config = self.config.copy() config["aggregate_processes"] = False self.exposer = MetricExposer( - config, self.metric_targets, self.shared_dict, Lock(), self.logger + config, self.prometheus_exporter, self.shared_dict, Lock(), self.logger ) self.exposer._timer = Value(c_double, time() - self.config["period"]) mock_metrics = mock.MagicMock() @@ -130,7 +126,7 @@ def test_expose_resets_statistics_if_cumulative_config_is_false(self): config = self.config.copy() config["cumulative"] = False self.exposer = MetricExposer( - config, self.metric_targets, self.shared_dict, Lock(), self.logger + config, self.prometheus_exporter, self.shared_dict, Lock(), self.logger ) self.exposer._timer = Value(c_double, time() - self.config["period"]) metrics = Rule.RuleMetrics(labels={"type": "generic"}) diff --git a/tests/unit/util/test_prometheus_exporter.py b/tests/unit/util/test_prometheus_exporter.py index 676ae0477..6a88b8ddc 100644 --- a/tests/unit/util/test_prometheus_exporter.py +++ b/tests/unit/util/test_prometheus_exporter.py @@ -21,7 +21,9 @@ def setup_method(self): } def test_correct_setup(self): - exporter = PrometheusStatsExporter(self.metrics_config, getLogger("test-logger")) + exporter = PrometheusStatsExporter( + self.metrics_config.get("metrics"), getLogger("test-logger") + ) assert not exporter.metrics assert isinstance(exporter.tracking_interval, Gauge) assert exporter._port == self.metrics_config["metrics"]["port"] From 1e4273f899adc5808a8e1d61bf5c1a417cdae6d1 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 07:37:51 +0000 Subject: [PATCH 6/8] update changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5d55dc2e..bcce818ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ ## Upcoming Changes ## next release + +### Breaking + +* removed metric file target + ### Features * add a preprocessor to enrich by systems env variables From 2ffe47dc8dd787cd67e3787b4e67e8b78ec15ba8 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 07:54:45 +0000 Subject: [PATCH 7/8] add assertion --- tests/unit/framework/test_pipeline_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 11e209340..5afe68f74 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -227,8 +227,9 @@ def test_restart_failed_pipelines_removes_metrics_database_if_prometheus_target_ prometheus_exporter_mock.remove_metrics_from_process.assert_called() prometheus_exporter_mock.remove_metrics_from_process.assert_called_with(42) + @mock.patch("logprep.util.prometheus_exporter.PrometheusStatsExporter") def test_restart_failed_pipelines_skips_removal_of_metrics_database_if_prometheus_is_not_enabled( - self, + self, prometheus_exporter_mock ): failed_pipeline = mock.MagicMock() failed_pipeline.is_alive = mock.MagicMock() # nosemgrep @@ -237,3 +238,4 @@ def test_restart_failed_pipelines_skips_removal_of_metrics_database_if_prometheu manager._pipelines = [failed_pipeline] manager._configuration = {"process_count": 2} manager.restart_failed_pipeline() + prometheus_exporter_mock.remove_metrics_from_process.assert_not_called() From bbb47f52c7a9320aef52ac175bd54fc7ee9ee28d Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sun, 8 Oct 2023 08:04:51 +0000 Subject: [PATCH 8/8] fix acceptance test --- tests/acceptance/test_full_configuration.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/acceptance/test_full_configuration.py b/tests/acceptance/test_full_configuration.py index fc1cca56f..af46e6a36 100644 --- a/tests/acceptance/test_full_configuration.py +++ b/tests/acceptance/test_full_configuration.py @@ -130,7 +130,7 @@ def test_logprep_exposes_prometheus_metrics(tmp_path): "cumulative": False, "aggregate_processes": False, "measure_time": {"enabled": True, "append_to_event": False}, - "targets": [{"prometheus": {"port": 8000}}], + "port": 8000, }, "input": { "fileinput": { @@ -221,7 +221,4 @@ def test_logprep_exposes_prometheus_metrics(tmp_path): assert re.search("logprep_pipeline_sum_of_processor_warnings.*pipeline-1.* 0\.0", metrics) assert re.search("logprep_pipeline_sum_of_processor_errors.*pipeline-1.* 0\.0", metrics) - assert re.search( - r"logprep_tracking_interval_in_seconds.*config_version.*logprep_version.* 1\.0", metrics - ) proc.kill()