From 14bd963130efd99fc7206fabd686d53dfc86be67 Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sat, 7 Oct 2023 10:40:27 +0000
Subject: [PATCH 1/8] refactor imports

---
 logprep/run_logprep.py         | 23 +++++++----------------
 tests/unit/test_run_logprep.py |  4 ++--
 2 files changed, 9 insertions(+), 18 deletions(-)

diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py
index debd22bad..7ffdad94a 100644
--- a/logprep/run_logprep.py
+++ b/logprep/run_logprep.py
@@ -6,7 +6,6 @@
 import sys
 import warnings
 from argparse import ArgumentParser
-from logging import ERROR, Logger, getLogger
 from os.path import basename
 from pathlib import Path

@@ -24,21 +23,13 @@
 from logprep.util.schema_and_rule_checker import SchemaAndRuleChecker
 from logprep.util.time_measurement import TimeMeasurement

-from logging import (
-    getLogger,
-    basicConfig,
-    Logger,
-)
-from logging.handlers import SysLogHandler
-
-
 warnings.simplefilter("always", DeprecationWarning)
 logging.captureWarnings(True)

 DEFAULT_LOCATION_CONFIG = "file:///etc/logprep/pipeline.yml"
-getLogger("filelock").setLevel(ERROR)
-getLogger("urllib3.connectionpool").setLevel(ERROR)
-getLogger("elasticsearch").setLevel(ERROR)
+logging.getLogger("filelock").setLevel(logging.ERROR)
+logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
+logging.getLogger("elasticsearch").setLevel(logging.ERROR)


 def _parse_arguments():
@@ -98,7 +89,7 @@ def _parse_arguments():
     return arguments


-def _run_logprep(arguments, logger: Logger):
+def _run_logprep(arguments, logger: logging.Logger):
     runner = None
     try:
         runner = Runner.get_runner()
@@ -148,7 +139,7 @@ def _setup_logger(args, config: Configuration):
     try:
         log_config = config.get("logger", {})
         log_level = log_config.get("level", "INFO")
-        basicConfig(
+        logging.basicConfig(
             level=log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
         )
         logger = logging.getLogger("Logprep")
@@ -156,7 +147,7 @@ def _setup_logger(args, config: Configuration):
         for version in get_versions_string(args).split("\n"):
             logger.info(version)
     except BaseException as error:  # pylint: disable=broad-except
-        getLogger("Logprep").exception(error)
+        logging.getLogger("Logprep").exception(error)
         sys.exit(1)
     return logger

@@ -187,7 +178,7 @@ def _setup_metrics_and_time_measurement(args, config, logger):
     logger.debug(f"Config path: {args.config}")


-def _validate_rules(args, config: Configuration, logger: Logger):
+def _validate_rules(args, config: Configuration, logger: logging.Logger):
     try:
         config.verify_pipeline_only(logger)
     except InvalidConfigurationError as error:
diff --git a/tests/unit/test_run_logprep.py b/tests/unit/test_run_logprep.py
index f53ed6c60..c4c5200f9 100644
--- a/tests/unit/test_run_logprep.py
+++ b/tests/unit/test_run_logprep.py
@@ -5,8 +5,8 @@
 from unittest import mock

 import pytest
-import responses
 import requests
+import responses
 from yaml import safe_load

 from logprep import run_logprep
@@ -233,7 +233,7 @@ def test_main_calls_runner_stop_on_any_exception(self, mock_stop, mock_start):
         mock_stop.assert_called()

     def test_logprep_exits_if_logger_can_not_be_created(self):
-        with mock.patch("logging.getLogger") as mock_create:
+        with mock.patch("logprep.run_logprep.Configuration.get") as mock_create:
             mock_create.side_effect = BaseException
             config_path = "quickstart/exampledata/config/pipeline.yml"
             with mock.patch("sys.argv", ["logprep", config_path]):

From 7066488c2adce257f70db7df2beff703242e7bfb Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sat, 7 Oct 2023 11:21:42 +0000
Subject: [PATCH 2/8] remove ProcessStrategy as it adds complexity without
 there being multiple strategies

---
 logprep/abc/processor.py                      |  36 +++-
 logprep/processor/processor_strategy.py       |  91 ---------
 tests/performance/test_grok.py                |  79 ++++++++
 tests/unit/processor/base.py                  |  15 --
 .../unit/processor/test_processor_strategy.py | 188 ------------------
 tests/unit/test_run_logprep.py                |   6 +-
 6 files changed, 110 insertions(+), 305 deletions(-)
 delete mode 100644 logprep/processor/processor_strategy.py
 create mode 100644 tests/performance/test_grok.py
 delete mode 100644 tests/unit/processor/test_processor_strategy.py

diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py
index b0d901759..9c78730a9 100644
--- a/logprep/abc/processor.py
+++ b/logprep/abc/processor.py
@@ -1,6 +1,8 @@
 """Abstract module for processors"""
 import copy
+import time
 from abc import abstractmethod
+from functools import reduce
 from logging import DEBUG, Logger
 from multiprocessing import current_process
 from pathlib import Path
@@ -16,7 +18,6 @@
     ProcessingCriticalError,
     ProcessingWarning,
 )
-from logprep.processor.processor_strategy import SpecificGenericProcessStrategy
 from logprep.util import getter
 from logprep.util.helper import (
     add_and_overwrite,
@@ -120,7 +121,6 @@ def update_mean_processing_time_per_event(self, new_sample):

     def __init__(self, name: str, configuration: "Processor.Config", logger: Logger):
         super().__init__(name, configuration, logger)
-        self._strategy = SpecificGenericProcessStrategy(self._config.apply_multiple_times)
         self.metric_labels, specific_tree_labels, generic_tree_labels = self._create_metric_labels()
         self._specific_tree = RuleTree(
             config_path=self._config.tree_config, metric_labels=specific_tree_labels
@@ -192,13 +192,31 @@ def process(self, event: dict):
         """
         if self._logger.isEnabledFor(DEBUG):  # pragma: no cover
             self._logger.debug(f"{self.describe()} processing event {event}")
-        self._strategy.process(
-            event,
-            generic_tree=self._generic_tree,
-            specific_tree=self._specific_tree,
-            callback=self._apply_rules_wrapper,
-            processor_metrics=self.metrics,
-        )
+
+        self.metrics.number_of_processed_events += 1
+        self._process_rule_tree(event, self._specific_tree)
+        self._process_rule_tree(event, self._generic_tree)
+
+    def _process_rule_tree(self, event: dict, tree: "RuleTree"):
+        applied_rules = set()
+
+        def _process_rule(event, rule):
+            begin = time.time()
+            self._apply_rules_wrapper(event, rule)
+            processing_time = time.time() - begin
+            rule.metrics._number_of_matches += 1
+            rule.metrics.update_mean_processing_time(processing_time)
+            self.metrics.update_mean_processing_time_per_event(processing_time)
+            applied_rules.add(rule)
+            return event
+
+        if self._config.apply_multiple_times:
+            matching_rules = tree.get_matching_rules(event)
+            while matching_rules:
+                reduce(_process_rule, (event, *matching_rules))
+                matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
+        else:
+            reduce(_process_rule, (event, *tree.get_matching_rules(event)))

     def _apply_rules_wrapper(self, event, rule):
         try:
diff --git a/logprep/processor/processor_strategy.py b/logprep/processor/processor_strategy.py
deleted file mode 100644
index 2fbfa0af0..000000000
--- a/logprep/processor/processor_strategy.py
+++ /dev/null
@@ -1,91 +0,0 @@
-"""
-processor strategies module
-
-processor strategies are used to implement in one point how rules are processed in processors
-this could be the order of specific or generic rules
-"""
-from abc import ABC, abstractmethod
-from functools import reduce
-from time import time
-from typing import TYPE_CHECKING, Callable
-
-if TYPE_CHECKING:  # pragma: no cover
-    from logprep.abc.processor import Processor
-    from logprep.framework.rule_tree.rule_tree import RuleTree
-
-
-class ProcessStrategy(ABC):
-    """
-    abstract class for strategies
-    """
-
-    @abstractmethod
-    def process(self, event: dict, **kwargs):
-        """abstract method for processing rules"""
-        ...  # pragma: no cover
-
-
-class SpecificGenericProcessStrategy(ProcessStrategy):
-    """
-    Strategy to process rules in rule trees in the following order:
-    specific_rules >> generic_rules
-    """
-
-    def __init__(self, apply_multiple_times=False):
-        self._apply_multiple_times = apply_multiple_times
-
-    def process(self, event: dict, **kwargs):
-        specific_tree = kwargs.get("specific_tree")
-        generic_tree = kwargs.get("generic_tree")
-        callback = kwargs.get("callback")
-        processor_metrics = kwargs.get("processor_metrics")
-        processor_metrics.number_of_processed_events += 1
-        self._process_specific(event, specific_tree, callback, processor_metrics)
-        self._process_generic(event, generic_tree, callback, processor_metrics)
-
-    def _process_specific(
-        self,
-        event: dict,
-        specific_tree: "RuleTree",
-        callback: Callable,
-        processor_metrics: "Processor.ProcessorMetrics",
-    ):
-        """method for processing specific rules"""
-        self._process_rule_tree(event, specific_tree, callback, processor_metrics)
-
-    def _process_generic(
-        self,
-        event: dict,
-        generic_tree: "RuleTree",
-        callback: Callable,
-        processor_metrics: "Processor.ProcessorMetrics",
-    ):
-        """method for processing generic rules"""
-        self._process_rule_tree(event, generic_tree, callback, processor_metrics)
-
-    def _process_rule_tree(
-        self,
-        event: dict,
-        tree: "RuleTree",
-        callback: Callable,
-        processor_metrics: "Processor.ProcessorMetrics",
-    ):
-        applied_rules = set()
-
-        def _process_rule(event, rule):
-            begin = time()
-            callback(event, rule)
-            processing_time = time() - begin
-            rule.metrics._number_of_matches += 1
-            rule.metrics.update_mean_processing_time(processing_time)
-            processor_metrics.update_mean_processing_time_per_event(processing_time)
-            applied_rules.add(rule)
-            return event
-
-        if self._apply_multiple_times:
-            matching_rules = tree.get_matching_rules(event)
-            while matching_rules:
-                reduce(_process_rule, (event, *matching_rules))
-                matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
-        else:
-            reduce(_process_rule, (event, *tree.get_matching_rules(event)))
diff --git a/tests/performance/test_grok.py b/tests/performance/test_grok.py
new file mode 100644
index 000000000..eadb499e3
--- /dev/null
+++ b/tests/performance/test_grok.py
@@ -0,0 +1,79 @@
+# pylint: disable=missing-docstring
+# pylint: disable=protected-access
+import timeit
+
+setup_import = """
+from unittest import mock
+
+from logprep.factory import Factory
+from logprep.processor.grokker.processor import Grokker
+from logprep.processor.grokker.rule import GrokkerRule
+"""
+setup_grokker = """
+rule = GrokkerRule._create_from_dict(rule)
+grokker_config = {
+    "mygrokker": {
+        "type": "grokker",
+        "specific_rules": [],
+        "generic_rules": [],
+    }
+}
+mock_logger = mock.MagicMock()
+grokker: Grokker = Factory.create(grokker_config, mock_logger)
+grokker._specific_tree.add_rule(rule)
+grokker.setup()
+"""
+run = """
+grokker.process(document)
+    """
+
+simple_grok_pattern = """
+document = {"message": "2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log"}
+rule = {
+    "filter": "message",
+    "grokker": {
+        "mapping": {
+            "message": "%{TIMESTAMP_ISO8601:@timestamp} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage}"
+        }
+    },
+}
+"""
+
+linux_syslogline_pattern = """
+document = {"message": 'Oct 7 09:21:35 dev-machine c6182927c772[1115]: logger=infra.usagestats t=2023-10-07T09:21:35.676177822Z level=info msg="Usage stats are ready to report"'}
+rule = {
+    "filter": "message",
+    "grokker": {
+        "mapping": {
+            "message": "%{SYSLOGLINE}"
+        }
+    },
+}
+"""
+
+linux_syslogline_5424_pattern = """
+document = {"message": 'Oct 7 09:21:35 dev-machine c6182927c772[1115]: logger=infra.usagestats t=2023-10-07T09:21:35.676177822Z level=info msg="Usage stats are ready to report"'}
+rule = {
+    "filter": "message",
+    "grokker": {
+        "mapping": {
+            "message": "%{SYSLOG5424LINE}"
+        }
+    },
+}
+"""
+
+
+def main():
+    testcases = [
+        simple_grok_pattern,
+        linux_syslogline_pattern,
+        linux_syslogline_5424_pattern,
+    ]
+
+    for case in testcases:
+        print(timeit.timeit(run, number=100000, setup=setup_import + case + setup_grokker))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py
index 8a33750a0..dc491aede 100644
--- a/tests/unit/processor/base.py
+++ b/tests/unit/processor/base.py
@@ -16,7 +16,6 @@
 from logprep.factory import Factory
 from logprep.framework.rule_tree.rule_tree import RuleTree
 from logprep.processor.base.exceptions import ProcessingWarning
-from logprep.processor.processor_strategy import ProcessStrategy
 from logprep.util.helper import camel_to_snake
 from logprep.util.json_handling import list_json_files_in_directory
 from logprep.util.time_measurement import TimeMeasurement
@@ -228,20 +227,6 @@ def test_rules_returns_all_specific_and_generic_rules(self):
         object_rules_count = len(self.object.rules)
         assert all_rules_count == object_rules_count

-    def test_process_strategy_returns_strategy_object(self):
-        assert isinstance(self.object._strategy, ProcessStrategy)
-
-    def test_process_calls_strategy(self):
-        """
-        This test method needs to be overwritten in your ProcessorTests
-        if your processor uses another strategy
-        """
-        with mock.patch(
-            "logprep.processor.processor_strategy.SpecificGenericProcessStrategy.process"
-        ) as mock_strategy_process:
-            self.object.process({})
-            mock_strategy_process.assert_called()
-
     def test_process_is_measured(self):
         TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
         TimeMeasurement.APPEND_TO_EVENT = True
diff --git a/tests/unit/processor/test_processor_strategy.py b/tests/unit/processor/test_processor_strategy.py
deleted file mode 100644
index c63622080..000000000
--- a/tests/unit/processor/test_processor_strategy.py
+++ /dev/null
@@ -1,188 +0,0 @@
-# pylint: disable=missing-docstring
-# pylint: disable=protected-access
-import re
-from logging import getLogger
-from unittest import mock
-from unittest.mock import call
-
-import pytest
-
-from logprep.abc.processor import Processor
-from logprep.factory import Factory
-from logprep.framework.pipeline import Pipeline
-from logprep.processor.dissector.rule import DissectorRule
-from logprep.processor.generic_adder.rule import GenericAdderRule
-from logprep.processor.processor_strategy import SpecificGenericProcessStrategy
-
-
-class TestSpecificGenericProcessStrategy:
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_generic"
-    )
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_specific"
-    )
-    def test_process(self, mock_process_specific, mock_process_generic):
-        mock_metrics = Processor.ProcessorMetrics(
-            labels={}, specific_rule_tree=[], generic_rule_tree=[]
-        )
-        strategy = SpecificGenericProcessStrategy()
-        strategy.process({}, processor_stats=mock.Mock(), processor_metrics=mock_metrics)
-        mock_process_generic.assert_called()
-        mock_process_specific.assert_called()
-
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_generic"
-    )
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_specific"
-    )
-    def test_process_specific_before_generic(self, mock_process_specific, mock_process_generic):
-        call_order = []
-        mock_process_specific.side_effect = lambda *a, **kw: call_order.append(
-            mock_process_specific
-        )
-        mock_process_generic.side_effect = lambda *a, **kw: call_order.append(mock_process_generic)
-        mock_metrics = Processor.ProcessorMetrics(
-            labels={}, specific_rule_tree=[], generic_rule_tree=[]
-        )
-        strategy = SpecificGenericProcessStrategy()
-        strategy.process({}, processor_stats=mock.Mock(), processor_metrics=mock_metrics)
-        assert call_order == [mock_process_specific, mock_process_generic]
-
-    def test_apply_processor_multiple_times_until_no_new_rule_matches(self):
-        config = {
-            "type": "dissector",
-            "specific_rules": [],
-            "generic_rules": [],
-            "apply_multiple_times": True,
-        }
-        processor = Factory.create({"custom_lister": config}, getLogger("test-logger"))
-        rule_one_dict = {
-            "filter": "message",
-            "dissector": {"mapping": {"message": "%{time} [%{protocol}] %{url}"}},
-        }
-        rule_two_dict = {
-            "filter": "protocol",
-            "dissector": {"mapping": {"protocol": "%{proto} %{col}"}},
-        }
-        rule_one = DissectorRule._create_from_dict(rule_one_dict)
-        rule_two = DissectorRule._create_from_dict(rule_two_dict)
-        processor._specific_tree.add_rule(rule_one)
-        processor._specific_tree.add_rule(rule_two)
-        event = {"message": "time [proto col] url"}
-        expected_event = {
-            "message": "time [proto col] url",
-            "proto": "proto",
-            "col": "col",
-            "protocol": "proto col",
-            "time": "time",
-            "url": "url",
-        }
-        processor._strategy.process(
-            event,
-            generic_tree=processor._generic_tree,
-            specific_tree=processor._specific_tree,
-            callback=processor._apply_rules_wrapper,
-            processor_stats=mock.Mock(),
-            processor_metrics=mock.MagicMock(),
-        )
-        assert expected_event == event
-
-    def test_apply_processor_multiple_times_not_enabled(self):
-        config = {"type": "dissector", "specific_rules": [], "generic_rules": []}
-        processor = Factory.create({"custom_lister": config}, getLogger("test-logger"))
-        rule_one_dict = {
-            "filter": "message",
-            "dissector": {"mapping": {"message": "%{time} [%{protocol}] %{url}"}},
-        }
-        rule_two_dict = {
-            "filter": "protocol",
-            "dissector": {"mapping": {"protocol": "%{proto} %{col}"}},
-        }
-        rule_one = DissectorRule._create_from_dict(rule_one_dict)
-        rule_two = DissectorRule._create_from_dict(rule_two_dict)
-        processor._specific_tree.add_rule(rule_one)
-        processor._specific_tree.add_rule(rule_two)
-        event = {"message": "time [proto col] url"}
-        expected_event = {
-            "message": "time [proto col] url",
-            "protocol": "proto col",
-            "time": "time",
-            "url": "url",
-        }
-        processor._strategy.process(
-            event,
-            generic_tree=processor._generic_tree,
-            specific_tree=processor._specific_tree,
-            callback=processor._apply_rules_wrapper,
-            processor_stats=mock.Mock(),
-            processor_metrics=mock.MagicMock(),
-        )
-        assert expected_event == event
-
-    @pytest.mark.parametrize("execution_number", range(5))  # repeat test to ensure determinism
-    def test_strategy_applies_rules_in_deterministic_order(self, execution_number):
-        config = {"type": "generic_adder", "specific_rules": [], "generic_rules": []}
-        processor = Factory.create({"custom_lister": config}, getLogger("test-logger"))
-        rule_one_dict = {"filter": "val", "generic_adder": {"add": {"some": "value"}}}
-        rule_two_dict = {"filter": "NOT something", "generic_adder": {"add": {"something": "else"}}}
-        rule_one = GenericAdderRule._create_from_dict(rule_one_dict)
-        rule_two = GenericAdderRule._create_from_dict(rule_two_dict)
-        processor._specific_tree.add_rule(rule_one)
-        processor._specific_tree.add_rule(rule_two)
-        event = {"val": "content"}
-        mock_callback = mock.MagicMock()
-        processor._strategy.process(
-            event=event,
-            generic_tree=processor._generic_tree,
-            specific_tree=processor._specific_tree,
-            callback=mock_callback,
-            processor_stats=mock.Mock(),
-            processor_metrics=mock.MagicMock(),
-        )
-        expected_call_order = [call(event, rule_one), call(event, rule_two)]
-        assert (
-            mock_callback.mock_calls == expected_call_order
-        ), f"Wrong call order in test {execution_number}"
-
-    def test_strategy_processes_generic_rules_after_processor_error_in_specific_rules(self, capsys):
-        config = {
-            "pipeline": [
-                {"adder": {"type": "generic_adder", "specific_rules": [], "generic_rules": []}}
-            ]
-        }
-        specific_rule_one_dict = {
-            "filter": "val",
-            "generic_adder": {"add": {"first": "value", "second": "value"}},
-        }
-        specific_rule_two_dict = {
-            "filter": "val",
-            "generic_adder": {"add": {"third": "value", "fourth": "value"}},
-        }
-        generic_rule_dict = {
-            "filter": "val",
-            "generic_adder": {"add": {"fifth": "value", "sixth": "value"}},
-        }
-        specific_rule_one = GenericAdderRule._create_from_dict(specific_rule_one_dict)
-        specific_rule_two = GenericAdderRule._create_from_dict(specific_rule_two_dict)
-        generic_rule = GenericAdderRule._create_from_dict(generic_rule_dict)
-        event = {"val": "content", "first": "exists already"}
-        expected_event = {
-            "val": "content",
-            "first": "exists already",
-            "second": "value",
-            "third": "value",
-            "fourth": "value",
-            "fifth": "value",
-            "sixth": "value",
-            "tags": ["_generic_adder_failure"],
-        }
-        pipeline = Pipeline(config=config)
-        pipeline._pipeline[0]._generic_tree.add_rule(generic_rule)
-        pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_two)
-        pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_one)
-        pipeline.process_event(event)
-        captured = capsys.readouterr()
-        assert re.match("FieldExistsWarning in GenericAdder.*first", captured.err)
-        assert event == expected_event
diff --git a/tests/unit/test_run_logprep.py b/tests/unit/test_run_logprep.py
index c4c5200f9..20b59d9a7 100644
--- a/tests/unit/test_run_logprep.py
+++ b/tests/unit/test_run_logprep.py
@@ -155,7 +155,8 @@ def test_version_arg_prints_with_http_config(self, capsys):
         expected_lines = (
             f"python version: {sys.version.split()[0]}\n"
             f"logprep version: {get_versions()['version']}\n"
-            f"configuration version: {configuration['version']}, http://localhost:32000/{config_path}"
+            f"configuration version: {configuration['version']},"
+            f" http://localhost:32000/{config_path}"
         )
         assert lines == expected_lines

@@ -184,7 +185,8 @@ def test_version_arg_prints_with_http_config_without_exposing_secret_data(self, capsys):
         expected_lines = (
             f"python version: {sys.version.split()[0]}\n"
             f"logprep version: {get_versions()['version']}\n"
-            f"configuration version: {configuration['version']}, http://localhost:32000/{config_path}"
+            f"configuration version: {configuration['version']},"
+            f" http://localhost:32000/{config_path}"
        )
         assert lines == expected_lines

From 9fe76ea58f8c2bf8d3c398f46bf88613914ac2e3 Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sat, 7 Oct 2023 11:22:25 +0000
Subject: [PATCH 3/8] remove mermaid from documentation because it does not
 work in air-gapped environments

---
 doc/requirements.txt                    |   1 -
 doc/source/conf.py                      |   1 -
 doc/source/development/index.rst        | 116 ------------------------
 doc/source/user_manual/introduction.rst |  21 -----
 4 files changed, 139 deletions(-)

diff --git a/doc/requirements.txt b/doc/requirements.txt
index 2628f3ac3..23c08efbf 100644
--- a/doc/requirements.txt
+++ b/doc/requirements.txt
@@ -1,6 +1,5 @@
 sphinx
 sphinx_rtd_theme
-sphinxcontrib-mermaid
 sphinxcontrib.datatemplates
 sphinx-copybutton
 nbsphinx
diff --git a/doc/source/conf.py b/doc/source/conf.py
index 798abf806..004902e94 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -56,7 +56,6 @@ def setup(app):
 # ones.
 extensions = [
     "sphinx.ext.napoleon",
-    "sphinxcontrib.mermaid",
     "sphinx.ext.autosummary",
     "sphinxcontrib.datatemplates",
     "nbsphinx",
diff --git a/doc/source/development/index.rst b/doc/source/development/index.rst
index b532a3640..f54b5dfa2 100644
--- a/doc/source/development/index.rst
+++ b/doc/source/development/index.rst
@@ -2,122 +2,6 @@
 Development
 ===========

-.. mermaid::
-
-   classDiagram
-   Component <-- Processor
-   Component <-- Connector
-   Connector <-- Input : implements
-   Connector <-- Output : implements
-   Processor <-- Normalizer : implements
-   Processor <-- Pseudonymizer : implements
-   Input <-- ConfluentKafkaInput : implements
-   Output <-- ConfluentKafkaOutput : implements
-   ProcessorConfiguration
-   Rule <-- NormalizerRule : inherit
-   Rule <-- PseudonymizerRule : inherit
-   BaseProcessorTestCase <-- NormalizerTestCase : implements
-   BaseProcessorTestCase <-- PseudonymizerTestCase : implements
-   class Component{
-       +Config
-       +str name
-       +Logger _logger
-       +Config _config
-       +String describe()
-       +None setup()
-       +None shut_down()
-
-   }
-   class Processor{
-       <<interface>>
-       +rule_class
-       +Config
-       +load_rules()
-       +process()
-       +apply_rules()*
-   }
-   class Normalizer{
-       +Config
-       +rule_class = NormalizerRule
-       +_config: Normalizer.Config
-       +apply_rules()
-   }
-
-   class Pseudonymizer{
-       +Config
-       +rule_class = PseudonymizerRule
-       +_config: Pseudonymizer.Config
-       +apply_rules()
-   }
-   class Connector{
-       <<interface>>
-       +Config
-   }
-   class Input{
-       <<interface>>
-       +Config
-       +_config: Input.Config
-       -Dict _get_event()*
-       -None _get_raw_event()
-       +tuple[dict, error|None] get_next()
-   }
-   class Output{
-       <<interface>>
-       +Config
-       +_config: Output.Config
-       +None store()*
-       +None store_custom()*
-       +None store_failed()*
-   }
-   class ConfluentKafkaInput{
-       +Config
-       +_config: ConfluentKafkaInput.Config
-       +tuple _get_event()
-       +bytearray _get_raw_event()
-   }
-   class ConfluentKafkaOutput{
-       +Config
-       +_config: ConfluentKafkaInput.Config
-       +None store()
-       +None store_custom()
-       +None store_failed()
-   }
-
-   class Configuration{
-       <<interface>>
-       +create
-   }
-   class Registry{
-       +mapping : dict
-   }
-
-   class Factory{
-       +create()
-   }
-
-
-   class TestFactory{
-       +test_check()
-       +test_create_normalizer()
-       +test_create_pseudonymizer()
-   }
-
-   class BaseProcessorTestCase{
-       +test_describe()
-       +test_load_rules()
-       +test_process()
-       +test_apply_rules()*
-   }
-
-   class NormalizerTestCase{
-       +test_apply_rules()
-   }
-
-   class PseudonymizerTestCase{
-       +test_apply_rules()
-   }
-
 .. toctree::
    :maxdepth: 2

diff --git a/doc/source/user_manual/introduction.rst b/doc/source/user_manual/introduction.rst
index 7851201da..3d7a5e446 100644
--- a/doc/source/user_manual/introduction.rst
+++ b/doc/source/user_manual/introduction.rst
@@ -36,27 +36,6 @@
 Multiple instances of pipelines are created and run in parallel by different processes.
 Only one event at a time is processed by each processor.
 Therefore, results of a processor should not depend on other events.

-.. mermaid::
-
-   flowchart LR
-   A[Input\nConnector] --> B
-   A[Input\nConnector] --> C
-   A[Input\nConnector] --> D
-   subgraph Pipeline 1
-   B[Normalizer] --> E[Geo-IP Enricher]
-   E --> F[Dropper]
-   end
-   subgraph Pipeline 2
-   C[Normalizer] --> G[Geo-IP Enricher]
-   G --> H[Dropper]
-   end
-   subgraph Pipeline n
-   D[Normalizer] --> I[Geo-IP Enricher]
-   I --> J[Dropper]
-   end
-   F --> K[Output\nConnector]
-   H --> K[Output\nConnector]
-   J --> K[Output\nConnector]

 Processors
 ==========

From 43d629af1a0afd34193d496703c1832e9348092b Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sat, 7 Oct 2023 13:44:54 +0000
Subject: [PATCH 4/8] re-add tests for specific and generic behavior

---
 logprep/abc/processor.py             |   4 +-
 tests/unit/processor/test_process.py | 168 +++++++++++++++++++++++++++
 2 files changed, 169 insertions(+), 3 deletions(-)
 create mode 100644 tests/unit/processor/test_process.py

diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py
index 9c78730a9..edd73e0b6 100644
--- a/logprep/abc/processor.py
+++ b/logprep/abc/processor.py
@@ -190,9 +190,7 @@ def process(self, event: dict):
             A dictionary representing a log event.

         """
-        if self._logger.isEnabledFor(DEBUG):  # pragma: no cover
-            self._logger.debug(f"{self.describe()} processing event {event}")
-
+        self._logger.debug(f"{self.describe()} processing event {event}")
         self.metrics.number_of_processed_events += 1
         self._process_rule_tree(event, self._specific_tree)
         self._process_rule_tree(event, self._generic_tree)
diff --git a/tests/unit/processor/test_process.py b/tests/unit/processor/test_process.py
new file mode 100644
index 000000000..59dd41546
--- /dev/null
+++ b/tests/unit/processor/test_process.py
@@ -0,0 +1,168 @@
+# pylint: disable=missing-docstring
+# pylint: disable=protected-access
+import re
+from logging import getLogger
+from unittest import mock
+from unittest.mock import call
+
+import pytest
+
+from logprep.abc.processor import Processor
+from logprep.factory import Factory
+from logprep.framework.pipeline import Pipeline
+from logprep.processor.dissector.rule import DissectorRule
+from logprep.processor.generic_adder.rule import GenericAdderRule
+
+
+class TestSpecificGenericProcessStrategy:
+    @mock.patch("logprep.abc.processor.Processor._process_rule_tree")
+    def test_process(self, mock_process_rule_tree):
+        processor = Factory.create(
+            {
+                "dummy": {
+                    "type": "calculator",
+                    "generic_rules": [],
+                    "specific_rules": [],
+                }
+            },
+            mock.MagicMock(),
+        )
+        processor.process({})
+        mock_process_rule_tree.assert_called()
+        assert mock_process_rule_tree.call_count == 2
+
+    @mock.patch("logprep.abc.processor.Processor._process_rule_tree")
+    def test_process_specific_before_generic(self, mock_process_rule_tree):
+        processor = Factory.create(
+            {
+                "dummy": {
+                    "type": "calculator",
+                    "generic_rules": [],
+                    "specific_rules": [],
+                }
+            },
+            mock.MagicMock(),
+        )
+        processor.process({})
+        assert mock_process_rule_tree.call_count == 2
+        mock_calls = [
+            call({}, processor._specific_tree),
+            call({}, processor._generic_tree),
+        ]
+        mock_process_rule_tree.assert_has_calls(mock_calls, any_order=False)
+
+    def test_apply_processor_multiple_times_until_no_new_rule_matches(self):
+        config = {
+            "type": "dissector",
+            "specific_rules": [],
+            "generic_rules": [],
+            "apply_multiple_times": True,
+        }
+        processor = Factory.create({"custom_lister": config}, getLogger("test-logger"))
+        rule_one_dict = {
+            "filter": "message",
+            "dissector": {"mapping": {"message": "%{time} [%{protocol}] %{url}"}},
+        }
+        rule_two_dict = {
+            "filter": "protocol",
+            "dissector": {"mapping": {"protocol": "%{proto} %{col}"}},
+        }
+        rule_one = DissectorRule._create_from_dict(rule_one_dict)
+        rule_two = DissectorRule._create_from_dict(rule_two_dict)
+        processor._specific_tree.add_rule(rule_one)
+        processor._specific_tree.add_rule(rule_two)
+        event = {"message": "time [proto col] url"}
+        expected_event = {
+            "message": "time [proto col] url",
+            "proto": "proto",
+            "col": "col",
+            "protocol": "proto col",
+            "time": "time",
+            "url": "url",
+        }
+        processor.process(event)
+        assert expected_event == event
+
+    def test_apply_processor_multiple_times_not_enabled(self):
+        config = {"type": "dissector", "specific_rules": [], "generic_rules": []}
+        processor = Factory.create({"custom_lister": config}, getLogger("test-logger"))
+        rule_one_dict = {
+            "filter": "message",
+            "dissector": {"mapping": {"message": "%{time} [%{protocol}] %{url}"}},
+        }
+        rule_two_dict = {
+            "filter": "protocol",
+            "dissector": {"mapping": {"protocol": "%{proto} %{col}"}},
+        }
+        rule_one = DissectorRule._create_from_dict(rule_one_dict)
+        rule_two = DissectorRule._create_from_dict(rule_two_dict)
+        processor._specific_tree.add_rule(rule_one)
+        processor._specific_tree.add_rule(rule_two)
+        event = {"message": "time [proto col] url"}
+        expected_event = {
+            "message": "time [proto col] url",
+            "protocol": "proto col",
+            "time": "time",
+            "url": "url",
+        }
+        processor.process(event)
+        assert expected_event == event
+
+    @pytest.mark.parametrize("execution_number", range(5))  # repeat test to ensure determinism
+    def test_strategy_applies_rules_in_deterministic_order(self, execution_number):
+        config = {"type": "generic_adder", "specific_rules": [], "generic_rules": []}
+        processor = Factory.create({"custom_lister": config}, getLogger("test-logger"))
+        rule_one_dict = {"filter": "val", "generic_adder": {"add": {"some": "value"}}}
+        rule_two_dict = {"filter": "NOT something", "generic_adder": {"add": {"something": "else"}}}
+        rule_one = GenericAdderRule._create_from_dict(rule_one_dict)
+        rule_two = GenericAdderRule._create_from_dict(rule_two_dict)
+        processor._specific_tree.add_rule(rule_one)
+        processor._specific_tree.add_rule(rule_two)
+        event = {"val": "content"}
+        with mock.patch("logprep.abc.processor.Processor._apply_rules_wrapper") as mock_callback:
+            expected_call_order = [call(event, rule_one), call(event, rule_two)]
+            processor.process(event=event)
+            assert mock_callback.assert_has_calls(
+                expected_call_order, any_order=False
+            ), f"Wrong call order in test {execution_number}"
+
+    def test_strategy_processes_generic_rules_after_processor_error_in_specific_rules(self, capsys):
+        config = {
+            "pipeline": [
+                {"adder": {"type": "generic_adder", "specific_rules": [], "generic_rules": []}}
+            ]
+        }
+        specific_rule_one_dict = {
+            "filter": "val",
+            "generic_adder": {"add": {"first": "value", "second": "value"}},
+        }
+        specific_rule_two_dict = {
+            "filter": "val",
+            "generic_adder": {"add": {"third": "value", "fourth": "value"}},
+        }
+        generic_rule_dict = {
+            "filter": "val",
+            "generic_adder": {"add": {"fifth": "value", "sixth": "value"}},
+        }
+        specific_rule_one = GenericAdderRule._create_from_dict(specific_rule_one_dict)
+        specific_rule_two = GenericAdderRule._create_from_dict(specific_rule_two_dict)
+        generic_rule = GenericAdderRule._create_from_dict(generic_rule_dict)
+        event = {"val": "content", "first": "exists already"}
+        expected_event = {
+            "val": "content",
+            "first": "exists already",
+            "second": "value",
+            "third": "value",
+            "fourth": "value",
+            "fifth": "value",
+            "sixth": "value",
+            "tags": ["_generic_adder_failure"],
+        }
+        pipeline = Pipeline(config=config)
+        pipeline._pipeline[0]._generic_tree.add_rule(generic_rule)
+        pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_two)
+        pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_one)
+        pipeline.process_event(event)
+        captured = capsys.readouterr()
+        assert re.match("FieldExistsWarning in GenericAdder.*first", captured.err)
+        assert event == expected_event

From 9ba318edf0999afc37f49b1950cceec404f6a037 Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sat, 7 Oct 2023 13:50:26 +0000
Subject: [PATCH 5/8] fix test

---
 tests/unit/processor/test_process.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/tests/unit/processor/test_process.py b/tests/unit/processor/test_process.py
index 59dd41546..6739473a3 100644
--- a/tests/unit/processor/test_process.py
+++ b/tests/unit/processor/test_process.py
@@ -7,7 +7,6 @@
 import pytest

-from logprep.abc.processor import Processor
 from logprep.factory import Factory
 from logprep.framework.pipeline import Pipeline
 from logprep.processor.dissector.rule import DissectorRule
 from logprep.processor.generic_adder.rule import GenericAdderRule
@@ -122,9 +121,7 @@ def test_strategy_applies_rules_in_deterministic_order(self, execution_number):
         with mock.patch("logprep.abc.processor.Processor._apply_rules_wrapper") as mock_callback:
             expected_call_order = [call(event, rule_one), call(event, rule_two)]
             processor.process(event=event)
-            assert mock_callback.assert_has_calls(
-                expected_call_order, any_order=False
-            ), f"Wrong call order in test {execution_number}"
+            mock_callback.assert_has_calls(expected_call_order, any_order=False)

     def test_strategy_processes_generic_rules_after_processor_error_in_specific_rules(self, capsys):
         config = {

From 79a3e9665de93ed7297b6c4fc2b3dfa7ee1f4694 Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sat, 7 Oct 2023 14:01:21 +0000
Subject: [PATCH 6/8] fix test

---
 tests/unit/processor/base.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py
index dc491aede..e4a8e0228 100644
--- a/tests/unit/processor/base.py
+++ b/tests/unit/processor/base.py
@@ -246,12 +246,11 @@ def test_process_measurements_appended_under_processor_config_name(self):
         assert isinstance(processing_times[config_name], float)

     @mock.patch("logging.Logger.debug")
-    @mock.patch("logging.Logger.isEnabledFor", return_value=True)
-    def test_process_writes_debug_messages(self, mock_is_enabled, mock_debug):
+    def test_process_writes_debug_messages(self, mock_debug):
         event = {}
         self.object.process(event)
-        mock_is_enabled.assert_called()
         mock_debug.assert_called()
+        mock_debug.assert_called_with(f"{self.object.describe()} processing event {event}")

     def test_config_attribute_is_config_object(self):
         assert isinstance(self.object._config, self.object.Config)

From c11cf4b9e5baa43c09de25cfbbde5066277030b1 Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sat, 7 Oct 2023 14:06:55 +0000
Subject: [PATCH 7/8] fix test

---
 tests/unit/processor/base.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py
index e4a8e0228..0a0f4c6ad 100644
--- a/tests/unit/processor/base.py
+++ b/tests/unit/processor/base.py
@@ -250,7 +250,6 @@ def test_process_writes_debug_messages(self, mock_debug):
         event = {}
         self.object.process(event)
         mock_debug.assert_called()
-        mock_debug.assert_called_with(f"{self.object.describe()} processing event {event}")

     def test_config_attribute_is_config_object(self):
         assert isinstance(self.object._config, self.object.Config)

From c7c6bf451a05c59fe0a0b7cb7bbfc5c1bbc5f4bf Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Sun, 8 Oct 2023 03:06:02 +0000
Subject: [PATCH 8/8] remove performance tests

---
 tests/performance/test_grok.py | 79 ----------------------------------
 1 file changed, 79 deletions(-)
 delete mode 100644 tests/performance/test_grok.py

diff --git a/tests/performance/test_grok.py b/tests/performance/test_grok.py
deleted file mode 100644
index eadb499e3..000000000
--- a/tests/performance/test_grok.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# pylint: disable=missing-docstring
-# pylint: disable=protected-access
-import timeit
-
-setup_import = """
-from unittest import mock
-
-from logprep.factory import Factory
-from logprep.processor.grokker.processor import Grokker
-from logprep.processor.grokker.rule import GrokkerRule
-"""
-setup_grokker = """
-rule = GrokkerRule._create_from_dict(rule)
-grokker_config = {
-    "mygrokker": {
-        "type": "grokker",
-        "specific_rules": [],
-        "generic_rules": [],
-    }
-}
-mock_logger = mock.MagicMock()
-grokker: Grokker = Factory.create(grokker_config, mock_logger)
-grokker._specific_tree.add_rule(rule)
-grokker.setup()
-"""
-run = """
-grokker.process(document)
-    """
-
-simple_grok_pattern = """
-document = {"message": "2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log"}
-rule = {
-    "filter": "message",
-    "grokker": {
-        "mapping": {
-            "message": "%{TIMESTAMP_ISO8601:@timestamp} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage}"
-        }
-    },
-}
-"""
-
-linux_syslogline_pattern = """
-document = {"message": 'Oct 7 09:21:35 dev-machine c6182927c772[1115]: logger=infra.usagestats t=2023-10-07T09:21:35.676177822Z level=info msg="Usage stats are ready to report"'}
-rule = {
-    "filter": "message",
-    "grokker": {
-        "mapping": {
-            "message": "%{SYSLOGLINE}"
-        }
-    },
-}
-"""
-
-linux_syslogline_5424_pattern = """
-document = {"message": 'Oct 7 09:21:35 dev-machine c6182927c772[1115]: logger=infra.usagestats t=2023-10-07T09:21:35.676177822Z level=info msg="Usage stats are ready to report"'}
-rule = {
-    "filter": "message",
-    "grokker": {
-        "mapping": {
-            "message": "%{SYSLOG5424LINE}"
-        }
-    },
-}
-"""
-
-
-def main():
-    testcases = [
-        simple_grok_pattern,
-        linux_syslogline_pattern,
-        linux_syslogline_5424_pattern,
-    ]
-
-    for case in testcases:
-        print(timeit.timeit(run, number=100000, setup=setup_import + case + setup_grokker))
-
-
-if __name__ == "__main__":
-    main()
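Note on PATCH 2: the sketch below is a minimal, standalone illustration of the rule-application loop that the patch inlines into Processor._process_rule_tree. It is not part of the patch series, and the Rule and RuleTree classes here are hypothetical stand-ins rather than the logprep originals. It shows why the apply_multiple_times branch re-queries the tree after each pass: applying a rule mutates the event, which can make further rules match, while the applied_rules set keeps already-applied rules from running twice.

# standalone sketch of the inlined rule-application loop, assuming
# simplified stand-in Rule/RuleTree classes (not the logprep ones)
from functools import reduce


class Rule:
    def __init__(self, name, matches, apply):
        self.name = name
        self.matches = matches  # callable: event -> bool
        self.apply = apply      # callable: event -> None (mutates the event)


class RuleTree:
    def __init__(self, rules):
        self.rules = rules

    def get_matching_rules(self, event):
        # deterministic order: rules are returned in tree (insertion) order
        return [rule for rule in self.rules if rule.matches(event)]


def process_rule_tree(event, tree, apply_multiple_times=False):
    applied_rules = set()

    def _process_rule(event, rule):
        rule.apply(event)        # the processor's _apply_rules_wrapper sits here
        applied_rules.add(rule)  # remember the rule so it is not applied twice
        return event             # reduce threads the event through all rules

    if apply_multiple_times:
        # re-query the tree until no *new* rule matches the mutated event
        matching_rules = tree.get_matching_rules(event)
        while matching_rules:
            reduce(_process_rule, (event, *matching_rules))
            matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
    else:
        reduce(_process_rule, (event, *tree.get_matching_rules(event)))


# usage: the second rule only matches after the first rule added a field,
# so it is only applied when apply_multiple_times is enabled
tree = RuleTree(
    [
        Rule("split", lambda e: "message" in e, lambda e: e.update({"protocol": "tcp 80"})),
        Rule("proto", lambda e: "protocol" in e, lambda e: e.update({"proto": "tcp"})),
    ]
)
event = {"message": "time [tcp 80] url"}
process_rule_tree(event, tree, apply_multiple_times=True)
print(event)  # {'message': 'time [tcp 80] url', 'protocol': 'tcp 80', 'proto': 'tcp'}

Using reduce instead of a plain for loop keeps the event threaded through every matching rule of one tree pass as a single expression, which is what makes the call order deterministic for a given tree and lets both branches share the same _process_rule helper.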