Skip to content

Commit

Permalink
Merge branch 'main' into 650-requester-documentation-examples-are-out…
Browse files Browse the repository at this point in the history
…-of-date
  • Loading branch information
ekneg54 authored Nov 14, 2024
2 parents 20c6edb + 50ee02e commit a1430e8
Show file tree
Hide file tree
Showing 77 changed files with 844 additions and 762 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ examples/k8s/charts
*.so
target
wheelhouse
requirements.*
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.analysis.importFormat": "absolute",
"python.testing.pytestPath": "pytest",
"editor.formatOnSave": true,
"editor.rulers": [
80,
100,
120
],
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
"source.organizeImports": "explicit",
"source.unusedImports": "always",
"source.convertImportFormat": "always"
},
"autoDocstring.docstringFormat": "numpy",
"files.exclude": {
Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,26 @@

## next release
### Breaking

* `CriticalInputError` is raised when the input preprocessor values can't be set; previously this was
only the case for the hmac preprocessor, but it now applies to all other preprocessors as well.
* fix `delimiter` typo in `StringSplitterRule` configuration

### Features
### Improvements

* fix `requester` documentation
* replace `BaseException` with `Exception` for custom errors
* refactor `generic_resolver` to validate rules on startup instead of application of each rule
* rewrite the helper method `add_field_to` such that it always raises a `FieldExistsWarning` instead of returning a bool.
* add new helper method `add_fields_to` to directly add multiple fields to one event
* refactor some processors to make use of the new helper methods

### Bugfix

* fix `confluent_kafka.store_offsets` if `last_valid_record` is `None`, which can happen if a rebalancing
happens before the first message was pulled.
* fix pseudonymizer cache metrics not being updated

## 14.0.0
### Breaking
Expand Down
71 changes: 37 additions & 34 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from logprep.abc.connector import Connector
from logprep.abc.exceptions import LogprepException
from logprep.metrics.metrics import Metric
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.util.helper import add_fields_to, get_dotted_field_value
from logprep.util.time import UTC, TimeParser
from logprep.util.validators import dict_structure_validator

Expand Down Expand Up @@ -280,16 +281,19 @@ def get_next(self, timeout: float) -> dict | None:
self.metrics.number_of_processed_events += 1
if not isinstance(event, dict):
raise CriticalInputError(self, "not a dict", event)
if self._add_hmac:
event = self._add_hmac_to(event, raw_event)
if self._add_version_info:
self._add_version_information_to_event(event)
if self._add_log_arrival_time_information:
self._add_arrival_time_information_to_event(event)
if self._add_log_arrival_timedelta_information:
self._add_arrival_timedelta_information_to_event(event)
if self._add_env_enrichment:
self._add_env_enrichment_to_event(event)
try:
if self._add_hmac:
event = self._add_hmac_to(event, raw_event)
if self._add_version_info:
self._add_version_information_to_event(event)
if self._add_log_arrival_time_information:
self._add_arrival_time_information_to_event(event)
if self._add_log_arrival_timedelta_information:
self._add_arrival_timedelta_information_to_event(event)
if self._add_env_enrichment:
self._add_env_enrichment_to_event(event)
except FieldExistsWarning as error:
raise CriticalInputError(self, error.args[0], event) from error
return event

def batch_finished_callback(self):
Expand All @@ -300,13 +304,19 @@ def _add_env_enrichment_to_event(self, event: dict):
enrichments = self._config.preprocessing.get("enrich_by_env_variables")
if not enrichments:
return
for target_field, variable_name in enrichments.items():
add_field_to(event, target_field, os.environ.get(variable_name, ""))
fields = {
target: os.environ.get(variable_name, "")
for target, variable_name in enrichments.items()
}
add_fields_to(event, fields)

def _add_arrival_time_information_to_event(self, event: dict):
now = TimeParser.now()
target_field = self._config.preprocessing.get("log_arrival_time_target_field")
add_field_to(event, target_field, now.isoformat())
new_field = {
self._config.preprocessing.get(
"log_arrival_time_target_field"
): TimeParser.now().isoformat()
}
add_fields_to(event, new_field)

def _add_arrival_timedelta_information_to_event(self, event: dict):
log_arrival_timedelta_config = self._config.preprocessing.get("log_arrival_timedelta")
Expand All @@ -322,16 +332,16 @@ def _add_arrival_timedelta_information_to_event(self, event: dict):
TimeParser.from_string(log_arrival_time).astimezone(UTC)
- TimeParser.from_string(time_reference).astimezone(UTC)
).total_seconds()
add_field_to(event, target_field, delta_time_sec)
add_fields_to(event, fields={target_field: delta_time_sec})

def _add_version_information_to_event(self, event: dict):
"""Add the version information to the event"""
target_field = self._config.preprocessing.get("version_info_target_field")
# pylint: disable=protected-access
add_field_to(event, target_field, self._config._version_information)
add_fields_to(event, fields={target_field: self._config._version_information})
# pylint: enable=protected-access

def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
def _add_hmac_to(self, event_dict, raw_event) -> dict:
"""
Calculates an HMAC (Hash-based message authentication code) based on a given target field
and adds it to the given event. If the target field has the value '<RAW_MSG>' the full raw
Expand All @@ -357,7 +367,7 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
------
CriticalInputError
If the hmac could not be added to the event because the desired output field already
exists or cant't be found.
exists or can't be found.
"""
hmac_options = self._config.preprocessing.get("hmac", {})
hmac_target_field_name = hmac_options.get("target")
Expand All @@ -381,18 +391,11 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
digestmod=hashlib.sha256,
).hexdigest()
compressed = zlib.compress(received_orig_message, level=-1)
hmac_output = {"hmac": hmac, "compressed_base64": base64.b64encode(compressed).decode()}
add_was_successful = add_field_to(
event_dict,
hmac_options.get("output_field"),
hmac_output,
)
if not add_was_successful:
raise CriticalInputError(
self,
f"Couldn't add the hmac to the input event as the desired "
f"output field '{hmac_options.get('output_field')}' already "
f"exist.",
event_dict,
)
new_field = {
hmac_options.get("output_field"): {
"hmac": hmac,
"compressed_base64": base64.b64encode(compressed).decode(),
}
}
add_fields_to(event_dict, new_field)
return event_dict
22 changes: 10 additions & 12 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
from logprep.framework.rule_tree.rule_tree import RuleTree, RuleTreeType
from logprep.metrics.metrics import Metric
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
ProcessingError,
ProcessingWarning,
)
from logprep.util import getter
from logprep.util.helper import (
add_and_overwrite,
add_field_to,
add_fields_to,
get_dotted_field_value,
pop_dotted_field_value,
)
Expand Down Expand Up @@ -357,13 +356,15 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
if failure_tags is None:
failure_tags = rule.failure_tags
if tags is None:
add_and_overwrite(event, "tags", sorted(list({*failure_tags})))
new_field = {"tags": sorted(list({*failure_tags}))}
else:
add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
new_field = {"tags": sorted(list({*tags, *failure_tags}))}
add_and_overwrite(event, new_field, rule)
if isinstance(error, ProcessingWarning):
if error.tags:
tags = tags if tags else []
add_and_overwrite(event, "tags", sorted(list({*error.tags, *tags, *failure_tags})))
new_field = {"tags": sorted(list({*error.tags, *tags, *failure_tags}))}
add_and_overwrite(event, new_field, rule)
self.result.warnings.append(error)
else:
self.result.warnings.append(ProcessingWarning(str(error), rule, event))
Expand All @@ -375,21 +376,18 @@ def _has_missing_values(self, event, rule, source_field_dict):
if missing_fields:
if rule.ignore_missing_fields:
return True
error = BaseException(f"{self.name}: no value for fields: {missing_fields}")
error = Exception(f"{self.name}: no value for fields: {missing_fields}")
self._handle_warning_error(event, rule, error)
return True
return False

def _write_target_field(self, event: dict, rule: "Rule", result: any) -> None:
add_successful = add_field_to(
add_fields_to(
event,
output_field=rule.target_field,
content=result,
fields={rule.target_field: result},
extends_lists=rule.extend_target_list,
overwrite_output_field=rule.overwrite_target,
overwrite_target_field=rule.overwrite_target,
)
if not add_successful:
raise FieldExistsWarning(rule, event, [rule.target_field])

def setup(self):
super().setup()
Expand Down
6 changes: 3 additions & 3 deletions logprep/connector/dummy/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
A dummy input that returns the documents it was initialized with.
If a "document" is derived from BaseException, that exception will be thrown instead of
If a "document" is derived from Exception, that exception will be thrown instead of
returning a document. The exception will be removed and subsequent calls may return documents or
throw other exceptions in the given order.
Expand Down Expand Up @@ -36,7 +36,7 @@ class DummyInput(Input):
class Config(Input.Config):
"""DummyInput specific configuration"""

documents: List[Union[dict, type, BaseException]]
documents: List[Union[dict, type, Exception]]
"""A list of documents that should be returned."""
repeat_documents: Optional[str] = field(
validator=validators.instance_of(bool), default=False
Expand All @@ -57,6 +57,6 @@ def _get_event(self, timeout: float) -> tuple:

document = self._documents.pop(0)

if (document.__class__ == type) and issubclass(document, BaseException):
if (document.__class__ == type) and issubclass(document, Exception):
raise document
return document, None
2 changes: 1 addition & 1 deletion logprep/connector/file/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
FileInput
==========
A generic line input that returns the documents it was initialized with.
If a "document" is derived from BaseException, that exception will be thrown instead of
If a "document" is derived from Exception, that exception will be thrown instead of
returning a document. The exception will be removed and subsequent calls may return documents or
throw other exceptions in the given order.
Expand Down
2 changes: 1 addition & 1 deletion logprep/connector/json/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
A json input that returns the documents it was initialized with.
If a "document" is derived from BaseException, that exception will be thrown instead of
If a "document" is derived from Exception, that exception will be thrown instead of
returning a document. The exception will be removed and subsequent calls may return documents or
throw other exceptions in the given order.
Expand Down
2 changes: 1 addition & 1 deletion logprep/connector/jsonl/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
A json line input that returns the documents it was initialized with.
If a "document" is derived from BaseException, that exception will be thrown instead of
If a "document" is derived from Exception, that exception will be thrown instead of
returning a document. The exception will be removed and subsequent calls may return documents or
throw other exceptions in the given order.
Expand Down
2 changes: 1 addition & 1 deletion logprep/filter/expression/filter_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, List


class FilterExpressionError(BaseException):
class FilterExpressionError(Exception):
"""Base class for FilterExpression related exceptions."""


Expand Down
2 changes: 1 addition & 1 deletion logprep/filter/lucene_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
logger = logging.getLogger("LuceneFilter")


class LuceneFilterError(BaseException):
class LuceneFilterError(Exception):
"""Base class for LuceneFilter related exceptions."""


Expand Down
6 changes: 5 additions & 1 deletion logprep/metrics/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self, configuration: MetricsConfig):
self.server = None
self.healthcheck_functions = None
self._multiprocessing_prepared = False
self.app = None

def prepare_multiprocessing(self):
"""
Expand Down Expand Up @@ -99,10 +100,12 @@ def run(self, daemon=True):

def init_server(self, daemon=True) -> None:
"""Initializes the server"""
if not self.app:
self.app = make_patched_asgi_app(self.healthcheck_functions)
port = self.configuration.port
self.server = http.ThreadingHTTPServer(
self.configuration.uvicorn_config | {"port": port, "host": "0.0.0.0"},
make_patched_asgi_app(self.healthcheck_functions),
self.app,
daemon=daemon,
logger_name="Exporter",
)
Expand All @@ -116,6 +119,7 @@ def restart(self):
def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None:
"""Updates the healthcheck functions"""
self.healthcheck_functions = healthcheck_functions
self.app = make_patched_asgi_app(self.healthcheck_functions)
if self.server and self.server.thread and self.server.thread.is_alive():
self.server.shut_down()
self.init_server(daemon=daemon)
Expand Down
10 changes: 6 additions & 4 deletions logprep/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
from attrs import define, field, validators
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram

from logprep.util.helper import add_field_to
from logprep.util.helper import add_fields_to


@define(kw_only=True, slots=False)
Expand Down Expand Up @@ -222,12 +222,14 @@ def inner(self, *args, **kwargs): # nosemgrep
if hasattr(self, "rule_type"):
event = args[0]
if event:
add_field_to(event, f"processing_times.{self.rule_type}", duration)
add_fields_to(
event, fields={f"processing_times.{self.rule_type}": duration}
)
if hasattr(self, "_logprep_config"): # attribute of the Pipeline class
event = args[0]
if event:
add_field_to(event, "processing_times.pipeline", duration)
add_field_to(event, "processing_times.hostname", gethostname())
add_fields_to(event, fields={"processing_times.pipeline": duration})
add_fields_to(event, fields={"processing_times.hostname": gethostname()})
return result

return inner
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/amides/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from logprep.processor.amides.features import CommaSeparation


class DetectionModelError(BaseException):
class DetectionModelError(Exception):
"""Base exception class for all RuleModel-related errors."""


Expand Down Expand Up @@ -98,7 +98,7 @@ def detect(self, sample: str) -> Tuple[bool, float]:
return False, round(confidence_value, 3)


class RuleAttributorError(BaseException):
class RuleAttributorError(Exception):
"""Base class for all RuleAttributor-related Errors."""


Expand Down
Loading

0 comments on commit a1430e8

Please sign in to comment.