refactor validation of generic_resolver rules to startup #694

Merged · 3 commits · Nov 11, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -4,7 +4,10 @@
### Breaking
### Features
### Improvements

* replace `BaseException` with `Exception` for custom errors
* refactor `generic_resolver` to validate rules on startup instead of on each rule application

### Bugfix

- fix `confluent_kafka.store_offsets` if `last_valid_record` is `None`, can happen if a rebalancing happens
58 changes: 3 additions & 55 deletions logprep/processor/generic_resolver/processor.py
@@ -28,40 +28,20 @@
import re
from typing import Union

from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
)
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.rule import GenericResolverRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_field_to, get_dotted_field_value


class GenericResolverError(ProcessingCriticalError):
"""Base class for GenericResolver related exceptions."""

def __init__(self, name: str, message: str, rule: GenericResolverRule):
super().__init__(f"{name}: {message}", rule=rule)


class GenericResolver(FieldManager):
"""Resolve values in documents by referencing a mapping list."""

__slots__ = ["_replacements_from_file"]

_replacements_from_file: dict

rule_class = GenericResolverRule

def __init__(self, name: str, configuration: FieldManager.Config):
super().__init__(name=name, configuration=configuration)
self._replacements_from_file = {}

def _apply_rules(self, event, rule):
"""Apply the given rule to the current event"""
conflicting_fields = []
self.ensure_rules_from_file(rule)

source_values = []
for source_field, target_field in rule.field_mapping.items():
@@ -73,17 +53,10 @@ def _apply_rules(self, event, rule):
# FILE
if rule.resolve_from_file:
pattern = f'^{rule.resolve_from_file["pattern"]}$'
replacements = self._replacements_from_file[rule.resolve_from_file["path"]]
replacements = rule.resolve_from_file["additions"]
matches = re.match(pattern, source_value)
if matches:
mapping = matches.group("mapping") if "mapping" in matches.groupdict() else None
if mapping is None:
raise GenericResolverError(
self.name,
"Mapping group is missing in mapping file pattern!",
rule=rule,
)
dest_val = replacements.get(mapping)
dest_val = replacements.get(matches.group("mapping"))
if dest_val:
success = self._add_uniquely_to_list(event, rule, target_field, dest_val)
if not success:
@@ -132,28 +105,3 @@ def _add_uniquely_to_list(
return add_success
add_success = add_field_to(event, target, content, extends_lists=rule.extend_target_list)
return add_success

def ensure_rules_from_file(self, rule):
"""loads rules from file"""
if rule.resolve_from_file:
if rule.resolve_from_file["path"] not in self._replacements_from_file:
try:
add_dict = GetterFactory.from_string(rule.resolve_from_file["path"]).get_yaml()
if isinstance(add_dict, dict) and all(
isinstance(value, str) for value in add_dict.values()
):
self._replacements_from_file[rule.resolve_from_file["path"]] = add_dict
else:
raise GenericResolverError(
self.name,
f"Additions file "
f'\'{rule.resolve_from_file["path"]}\''
f" must be a dictionary with string values!",
rule=rule,
)
except FileNotFoundError as error:
raise GenericResolverError(
self.name,
f'Additions file \'{rule.resolve_from_file["path"]}' f"' not found!",
rule=rule,
) from error
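For context, the per-event code path in `_apply_rules` now reads the preloaded mapping from `rule.resolve_from_file["additions"]` instead of a processor-level file cache, so no file loading or mapping-group check happens while events are processed. Below is a minimal, self-contained sketch of that lookup; the rule data and source value are hypothetical stand-ins, not taken from the repository.

```python
import re

# Hypothetical, already-validated rule data, shaped like what the new
# __attrs_post_init__ in rule.py produces: the YAML file has been loaded
# into "additions" and the pattern is known to contain a "mapping" group.
resolve_from_file = {
    "pattern": r"\d*(?P<mapping>[a-z]+)\d*",
    "additions": {"ab": "ab_resolved", "cd": "cd_resolved"},
}

source_value = "123ab456"  # example value taken from an event field

# Mirrors the per-event logic in _apply_rules after this PR.
pattern = f'^{resolve_from_file["pattern"]}$'
replacements = resolve_from_file["additions"]
matches = re.match(pattern, source_value)
if matches:
    dest_val = replacements.get(matches.group("mapping"))
    print(dest_val)  # -> ab_resolved
```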
24 changes: 24 additions & 0 deletions logprep/processor/generic_resolver/rule.py
@@ -72,9 +72,13 @@
:noindex:
"""

from pathlib import Path

from attrs import define, field, validators

from logprep.factory_error import InvalidConfigurationError
from logprep.processor.field_manager.rule import FieldManagerRule
from logprep.util.getter import GetterFactory


class GenericResolverRule(FieldManagerRule):
@@ -122,6 +126,26 @@ class Config(FieldManagerRule.Config):
The resolve list in the file at :code:`path` is then used in conjunction with
the regex pattern in :code:`pattern`."""

def __attrs_post_init__(self):
if self.resolve_from_file:
file_path = self.resolve_from_file["path"]
if "?P<mapping>" not in self.resolve_from_file["pattern"]:
raise InvalidConfigurationError(
f"Mapping group is missing in mapping file pattern! (Rule ID: '{self.id}')"
)
if not Path(file_path).is_file():
raise InvalidConfigurationError(
f"Additions file '{file_path}' not found! (Rule ID: '{self.id}')",
)
add_dict = GetterFactory.from_string(file_path).get_yaml()
if not isinstance(add_dict, dict) or not all(
isinstance(value, str) for value in add_dict.values()
):
raise InvalidConfigurationError(
f"Additions file '{file_path}' must be a dictionary with string values! (Rule ID: '{self.id}')",
)
self.resolve_from_file["additions"] = add_dict

@property
def field_mapping(self) -> dict:
"""Returns the field mapping"""
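As a usage illustration of the new startup validation (mirroring the unit tests added below, with a hypothetical rule definition): a `resolve_from_file` pattern without the required `?P<mapping>` group now fails when the rule is created, i.e. at startup, instead of raising a `ProcessingCriticalError` for each matching event.

```python
from logprep.factory_error import InvalidConfigurationError
from logprep.processor.generic_resolver.rule import GenericResolverRule

# Hypothetical rule definition: the pattern uses "?P<foobar>" instead of the
# required "?P<mapping>" group, so validation fails while the rule is built.
rule_definition = {
    "filter": "to_resolve",
    "generic_resolver": {
        "field_mapping": {"to_resolve": "resolved"},
        "resolve_from_file": {
            "path": "tests/testdata/unit/generic_resolver/resolve_mapping.yml",
            "pattern": r"\d*(?P<foobar>[a-z]+)\d*",
        },
    },
}

try:
    GenericResolverRule._create_from_dict(rule_definition)
except InvalidConfigurationError as error:
    print(error)  # Mapping group is missing in mapping file pattern! (Rule ID: ...)
```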
19 changes: 10 additions & 9 deletions logprep/processor/hyperscan_resolver/processor.py
@@ -37,9 +37,12 @@

from attr import define, field

from logprep.processor.base.exceptions import FieldExistsWarning, SkipImportError
from logprep.processor.base.exceptions import (
FieldExistsWarning,
SkipImportError,
ProcessingCriticalError,
)
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.processor import GenericResolverError
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.validators import directory_validator

@@ -57,10 +60,6 @@
# pylint: enable=ungrouped-imports


class HyperscanResolverError(GenericResolverError):
"""Base class for HyperscanResolver related exceptions."""


class HyperscanResolver(FieldManager):
"""Resolve values in documents by referencing a mapping list."""

@@ -169,7 +168,7 @@ def _get_hyperscan_database(self, rule: HyperscanResolverRule):
try:
database, value_mapping = self._load_database(database_id, resolve_list)
except FileNotFoundError:
database, value_mapping = self._create_database(resolve_list)
database, value_mapping = self._create_database(resolve_list, rule)

if rule.store_db_persistent:
self._save_database(database, database_id)
@@ -201,7 +200,7 @@ def _save_database(self, database: Database, database_id: int):
with open(f"{self._hyperscan_database_path}/{database_id}.db", "wb") as db_file:
db_file.write(serialized_db)

def _create_database(self, resolve_list: dict):
def _create_database(self, resolve_list: dict, rule):
database = Database()
value_mapping = {}
db_patterns = []
@@ -211,7 +210,9 @@ def _create_database(self, resolve_list: dict, rule):
value_mapping[idx] = resolve_list[pattern]

if not db_patterns:
raise HyperscanResolverError(self.name, "No pattern to compile for hyperscan database!")
raise ProcessingCriticalError(
f"{self.name} No pattern to compile for hyperscan database!", rule
)

expressions, ids, flags = zip(*db_patterns)
database.compile(expressions=expressions, ids=ids, elements=len(db_patterns), flags=flags)
39 changes: 1 addition & 38 deletions tests/unit/processor/generic_resolver/test_generic_resolver.py
@@ -4,10 +4,7 @@
# pylint: disable=wrong-import-position
from collections import OrderedDict

from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
)
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.generic_resolver.processor import GenericResolver
from tests.unit.processor.base import BaseProcessorTestCase

@@ -277,40 +274,6 @@ def test_resolve_dotted_field_no_conflict_match_from_file_and_list_has_conflict_

assert document == expected

def test_resolve_dotted_field_no_conflict_match_from_file_group_mapping_does_not_exist(
self,
):
rule = {
"filter": "to_resolve",
"generic_resolver": {
"field_mapping": {"to_resolve": "resolved"},
"resolve_from_file": {
"path": "tests/testdata/unit/generic_resolver/resolve_mapping.yml",
"pattern": r"\d*(?P<foobar>[a-z]+)\d*",
},
"resolve_list": {"FOO": "BAR"},
},
}
self._load_specific_rule(rule)
document = {"to_resolve": "ab"}
result = self.object.process(document)
assert isinstance(result.errors[0], ProcessingCriticalError)
assert "Mapping group is missing in mapping" in result.errors[0].args[0]

def test_resolve_generic_match_from_file_and_file_does_not_exist(self):
rule = {
"filter": "to.resolve",
"generic_resolver": {
"field_mapping": {"to.resolve": "resolved"},
"resolve_from_file": {"path": "foo", "pattern": "bar"},
},
}
self._load_specific_rule(rule)
document = {"to": {"resolve": "something HELLO1"}}
result = self.object.process(document)
assert isinstance(result.errors[0], ProcessingCriticalError)
assert "Additions file 'foo' not found" in result.errors[0].args[0]

def test_resolve_dotted_field_no_conflict_no_match(self):
rule = {
"filter": "to.resolve",
@@ -4,6 +4,7 @@
# pylint: disable=wrong-import-order
import pytest

from logprep.factory_error import InvalidConfigurationError
from logprep.processor.generic_resolver.rule import GenericResolverRule


@@ -158,3 +159,48 @@ def test_rules_equality(
rule1 = GenericResolverRule._create_from_dict(specific_rule_definition)
rule2 = GenericResolverRule._create_from_dict(other_rule_definition)
assert (rule1 == rule2) == is_equal, testcase

@pytest.mark.parametrize(
["rule", "error", "message"],
[
(
{
"filter": "to_resolve",
"generic_resolver": {
"field_mapping": {"to_resolve": "resolved"},
"resolve_from_file": {
"path": "tests/testdata/unit/generic_resolver/resolve_mapping.yml",
"pattern": r"\d*(?P<foobar>[a-z]+)\d*",
},
"resolve_list": {"FOO": "BAR"},
},
},
InvalidConfigurationError,
"Mapping group is missing in mapping",
),
(
{
"filter": "to.resolve",
"generic_resolver": {
"field_mapping": {"to.resolve": "resolved"},
"resolve_from_file": {
"path": "foo",
"pattern": r"\d*(?P<mapping>[a-z]+)\d*",
},
},
},
InvalidConfigurationError,
"Additions file 'foo' not found",
),
],
)
def test_create_from_dict_validates_config(self, rule, error, message):
if error:
with pytest.raises(error, match=message):
GenericResolverRule._create_from_dict(rule)
else:
rule_instance = GenericResolverRule._create_from_dict(rule)
assert hasattr(rule_instance, "_config")
for key, value in rule.get("generic_resolver").items():
assert hasattr(rule_instance._config, key)
assert value == getattr(rule_instance._config, key)