add inline rule config feature to pipeline config #453

Merged · 6 commits · Oct 9, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@
### Features

* add a preprocessor to enrich by systems env variables
* add option to define rules inline in pipeline config under processor configs `generic_rules` or `specific_rules`

### Improvements

37 changes: 27 additions & 10 deletions logprep/abc/processor.py
@@ -41,22 +41,24 @@ class Config(Component.Config):
specific_rules: List[str] = field(
validator=[
validators.instance_of(list),
validators.deep_iterable(member_validator=validators.instance_of(str)),
validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
]
)
"""List of rule locations to load rules from.
In addition to paths to file directories it is possible to retrieve rules from a URI.
For valid URI formats see :ref:`getters`.
As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
"""
generic_rules: List[str] = field(
validator=[
validators.instance_of(list),
validators.deep_iterable(member_validator=validators.instance_of(str)),
validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
]
)
"""List of rule locations to load rules from.
In addition to paths to file directories it is possible to retrieve rules from a URI.
For valid URI formats see :ref:`getters`.
As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
"""
tree_config: Optional[str] = field(
default=None, validator=[validators.optional(validators.instance_of(str))]
@@ -229,20 +231,35 @@ def test_rules(self) -> dict:
"""

@staticmethod
def resolve_directories(rule_paths: list) -> list:
resolved_paths = []
for rule_path in rule_paths:
getter_instance = getter.GetterFactory.from_string(rule_path)
def resolve_directories(rule_sources: list) -> list:
"""resolves directories to a list of files or rule definitions

Parameters
----------
rule_sources : list
a list of files, directories or rule definitions

Returns
-------
list
a list of files and rule definitions
"""
resolved_sources = []
for rule_source in rule_sources:
if isinstance(rule_source, dict):
resolved_sources.append(rule_source)
continue
getter_instance = getter.GetterFactory.from_string(rule_source)
if getter_instance.protocol == "file":
if Path(getter_instance.target).is_dir():
paths = list_json_files_in_directory(getter_instance.target)
for file_path in paths:
resolved_paths.append(file_path)
resolved_sources.append(file_path)
else:
resolved_paths.append(rule_path)
resolved_sources.append(rule_source)
else:
resolved_paths.append(rule_path)
return resolved_paths
resolved_sources.append(rule_source)
return resolved_sources

def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: List[str]):
"""method to add rules from directories or urls"""
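With this change, `specific_rules` and `generic_rules` accept plain strings (file paths, directories, or URIs) and inline rule dictionaries side by side, and `resolve_directories` forwards inline definitions untouched. The following is a simplified, self-contained sketch of that pass-through logic — `resolve_sources` is a hypothetical stand-in for the real method, which uses `GetterFactory` and `list_json_files_in_directory` and also handles non-file protocols:

from pathlib import Path

def resolve_sources(rule_sources: list) -> list:
    """Simplified sketch of the new rule source resolution."""
    resolved = []
    for source in rule_sources:
        if isinstance(source, dict):  # inline rule definition: forwarded as-is
            resolved.append(source)
        elif Path(source).is_dir():  # directory: expanded to its JSON rule files
            resolved.extend(str(path) for path in Path(source).glob("*.json"))
        else:  # single file or URI: forwarded unchanged
            resolved.append(source)
    return resolved

mixed_sources = [
    "quickstart/exampledata/rules/dropper/specific",  # directory of rule files
    {"filter": "message", "dropper": {"drop": ["message"]}},  # inline rule
]
print(resolve_sources(mixed_sources))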
10 changes: 6 additions & 4 deletions logprep/processor/base/rule.py
@@ -255,21 +255,23 @@ def lucene_filter(self):
# pylint: enable=C0111

@classmethod
def create_rules_from_target(cls, path: str) -> list:
def create_rules_from_target(cls, rule_target: str) -> list:
"""Create a rule from a file."""
content = GetterFactory.from_string(path).get()
if isinstance(rule_target, dict):
return [cls._create_from_dict(rule_target)]
content = GetterFactory.from_string(rule_target).get()
try:
rule_data = json.loads(content)
except ValueError:
rule_data = yaml.load_all(content)
try:
rules = [cls._create_from_dict(rule) for rule in rule_data]
except InvalidRuleDefinitionError as error:
raise InvalidRuleDefinitionError(f"{path}: {error}") from error
raise InvalidRuleDefinitionError(f"{rule_target}: {error}") from error
if len(rules) == 0:
raise InvalidRuleDefinitionError("no rules in file")
for rule in rules:
rule.file_name = splitext(basename(path))[0]
rule.file_name = splitext(basename(rule_target))[0]
return rules

@classmethod
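`create_rules_from_target` now short-circuits on dictionaries: an inline definition is wrapped in a one-rule list before any getter, JSON, or YAML handling runs, and the early return also skips the `file_name` assignment that file-based rules receive. A hypothetical usage sketch — `DropperRule` stands in for any concrete `Rule` subclass, and the file path is illustrative:

from logprep.processor.dropper.rule import DropperRule

# Inline definition: parsed directly into a single rule.
inline_rule = {"filter": "message", "dropper": {"drop": ["message"]}}
rules = DropperRule.create_rules_from_target(inline_rule)
assert len(rules) == 1

# File or URI target: fetched via GetterFactory and parsed as JSON or YAML, as before.
rules_from_file = DropperRule.create_rules_from_target("rules/dropper/drop_message.json")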
42 changes: 24 additions & 18 deletions logprep/util/schema_and_rule_checker.py
@@ -2,27 +2,28 @@

"""Runner for testing schemas and rules"""

from typing import Optional, List
from collections.abc import Iterable

from argparse import ArgumentParser
from collections.abc import Iterable
from json.decoder import JSONDecodeError
from logging import Logger
from os import walk
from os.path import join
from json.decoder import JSONDecodeError
from typing import List, Optional

from colorama import Fore

from logprep.util.configuration import Configuration

from logprep.abc.processor import Processor
from logprep.filter.lucene_filter import LuceneFilterError
from logprep.processor.base.exceptions import (
InvalidRuleDefinitionError,
MismatchedRuleDefinitionError,
)
from logprep.processor.base.rule import Rule
from logprep.abc.processor import Processor
from logprep.processor.labeler.labeling_schema import LabelingSchema, InvalidLabelingSchemaFileError
from logprep.filter.lucene_filter import LuceneFilterError
from logprep.processor.labeler.labeling_schema import (
InvalidLabelingSchemaFileError,
LabelingSchema,
)
from logprep.util.configuration import Configuration


class SchemaAndRuleChecker:
@@ -138,14 +139,17 @@ def _validate_rules_in_path(
path_schema: str = None,
):
number_of_checked_rules = 0
for root, _, files in walk(path_rules):
for file in files:
number_of_checked_rules += 1
rule_path = join(root, file)

multi_rule = self.check_rule_creation_errors(rule_class, rule_path)
self._validate_schema(multi_rule, path_schema, rule_path)
self._print_schema_check_results(path_schema)
if isinstance(path_rules, dict):
self.check_rule_creation_errors(rule_class, path_rules)
else:
for root, _, files in walk(path_rules):
for file in files:
number_of_checked_rules += 1
rule_path = join(root, file)

multi_rule = self.check_rule_creation_errors(rule_class, rule_path)
self._validate_schema(multi_rule, path_schema, rule_path)
self._print_schema_check_results(path_schema)
if not self.errors:
self._print_valid(
f"Valid {processor_type} rules in {path_rules} "
@@ -198,7 +202,9 @@ def check_rule_creation_errors(self, rule_class: Rule, rule_path: str) -> Option
"""
rule = None
try:
if rule_path.endswith(".json") or rule_path.endswith(".yml"):
if isinstance(rule_path, dict):
rule = rule_class.create_rules_from_target(rule_path)
elif rule_path.endswith(".json") or rule_path.endswith(".yml"):
if not rule_path.endswith("_test.json"):
rule = rule_class.create_rules_from_target(rule_path)
except InvalidRuleDefinitionError as error:
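The checker applies the same type dispatch: an inline dictionary goes straight to `check_rule_creation_errors`, while string paths are still walked recursively for `.json`/`.yml` files (note that, as written, the inline branch bypasses the schema validation and the checked-rule counter). A minimal standard-library sketch of the branching, with hypothetical names:

from os import walk
from os.path import join

def iter_rule_targets(path_rules):
    """Yield rule targets the way the updated checker visits them."""
    if isinstance(path_rules, dict):
        yield path_rules  # inline rule definition, checked as-is
        return
    for root, _, files in walk(path_rules):  # walk a directory tree of rule files
        for file in files:
            yield join(root, file)

for target in iter_rule_targets({"filter": "message", "dropper": {"drop": ["drop_me"]}}):
    print(target)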
14 changes: 14 additions & 0 deletions quickstart/exampledata/config/pipeline.yml
@@ -40,6 +40,11 @@ pipeline:
- quickstart/exampledata/rules/dropper/specific
generic_rules:
- quickstart/exampledata/rules/dropper/specific
- filter: "test_dropper"
dropper:
drop:
- drop_me
description: "..."

- pre_detector:
type: pre_detector
@@ -78,6 +83,15 @@ pipeline:
max_cached_pseudonyms: 1000000
max_caching_days: 1

- calculator:
type: calculator
specific_rules:
- filter: "test_label: execute"
calculator:
target_field: "calculation"
calc: "1 + 1"
generic_rules: []

input:
kafka:
type: confluentkafka_input
12 changes: 2 additions & 10 deletions tests/unit/framework/test_pipeline.py
@@ -16,10 +16,10 @@

from logprep.abc.input import (
CriticalInputError,
CriticalInputParsingError,
FatalInputError,
SourceDisconnectedError,
WarningInputError,
CriticalInputParsingError,
)
from logprep.abc.output import (
CriticalOutputError,
@@ -37,10 +37,7 @@
SharedCounter,
)
from logprep.metrics.metric import MetricTargets
from logprep.processor.base.exceptions import (
ProcessingCriticalError,
ProcessingWarning,
)
from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
from logprep.processor.deleter.rule import DeleterRule
from logprep.util.getter import GetterFactory
from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
@@ -65,11 +62,6 @@ class ConfigurationForTests:
counter = SharedCounter()


class ProcessorWarningMockError(ProcessingWarning):
def __init__(self):
super().__init__("ProcessorWarningMockError")


@mock.patch("logprep.factory.Factory.create")
class TestPipeline(ConfigurationForTests):
def setup_method(self):
88 changes: 85 additions & 3 deletions tests/unit/test_factory.py
@@ -7,16 +7,17 @@
from string import ascii_letters
from unittest import mock

from pytest import raises, mark
from pytest import mark, raises

from logprep.abc.input import Input
from logprep.factory import Factory
from logprep.factory_error import (
InvalidConfigSpecificationError,
InvalidConfigurationError,
UnknownComponentTypeError,
NoTypeSpecifiedError,
InvalidConfigSpecificationError,
UnknownComponentTypeError,
)
from logprep.filter.expression.filter_expression import Exists
from logprep.processor.clusterer.processor import Clusterer
from logprep.processor.labeler.processor import Labeler
from logprep.processor.normalizer.processor import Normalizer
@@ -152,6 +153,87 @@ def test_create_labeler_creates_labeler_processor():
assert isinstance(processor, Labeler)


def test_creates_calculator_with_inline_rules():
processor = Factory.create(
{
"calculator": {
"type": "calculator",
"generic_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 1"},
},
],
"specific_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 3"},
},
],
}
},
logger,
)
assert len(processor._generic_rules) == 1
assert len(processor._specific_rules) == 1


def test_creates_calculator_with_inline_rules_and_files():
processor = Factory.create(
{
"calculator": {
"type": "calculator",
"generic_rules": [
{
"filter": "message1",
"calculator": {"target_field": "target", "calc": "1 + 1"},
},
"tests/testdata/unit/calculator/generic_rules/calculator.json",
],
"specific_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 3"},
},
"tests/testdata/unit/calculator/specific_rules/calculator.json",
],
}
},
logger,
)
assert len(processor._generic_rules) == 2
assert len(processor._specific_rules) == 2
assert processor._generic_rules[0].filter_str == "message1: *"
assert processor._generic_rules[1].filter_str == "(field1: * AND field2: *)"


def test_creates_calculator_with_inline_rules_and_file_and_directory():
processor = Factory.create(
{
"calculator": {
"type": "calculator",
"generic_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 1"},
},
"tests/testdata/unit/calculator/generic_rules/",
],
"specific_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 3"},
},
"tests/testdata/unit/calculator/specific_rules/calculator.json",
],
}
},
logger,
)
assert len(processor._generic_rules) == 2
assert len(processor._specific_rules) == 2


def test_dummy_input_creates_dummy_input_connector():
processor = Factory.create(
{"labelername": {"type": "dummy_input", "documents": [{}, {}]}},