Skip to content

Commit

Permalink
add calling setup during configuration verify (#601)
Browse files Browse the repository at this point in the history
* add calling setup during configuration verify
and handle FileNotFoundError with with filename
* ensure population cached properties during setup
* update changelog
  • Loading branch information
ekneg54 authored Jun 10, 2024
1 parent b6027a9 commit 9418851
Show file tree
Hide file tree
Showing 19 changed files with 84 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
* add `LogprepMPQueueListener` to outsource logging to a separate process
* add a single `Queuehandler` to root logger to ensure all logs were handled by `LogprepMPQueueListener`
* refactor `http_generator` to use a logprep http output connector
* ensure all `cached_properties` are populated during setup time

### Bugfix

* make `--username` and `--password` parameters optional in http generator
* fixes a bug where `FileNotFoundError` is raised during processing

## 11.3.0

Expand Down
12 changes: 10 additions & 2 deletions logprep/abc/component.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
""" abstract module for components"""

import functools
import inspect
from abc import ABC
from functools import cached_property
from typing import Callable
Expand Down Expand Up @@ -78,8 +80,14 @@ def describe(self) -> str:

def setup(self):
"""Set the component up."""
# initialize metrics
_ = self.metrics
self._populate_cached_properties()

def _populate_cached_properties(self):
_ = [
getattr(self, name)
for name, value in inspect.getmembers(self)
if isinstance(value, functools.cached_property)
]

def shut_down(self):
"""Stop processing of this component.
Expand Down
3 changes: 1 addition & 2 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,8 @@ def _lost_callback(self, consumer, topic_partitions):
self._consumer.assign(topic_partitions)

def setup(self) -> None:
super().setup()
try:
_ = self._consumer
super().setup()
except (KafkaException, ValueError) as error:
raise FatalInputError(self, str(error)) from error

Expand Down
3 changes: 1 addition & 2 deletions logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,8 @@ def store_failed(
self._producer.flush(timeout=self._config.flush_timeout)

def setup(self):
super().setup()
try:
_ = self._producer
super().setup()
except (KafkaException, ValueError) as error:
raise FatalOutputError(self, str(error)) from error

Expand Down
4 changes: 0 additions & 4 deletions logprep/connector/dummy/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,9 @@ def __init__(self, name: str, configuration: "Connector.Config"):
super().__init__(name, configuration)
self.events = []
self.failed_events = []
self.setup_called_count = 0
self.shut_down_called_count = 0
self._exceptions = configuration.exceptions

def setup(self):
self.setup_called_count += 1

def store(self, document: dict):
"""Store the document in the output destination.
Expand Down
1 change: 1 addition & 0 deletions logprep/connector/file/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ def setup(self):
Right now this input connector is only started in the first process.
It needs the class attribute pipeline_index before running setup in Pipeline
Initiation"""
super().setup()
if not hasattr(self, "pipeline_index"):
raise FatalInputError(
self, "Necessary instance attribute `pipeline_index` could not be found."
Expand Down
5 changes: 1 addition & 4 deletions logprep/processor/list_comparison/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,8 @@ class Config(Processor.Config):

rule_class = ListComparisonRule

def __init__(self, name: str, configuration: "Processor.Config"):
super().__init__(name, configuration)
self.setup()

def setup(self):
super().setup()
for rule in [*self._specific_rules, *self._generic_rules]:
rule.init_list_comparison(self._config.list_search_base_path)

Expand Down
3 changes: 3 additions & 0 deletions logprep/util/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,9 +816,12 @@ def _verify(self):
for processor_config in self.pipeline:
try:
processor = Factory.create(deepcopy(processor_config))
processor.setup()
self._verify_rules(processor)
except (FactoryError, TypeError, ValueError, InvalidRuleDefinitionError) as error:
errors.append(error)
except FileNotFoundError as error:
errors.append(InvalidConfigurationError(f"File not found: {error.filename}"))
try:
self._verify_processor_outputs(processor_config)
except Exception as error: # pylint: disable=broad-except
Expand Down
9 changes: 6 additions & 3 deletions tests/acceptance/test_full_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def teardown_function():


def test_start_of_logprep_with_full_configuration_from_file(tmp_path):
pipeline = get_full_pipeline(exclude=["normalizer"])
pipeline = get_full_pipeline(exclude=["normalizer", "geoip_enricher"])
config = get_default_logprep_config(pipeline, with_hmac=False)
config.output.update({"kafka": {"type": "dummy_output", "default": False}})
config_path = tmp_path / "generated_config.yml"
Expand All @@ -42,7 +42,7 @@ def test_start_of_logprep_with_full_configuration_from_file(tmp_path):


def test_start_of_logprep_with_full_configuration_http():
pipeline = get_full_pipeline(exclude=["normalizer"])
pipeline = get_full_pipeline(exclude=["normalizer", "geoip_enricher"])
config = get_default_logprep_config(pipeline, with_hmac=False)
config.output.update({"kafka": {"type": "dummy_output", "default": False}})
endpoint = "http://localhost:32000"
Expand Down Expand Up @@ -122,7 +122,10 @@ def test_logprep_exposes_prometheus_metrics(tmp_path):
# requester is excluded because it tries to connect to non-existing server
# selective_extractor is excluded because of output mismatch (rules expect kafka as output)
# normalizer is excluded because of deprecation
pipeline = get_full_pipeline(exclude=["requester", "selective_extractor", "normalizer"])
# geoip_enricher is excluded because of missing maxmind license
pipeline = get_full_pipeline(
exclude=["requester", "selective_extractor", "normalizer", "geoip_enricher"]
)
config = get_default_logprep_config(pipeline, with_hmac=False)
config.version = "my_custom_version"
config.config_refresh_interval = 300
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/component/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,8 @@ def test_all_metric_attributes_are_tested(self):
difference = fullnames.difference(set(self.expected_metrics))
assert not difference, f"{difference} are not defined in `expected_metrics`"
assert fullnames == set(self.expected_metrics)

@mock.patch("inspect.getmembers", return_value=[("mock_prop", lambda: None)])
def test_setup_populates_cached_properties(self, mock_getmembers):
self.object.setup()
mock_getmembers.assert_called_with(self.object)
1 change: 0 additions & 1 deletion tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# pylint: disable=wrong-import-position
# pylint: disable=wrong-import-order
# pylint: disable=attribute-defined-outside-init
import logging
import socket
from copy import deepcopy
from unittest import mock
Expand Down
7 changes: 1 addition & 6 deletions tests/unit/connector/test_dummy_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# pylint: disable=no-self-use
from copy import deepcopy

from pytest import raises, fail
from pytest import fail, raises

from logprep.abc.output import FatalOutputError
from logprep.factory import Factory
Expand All @@ -28,11 +28,6 @@ def test_store_custom_appends_document_to_variable(self):
assert len(self.object.events) == 1
assert self.object.events[0] == document

def test_increments_setup_called_count_when_setup_was_called(self):
assert self.object.setup_called_count == 0
self.object.setup()
assert self.object.setup_called_count == 1

def test_increments_shutdown_called_count_when_shutdown_was_called(self):
assert self.object.shut_down_called_count == 0
self.object.shut_down()
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/connector/test_elasticsearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,12 @@ def test_message_backlog_is_cleared_after_it_was_written(self):
self.object._config.message_backlog_size = 1
self.object.store({"event": "test_event"})
assert len(self.object._message_backlog) == 0

@mock.patch(
"logprep.connector.elasticsearch.output.ElasticsearchOutput._search_context",
new=mock.MagicMock(),
)
@mock.patch("inspect.getmembers", return_value=[("mock_prop", lambda: None)])
def test_setup_populates_cached_properties(self, mock_getmembers):
self.object.setup()
mock_getmembers.assert_called_with(self.object)
15 changes: 8 additions & 7 deletions tests/unit/connector/test_jsonl_output.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint: disable=missing-docstring
# pylint: disable=attribute-defined-outside-init
# pylint: disable=protected-access
import tempfile
from unittest import mock

from logprep.connector.jsonl.output import JsonlOutput
Expand All @@ -10,9 +11,9 @@
class TestJsonlOutputOutput(BaseOutputTestCase):
CONFIG = {
"type": "jsonl_output",
"output_file": "does/not/matter",
"output_file_custom": "custom_file",
"output_file_error": "error_file",
"output_file": f"{tempfile.gettempdir()}/output.jsonl",
"output_file_custom": f"{tempfile.gettempdir()}/custom_file",
"output_file_error": f"{tempfile.gettempdir()}/error_file",
}

def setup_method(self) -> None:
Expand Down Expand Up @@ -53,7 +54,7 @@ def test_stores_failed_events_in_respective_list(self, _):
@mock.patch("logprep.connector.jsonl.output.JsonlOutput._write_json")
def test_write_document_to_file_on_store(self, _):
self.object.store(self.document)
self.object._write_json.assert_called_with("does/not/matter", self.document)
self.object._write_json.assert_called_with("/tmp/output.jsonl", self.document)

@mock.patch("logprep.connector.jsonl.output.JsonlOutput._write_json")
def test_write_document_to_file_on_store_custom(self, _):
Expand All @@ -68,15 +69,15 @@ def test_write_multiple_documents_to_file_on_store(self, _):
self.object.store(self.document)
assert self.object._write_json.call_count == 2
assert self.object._write_json.call_args_list == [
mock.call("does/not/matter", {"message": "test message"}),
mock.call("does/not/matter", {"message": "test message"}),
mock.call("/tmp/output.jsonl", {"message": "test message"}),
mock.call("/tmp/output.jsonl", {"message": "test message"}),
]

@mock.patch("logprep.connector.jsonl.output.JsonlOutput._write_json")
def test_store_failed_writes_errors(self, _):
self.object.store_failed("my error message", self.document, self.document)
self.object._write_json.assert_called_with(
"error_file",
f"{tempfile.gettempdir()}/error_file",
{
"error_message": "my error message",
"document_received": {"message": "test message"},
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/connector/test_opensearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,12 @@ def test_opensearch_parallel_bulk(self):
index="defaultindex", body={"query": {"match": {"foo": uuid_str}}}
)
assert len(result["hits"]["hits"]) > len_before

@mock.patch(
"logprep.connector.opensearch.output.OpensearchOutput._search_context",
new=mock.MagicMock(),
)
@mock.patch("inspect.getmembers", return_value=[("mock_prop", lambda: None)])
def test_setup_populates_cached_properties(self, mock_getmembers):
self.object.setup()
mock_getmembers.assert_called_with(self.object)
7 changes: 6 additions & 1 deletion tests/unit/connector/test_s3_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
# pylint: disable=wrong-import-order
# pylint: disable=attribute-defined-outside-init
import logging
from collections import defaultdict
from copy import deepcopy
from datetime import datetime
from math import isclose
Expand Down Expand Up @@ -318,3 +317,9 @@ def test_setup_raises_fataloutputerror_if_boto_exception_is_raised(self, error,
@staticmethod
def _calculate_backlog_size(s3_output):
return sum(len(values) for values in s3_output._message_backlog.values())

@mock.patch("logprep.connector.s3.output.S3Output._s3_resource", new=mock.MagicMock())
@mock.patch("inspect.getmembers", return_value=[("mock_prop", lambda: None)])
def test_setup_populates_cached_properties(self, mock_getmembers):
self.object.setup()
mock_getmembers.assert_called_with(self.object)
4 changes: 4 additions & 0 deletions tests/unit/processor/list_comparison/test_list_comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class TestListComparison(BaseProcessorTestCase):
"list_search_base_path": "tests/testdata/unit/list_comparison/rules",
}

def setup_method(self):
super().setup_method()
self.object.setup()

def test_element_in_list(self):
document = {"user": "Franz"}
expected = {"user": "Franz", "user_results": {"in_list": ["user_list.txt"]}}
Expand Down
3 changes: 1 addition & 2 deletions tests/unit/util/test_auto_rule_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,8 @@ def test_list_comparison_specific_setup_called_on_load_rules(
auto_rule_tester._reset_trees(
processor
) # Called every time by auto tester before adding rules instead
mock_setup.assert_called_once()
auto_rule_tester._load_rules(processor, "specific_rules")
assert mock_setup.call_count == 2
mock_setup.assert_called_once()

def test_full_auto_rule_test_run(self, auto_rule_tester, capsys):
with pytest.raises(SystemExit):
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/util/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,22 @@ def test_verify_credentials_file_raises_for_unexpected_key(self, config_path, tm
):
_ = Configuration.from_sources([str(config_path)])

def test_verify_calls_processor_setup(self, config_path):
config = Configuration.from_sources([str(config_path)])
with mock.patch("logprep.abc.processor.Processor.setup") as mocked_setup:
config._verify()
mocked_setup.assert_called()

def test_verify_prints_file_not_found_errors_with_filename(self, config_path):
config = Configuration.from_sources([str(config_path)])
with mock.patch(
"logprep.abc.processor.Processor.setup", side_effect=lambda: open("not_existing_file")
):
with pytest.raises(
InvalidConfigurationError, match="File not found: not_existing_file"
):
config._verify()


class TestInvalidConfigurationErrors:
@pytest.mark.parametrize(
Expand Down

0 comments on commit 9418851

Please sign in to comment.