remove process_strategy and mermaid #451

Merged · 8 commits · Oct 11, 2023

1 change: 0 additions & 1 deletion doc/requirements.txt
@@ -1,6 +1,5 @@
sphinx
sphinx_rtd_theme
sphinxcontrib-mermaid
sphinxcontrib.datatemplates
sphinx-copybutton
nbsphinx
1 change: 0 additions & 1 deletion doc/source/conf.py
@@ -56,7 +56,6 @@ def setup(app):
# ones.
extensions = [
"sphinx.ext.napoleon",
"sphinxcontrib.mermaid",
"sphinx.ext.autosummary",
"sphinxcontrib.datatemplates",
"nbsphinx",
116 changes: 0 additions & 116 deletions doc/source/development/index.rst
@@ -2,122 +2,6 @@
Development
===========

.. mermaid::

classDiagram
Component <-- Processor
Component <-- Connector
Connector <-- Input : implements
Connector <-- Output : implements
Processor <-- Normalizer : implements
Processor <-- Pseudonymizer : implements
Input <-- ConfluentKafkaInput : implements
Output <-- ConfluentKafkaOutput : implements
ProcessorConfiguration
Rule <-- NormalizerRule : inherit
Rule <-- PseudonymizerRule : inherit
BaseProcessorTestCase <-- NormalizerTestCase : implements
BaseProcessorTestCase <-- PseudonymizerTestCase : implements
class Component{
+Config
+str name
+Logger _logger
+Config _config
+String describe()
+None setup()
+None shut_down()

}
class Processor{
<<interface>>
+rule_class
+Config
+load_rules()
+process()
+apply_rules()*
}
class Normalizer{
+Config
+rule_class = NormalizerRule
+_config: Normalizer.Config
+apply_rules()
}

class Pseudonymizer{
+Config
+rule_class = PseudonymizerRule
+_config: Pseudonymizer.Config
+apply_rules()
}
class Connector{
<<interface>>
+Config
}
class Input{
<<interface>>
+Config
+_config: Input.Config
-Dict _get_event()*
-None _get_raw_event()
+tuple[dict, error|None] get_next()
}
class Output{
<<interface>>
+Config
+_config: Output.Config
+None store()*
+None store_custom()*
+None store_failed()*
}
class ConfluentKafkaInput{
+Config
+_config: ConfluentKafkaInput.Config
+tuple _get_event()
+bytearray _get_raw_event()
}
class ConfluentKafkaOutput{
+Config
+_config: ConfluentKafkaInput.Config
+None store()
+None store_custom()
+None store_failed()
}

class Configuration{
<<adapter>>
+create
}
class Registry{
+mapping : dict
}

class Factory{
+create()
}


class TestFactory{
+test_check()
+test_create_normalizer()
+test_create_pseudonymizer()
}

class BaseProcessorTestCase{
+test_describe()
+test_load_rules()
+test_process()
+test_apply_rules()*
}

class NormalizerTestCase{
+test_apply_rules()
}

class PseudonymizerTestCase{
+test_apply_rules()
}


.. toctree::
:maxdepth: 2

21 changes: 0 additions & 21 deletions doc/source/user_manual/introduction.rst
@@ -36,27 +36,6 @@ Multiple instances of pipelines are created and run in parallel by different processes.
Only one event at a time is processed by each processor.
Therefore, results of a processor should not depend on other events.

.. mermaid::

flowchart LR
A[Input\nConnector] --> B
A[Input\nConnector] --> C
A[Input\nConnector] --> D
subgraph Pipeline 1
B[Normalizer] --> E[Geo-IP Enricher]
E --> F[Dropper]
end
subgraph Pipeline 2
C[Normalizer] --> G[Geo-IP Enricher]
G --> H[Dropper]
end
subgraph Pipeline n
D[Normalizer] --> I[Geo-IP Enricher]
I --> J[Dropper]
end
F --> K[Output\nConnector]
H --> K[Output\nConnector]
J --> K[Output\nConnector]

Processors
==========
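
The deleted flowchart showed several pipeline instances (Normalizer → Geo-IP Enricher → Dropper) fanning out from a single input connector and merging back into one output connector. The prose that remains still carries the key constraint: each processor handles one event at a time, so its result must not depend on other events. A hypothetical illustration of that constraint (toy classes, not code from the repository):

# Hypothetical processors, not part of logprep: a processor should derive its
# result from the current event (plus its static rules/config) only.
class StatelessProcessor:
    def process(self, event: dict) -> dict:
        event["normalized_message"] = event.get("message", "").strip().lower()
        return event

class StatefulProcessor:
    """Anti-pattern: keeps cross-event state."""

    def __init__(self):
        self._previous = None

    def process(self, event: dict) -> dict:
        if self._previous is not None:
            # The result now depends on which event happened to arrive first;
            # with multiple pipeline processes running in parallel this is
            # non-deterministic.
            event["gap"] = event["seq"] - self._previous["seq"]
        self._previous = event
        return event
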
38 changes: 27 additions & 11 deletions logprep/abc/processor.py
@@ -1,6 +1,8 @@
"""Abstract module for processors"""
import copy
import time
from abc import abstractmethod
from functools import reduce
from logging import DEBUG, Logger
from multiprocessing import current_process
from pathlib import Path
@@ -16,7 +18,6 @@
ProcessingCriticalError,
ProcessingWarning,
)
from logprep.processor.processor_strategy import SpecificGenericProcessStrategy
from logprep.util import getter
from logprep.util.helper import (
add_and_overwrite,
@@ -120,7 +121,6 @@ def update_mean_processing_time_per_event(self, new_sample):

def __init__(self, name: str, configuration: "Processor.Config", logger: Logger):
super().__init__(name, configuration, logger)
self._strategy = SpecificGenericProcessStrategy(self._config.apply_multiple_times)
self.metric_labels, specific_tree_labels, generic_tree_labels = self._create_metric_labels()
self._specific_tree = RuleTree(
config_path=self._config.tree_config, metric_labels=specific_tree_labels
Expand Down Expand Up @@ -190,15 +190,31 @@ def process(self, event: dict):
A dictionary representing a log event.

"""
if self._logger.isEnabledFor(DEBUG): # pragma: no cover
self._logger.debug(f"{self.describe()} processing event {event}")
self._strategy.process(
event,
generic_tree=self._generic_tree,
specific_tree=self._specific_tree,
callback=self._apply_rules_wrapper,
processor_metrics=self.metrics,
)
self._logger.debug(f"{self.describe()} processing event {event}")
self.metrics.number_of_processed_events += 1
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)

def _process_rule_tree(self, event: dict, tree: "RuleTree"):
applied_rules = set()

def _process_rule(event, rule):
begin = time.time()
self._apply_rules_wrapper(event, rule)
processing_time = time.time() - begin
rule.metrics._number_of_matches += 1
rule.metrics.update_mean_processing_time(processing_time)
self.metrics.update_mean_processing_time_per_event(processing_time)
applied_rules.add(rule)
return event

if self._config.apply_multiple_times:
matching_rules = tree.get_matching_rules(event)
while matching_rules:
reduce(_process_rule, (event, *matching_rules))
matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
else:
reduce(_process_rule, (event, *tree.get_matching_rules(event)))

def _apply_rules_wrapper(self, event, rule):
try:
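
With the SpecificGenericProcessStrategy import and the _strategy attribute gone, rule application now lives directly in Processor._process_rule_tree (hunk above). The following standalone sketch mirrors that control flow with stand-in callables — get_matching_rules and apply_rule are illustrative, not logprep APIs — to make the reduce/re-matching logic easier to follow:

from functools import reduce

def process_rule_tree(event, get_matching_rules, apply_rule, apply_multiple_times=False):
    """Apply every matching rule to `event`, optionally re-matching after each pass."""
    applied_rules = set()

    def _process_rule(event, rule):
        apply_rule(event, rule)   # corresponds to _apply_rules_wrapper() plus the metric updates
        applied_rules.add(rule)   # remember the rule so it cannot fire twice
        return event              # reduce() keeps threading the same event along

    if apply_multiple_times:
        matching_rules = get_matching_rules(event)
        while matching_rules:
            # reduce(f, (event, r1, r2, ...)) evaluates f(event, r1), then f(event, r2), ...
            reduce(_process_rule, (event, *matching_rules))
            # Re-match: earlier rules may have added fields that make new rules match,
            # but rules that already fired are excluded to avoid endless loops.
            matching_rules = set(get_matching_rules(event)).difference(applied_rules)
    else:
        reduce(_process_rule, (event, *get_matching_rules(event)))

Because reduce seeds the sequence with the event itself, an empty rule list is simply a no-op, which is why the else branch needs no emptiness check.
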
91 changes: 0 additions & 91 deletions logprep/processor/processor_strategy.py

This file was deleted.

23 changes: 7 additions & 16 deletions logprep/run_logprep.py
@@ -6,7 +6,6 @@
import sys
import warnings
from argparse import ArgumentParser
from logging import ERROR, Logger, getLogger
from os.path import basename
from pathlib import Path

@@ -24,21 +23,13 @@
from logprep.util.schema_and_rule_checker import SchemaAndRuleChecker
from logprep.util.time_measurement import TimeMeasurement

from logging import (
getLogger,
basicConfig,
Logger,
)
from logging.handlers import SysLogHandler


warnings.simplefilter("always", DeprecationWarning)
logging.captureWarnings(True)

DEFAULT_LOCATION_CONFIG = "file:///etc/logprep/pipeline.yml"
getLogger("filelock").setLevel(ERROR)
getLogger("urllib3.connectionpool").setLevel(ERROR)
getLogger("elasticsearch").setLevel(ERROR)
logging.getLogger("filelock").setLevel(logging.ERROR)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
logging.getLogger("elasticsearch").setLevel(logging.ERROR)


def _parse_arguments():
@@ -98,7 +89,7 @@ def _parse_arguments():
return arguments


def _run_logprep(arguments, logger: Logger):
def _run_logprep(arguments, logger: logging.Logger):
runner = None
try:
runner = Runner.get_runner()
@@ -148,15 +139,15 @@ def _setup_logger(args, config: Configuration):
try:
log_config = config.get("logger", {})
log_level = log_config.get("level", "INFO")
basicConfig(
logging.basicConfig(
level=log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
)
logger = logging.getLogger("Logprep")
logger.info(f"Log level set to '{log_level}'")
for version in get_versions_string(args).split("\n"):
logger.info(version)
except BaseException as error: # pylint: disable=broad-except
getLogger("Logprep").exception(error)
logging.getLogger("Logprep").exception(error)
sys.exit(1)
return logger

@@ -187,7 +178,7 @@ def _setup_metrics_and_time_measurement(args, config, logger):
logger.debug(f"Config path: {args.config}")


def _validate_rules(args, config: Configuration, logger: Logger):
def _validate_rules(args, config: Configuration, logger: logging.Logger):
try:
config.verify_pipeline_only(logger)
except InvalidConfigurationError as error:
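
The run_logprep.py hunks consolidate the scattered "from logging import ..." statements into plain logging-qualified calls. A minimal sketch of the resulting setup pattern — the format string and logger names are taken from the diff, while the setup_logger wrapper and the __main__ usage are illustrative only:

import logging

# Module-level suppression of noisy third-party loggers, as in run_logprep.py.
logging.getLogger("filelock").setLevel(logging.ERROR)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
logging.getLogger("elasticsearch").setLevel(logging.ERROR)

def setup_logger(log_level: str = "INFO") -> logging.Logger:
    """Configure the root handler once and return the application logger."""
    logging.basicConfig(
        level=log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
    )
    logger = logging.getLogger("Logprep")
    logger.info("Log level set to '%s'", log_level)
    return logger

if __name__ == "__main__":
    setup_logger("DEBUG").debug("logging configured")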