Skip to content

Commit

Permalink
add inline_rule_definition
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 7, 2023
1 parent 37e2a4d commit ccb3075
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 19 deletions.
19 changes: 17 additions & 2 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Config(Component.Config):
specific_rules: List[str] = field(
validator=[
validators.instance_of(list),
validators.deep_iterable(member_validator=validators.instance_of(str)),
validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
]
)
"""List of rule locations to load rules from.
Expand All @@ -51,7 +51,7 @@ class Config(Component.Config):
generic_rules: List[str] = field(
validator=[
validators.instance_of(list),
validators.deep_iterable(member_validator=validators.instance_of(str)),
validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
]
)
"""List of rule locations to load rules from.
Expand Down Expand Up @@ -230,8 +230,23 @@ def test_rules(self) -> dict:

@staticmethod
def resolve_directories(rule_paths: list) -> list:
"""resolves directories to a list of files or rule definitions
Parameters
----------
rule_paths : list
a list of files, directories or rule definitions
Returns
-------
list
a list of files and rule definitions
"""
resolved_paths = []
for rule_path in rule_paths:
if isinstance(rule_path, dict):
resolved_paths.append(rule_path)
continue
getter_instance = getter.GetterFactory.from_string(rule_path)
if getter_instance.protocol == "file":
if Path(getter_instance.target).is_dir():
Expand Down
10 changes: 6 additions & 4 deletions logprep/processor/base/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,23 @@ def lucene_filter(self):
# pylint: enable=C0111

@classmethod
def create_rules_from_target(cls, path: str) -> list:
def create_rules_from_target(cls, rule_target: str) -> list:
"""Create a rule from a file."""
content = GetterFactory.from_string(path).get()
if isinstance(rule_target, dict):
return [cls._create_from_dict(rule_target)]
content = GetterFactory.from_string(rule_target).get()
try:
rule_data = json.loads(content)
except ValueError:
rule_data = yaml.load_all(content)
try:
rules = [cls._create_from_dict(rule) for rule in rule_data]
except InvalidRuleDefinitionError as error:
raise InvalidRuleDefinitionError(f"{path}: {error}") from error
raise InvalidRuleDefinitionError(f"{rule_target}: {error}") from error
if len(rules) == 0:
raise InvalidRuleDefinitionError("no rules in file")
for rule in rules:
rule.file_name = splitext(basename(path))[0]
rule.file_name = splitext(basename(rule_target))[0]
return rules

@classmethod
Expand Down
14 changes: 14 additions & 0 deletions quickstart/exampledata/config/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ pipeline:
- quickstart/exampledata/rules/dropper/specific
generic_rules:
- quickstart/exampledata/rules/dropper/specific
- filter: "test_dropper"
dropper:
drop:
- drop_me
description: "..."

- pre_detector:
type: pre_detector
Expand Down Expand Up @@ -78,6 +83,15 @@ pipeline:
max_cached_pseudonyms: 1000000
max_caching_days: 1

- calculator:
type: calculator
specific_rules:
- filter: "test_label: execute"
calculator:
target_field: "calculation"
calc: "1 + 1"
generic_rules: []

input:
kafka:
type: confluentkafka_input
Expand Down
54 changes: 54 additions & 0 deletions tests/acceptance/test_full_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,57 @@ def test_logprep_exposes_prometheus_metrics(tmp_path):
r"logprep_tracking_interval_in_seconds.*config_version.*logprep_version.* 1\.0", metrics
)
proc.kill()


def test_pipeline_with_inline_rule_configs(tmp_path):
pipeline_definition = """---
version: 1
process_count: 1
timeout: 0.1
logger:
level: DEBUG
pipeline:
- calculator:
type: calculator
generic_rules:
- filter: "message"
calculator:
target_field: target
calc: "1 + 1"
specific_rules:
- filter: "message1"
calculator:
target_field: target
calc: "1 + 3"
- filter: "message1"
calculator:
target_field: target1
calc: "6 + 3"
description: "test description"
input:
kafka:
type: confluentkafka_input
bootstrapservers:
- 127.0.0.1:9092
topic: consumer
group: cgroup3
auto_commit: true
session_timeout: 6000
offset_reset_policy: smallest
output:
opensearch:
type: opensearch_output
hosts:
- 127.0.0.1:9200
default_index: processed
error_index: errors
message_backlog_size: 10000
timeout: 10000
flush_timeout: 10
max_retries: 3
user: admin
secret: admin
"""
12 changes: 2 additions & 10 deletions tests/unit/framework/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

from logprep.abc.input import (
CriticalInputError,
CriticalInputParsingError,
FatalInputError,
SourceDisconnectedError,
WarningInputError,
CriticalInputParsingError,
)
from logprep.abc.output import (
CriticalOutputError,
Expand All @@ -37,10 +37,7 @@
SharedCounter,
)
from logprep.metrics.metric import MetricTargets
from logprep.processor.base.exceptions import (
ProcessingCriticalError,
ProcessingWarning,
)
from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
from logprep.processor.deleter.rule import DeleterRule
from logprep.util.getter import GetterFactory
from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
Expand All @@ -65,11 +62,6 @@ class ConfigurationForTests:
counter = SharedCounter()


class ProcessorWarningMockError(ProcessingWarning):
def __init__(self):
super().__init__("ProcessorWarningMockError")


@mock.patch("logprep.factory.Factory.create")
class TestPipeline(ConfigurationForTests):
def setup_method(self):
Expand Down
88 changes: 85 additions & 3 deletions tests/unit/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
from string import ascii_letters
from unittest import mock

from pytest import raises, mark
from pytest import mark, raises

from logprep.abc.input import Input
from logprep.factory import Factory
from logprep.factory_error import (
InvalidConfigSpecificationError,
InvalidConfigurationError,
UnknownComponentTypeError,
NoTypeSpecifiedError,
InvalidConfigSpecificationError,
UnknownComponentTypeError,
)
from logprep.filter.expression.filter_expression import Exists
from logprep.processor.clusterer.processor import Clusterer
from logprep.processor.labeler.processor import Labeler
from logprep.processor.normalizer.processor import Normalizer
Expand Down Expand Up @@ -152,6 +153,87 @@ def test_create_labeler_creates_labeler_processor():
assert isinstance(processor, Labeler)


def test_creates_calculator_with_inline_rules():
processor = Factory.create(
{
"calculator": {
"type": "calculator",
"generic_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 1"},
},
],
"specific_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 3"},
},
],
}
},
logger,
)
assert len(processor._generic_rules) == 1
assert len(processor._specific_rules) == 1


def test_creates_calculator_with_inline_rules_and_files():
processor = Factory.create(
{
"calculator": {
"type": "calculator",
"generic_rules": [
{
"filter": "message1",
"calculator": {"target_field": "target", "calc": "1 + 1"},
},
"tests/testdata/unit/calculator/generic_rules/calculator.json",
],
"specific_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 3"},
},
"tests/testdata/unit/calculator/specific_rules/calculator.json",
],
}
},
logger,
)
assert len(processor._generic_rules) == 2
assert len(processor._specific_rules) == 2
assert processor._generic_rules[0].filter_str == "message1: *"
assert processor._generic_rules[1].filter_str == "(field1: * AND field2: *)"


def test_creates_calculator_with_inline_rules_and_file_and_directory():
processor = Factory.create(
{
"calculator": {
"type": "calculator",
"generic_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 1"},
},
"tests/testdata/unit/calculator/generic_rules/",
],
"specific_rules": [
{
"filter": "message",
"calculator": {"target_field": "target", "calc": "1 + 3"},
},
"tests/testdata/unit/calculator/specific_rules/calculator.json",
],
}
},
logger,
)
assert len(processor._generic_rules) == 2
assert len(processor._specific_rules) == 2


def test_dummy_input_creates_dummy_input_connector():
processor = Factory.create(
{"labelername": {"type": "dummy_input", "documents": [{}, {}]}},
Expand Down

0 comments on commit ccb3075

Please sign in to comment.