From 434216e9d6c31acf53dc01d320801841ca519f22 Mon Sep 17 00:00:00 2001 From: dtrai2 Date: Wed, 13 Nov 2024 08:36:06 +0100 Subject: [PATCH] enable `add_field_to` to always take a batch of fields - Replace `add_batch_to` with `add_field_to` throughout code. - Update helper functions to streamline `add_field_to` usage. --- logprep/abc/input.py | 8 ++--- logprep/abc/processor.py | 2 +- logprep/metrics/metrics.py | 6 ++-- logprep/processor/clusterer/processor.py | 2 +- .../domain_label_extractor/processor.py | 8 ++--- logprep/processor/field_manager/processor.py | 15 ++++---- logprep/processor/generic_adder/processor.py | 4 +-- .../processor/generic_resolver/processor.py | 2 +- logprep/processor/geoip_enricher/processor.py | 4 +-- logprep/processor/grokker/processor.py | 4 +-- .../processor/hyperscan_resolver/processor.py | 2 +- logprep/processor/labeler/processor.py | 6 ++-- .../processor/list_comparison/processor.py | 4 +-- logprep/processor/pseudonymizer/processor.py | 2 +- logprep/processor/requester/processor.py | 6 ++-- .../selective_extractor/processor.py | 4 +-- .../processor/template_replacer/processor.py | 2 +- logprep/util/helper.py | 36 ++++++++++--------- 18 files changed, 60 insertions(+), 57 deletions(-) diff --git a/logprep/abc/input.py b/logprep/abc/input.py index 5aba34714..2e28eabf4 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -18,7 +18,7 @@ from logprep.abc.exceptions import LogprepException from logprep.metrics.metrics import Metric from logprep.processor.base.exceptions import FieldExistsWarning -from logprep.util.helper import add_batch_to, add_field_to, get_dotted_field_value +from logprep.util.helper import add_field_to, get_dotted_field_value from logprep.util.time import UTC, TimeParser from logprep.util.validators import dict_structure_validator @@ -308,7 +308,7 @@ def _add_env_enrichment_to_event(self, event: dict): target: os.environ.get(variable_name, "") for target, variable_name in enrichments.items() } - add_batch_to(event, fields) + add_field_to(event, fields) def _add_arrival_time_information_to_event(self, event: dict): new_field = { @@ -332,13 +332,13 @@ def _add_arrival_timedelta_information_to_event(self, event: dict): TimeParser.from_string(log_arrival_time).astimezone(UTC) - TimeParser.from_string(time_reference).astimezone(UTC) ).total_seconds() - add_field_to(event, field={target_field: delta_time_sec}) + add_field_to(event, fields={target_field: delta_time_sec}) def _add_version_information_to_event(self, event: dict): """Add the version information to the event""" target_field = self._config.preprocessing.get("version_info_target_field") # pylint: disable=protected-access - add_field_to(event, field={target_field: self._config._version_information}) + add_field_to(event, fields={target_field: self._config._version_information}) # pylint: enable=protected-access def _add_hmac_to(self, event_dict, raw_event) -> dict: diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index b13497a18..f6f2a7f10 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -384,7 +384,7 @@ def _has_missing_values(self, event, rule, source_field_dict): def _write_target_field(self, event: dict, rule: "Rule", result: any) -> None: add_field_to( event, - field={rule.target_field: result}, + fields={rule.target_field: result}, extends_lists=rule.extend_target_list, overwrite_target_field=rule.overwrite_target, ) diff --git a/logprep/metrics/metrics.py b/logprep/metrics/metrics.py index 9f1524366..99fdb7702 100644 --- a/logprep/metrics/metrics.py +++ b/logprep/metrics/metrics.py @@ -222,12 +222,12 @@ def inner(self, *args, **kwargs): # nosemgrep if hasattr(self, "rule_type"): event = args[0] if event: - add_field_to(event, field={f"processing_times.{self.rule_type}": duration}) + add_field_to(event, fields={f"processing_times.{self.rule_type}": duration}) if hasattr(self, "_logprep_config"): # attribute of the Pipeline class event = args[0] if event: - add_field_to(event, field={"processing_times.pipeline": duration}) - add_field_to(event, field={"processing_times.hostname": gethostname()}) + add_field_to(event, fields={"processing_times.pipeline": duration}) + add_field_to(event, fields={"processing_times.hostname": gethostname()}) return result return inner diff --git a/logprep/processor/clusterer/processor.py b/logprep/processor/clusterer/processor.py index 80c1f85f3..04ae30014 100644 --- a/logprep/processor/clusterer/processor.py +++ b/logprep/processor/clusterer/processor.py @@ -140,7 +140,7 @@ def _cluster(self, event: dict, rule: ClustererRule): cluster_signature = cluster_signature_based_on_message add_field_to( event, - field={self._config.output_field_name: cluster_signature}, + fields={self._config.output_field_name: cluster_signature}, extends_lists=rule.extend_target_list, overwrite_target_field=rule.overwrite_target, ) diff --git a/logprep/processor/domain_label_extractor/processor.py b/logprep/processor/domain_label_extractor/processor.py index c5cdd11d6..22683e523 100644 --- a/logprep/processor/domain_label_extractor/processor.py +++ b/logprep/processor/domain_label_extractor/processor.py @@ -49,7 +49,7 @@ from logprep.processor.domain_label_extractor.rule import DomainLabelExtractorRule from logprep.processor.field_manager.processor import FieldManager from logprep.util.getter import GetterFactory -from logprep.util.helper import add_and_overwrite, add_batch_to, get_dotted_field_value +from logprep.util.helper import add_and_overwrite, add_field_to, get_dotted_field_value from logprep.util.validators import list_of_urls_validator logger = logging.getLogger("DomainLabelExtractor") @@ -130,7 +130,7 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule): if self._is_valid_ip(domain): tagging_field.append(f"ip_in_{rule.source_fields[0].replace('.', '_')}") - add_and_overwrite(event, field={self._config.tagging_field_name: tagging_field}) + add_and_overwrite(event, fields={self._config.tagging_field_name: tagging_field}) return labels = self._tld_extractor(domain) @@ -140,10 +140,10 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule): f"{rule.target_field}.top_level_domain": labels.suffix, f"{rule.target_field}.subdomain": labels.subdomain, } - add_batch_to(event, fields, overwrite_target_field=rule.overwrite_target) + add_field_to(event, fields, overwrite_target_field=rule.overwrite_target) else: tagging_field.append(f"invalid_domain_in_{rule.source_fields[0].replace('.', '_')}") - add_and_overwrite(event, field={self._config.tagging_field_name: tagging_field}) + add_and_overwrite(event, fields={self._config.tagging_field_name: tagging_field}) @staticmethod def _is_valid_ip(domain): diff --git a/logprep/processor/field_manager/processor.py b/logprep/processor/field_manager/processor.py index 47a00499a..7f93b0ffe 100644 --- a/logprep/processor/field_manager/processor.py +++ b/logprep/processor/field_manager/processor.py @@ -35,7 +35,6 @@ from logprep.processor.field_manager.rule import FieldManagerRule from logprep.util.helper import ( add_and_overwrite, - add_batch_to, add_field_to, get_dotted_field_value, pop_dotted_field_value, @@ -78,7 +77,7 @@ def _apply_mapping(self, event, rule, rule_args): if not any(source_field_values): return source_field_values, targets = self._filter_missing_fields(source_field_values, targets) - add_batch_to( + add_field_to( event, dict(zip(targets, source_field_values)), extend_target_list, overwrite_target ) if rule.delete_source_fields: @@ -106,7 +105,7 @@ def _write_to_single_target(self, args, extend_target_list, overwrite_target, ru case State( extend=True, overwrite=True, single_source_element=False, target_is_list=False ): - add_and_overwrite(event, field={target_field: source_fields_values}) + add_and_overwrite(event, fields={target_field: source_fields_values}) return case State( @@ -118,16 +117,16 @@ def _write_to_single_target(self, args, extend_target_list, overwrite_target, ru ): flattened_source_fields = self._overwrite_from_source_values(source_fields_values) source_fields_values = [*flattened_source_fields] - add_and_overwrite(event, field={target_field: source_fields_values}) + add_and_overwrite(event, fields={target_field: source_fields_values}) return case State(extend=True, overwrite=False, target_is_list=False, target_is_none=True): - add_and_overwrite(event, field={target_field: source_fields_values}) + add_and_overwrite(event, fields={target_field: source_fields_values}) return case State(extend=True, overwrite=False, target_is_list=False): source_fields_values = [target_field_value, *source_fields_values] - add_and_overwrite(event, field={target_field: source_fields_values}) + add_and_overwrite(event, fields={target_field: source_fields_values}) return case State( @@ -135,13 +134,13 @@ def _write_to_single_target(self, args, extend_target_list, overwrite_target, ru ): flattened_source_fields = self._overwrite_from_source_values(source_fields_values) source_fields_values = [*target_field_value, *flattened_source_fields] - add_and_overwrite(event, field={target_field: source_fields_values}) + add_and_overwrite(event, fields={target_field: source_fields_values}) return case State(overwrite=True, extend=True): flattened_source_fields = self._overwrite_from_source_values(source_fields_values) source_fields_values = [*flattened_source_fields] - add_and_overwrite(event, field={target_field: source_fields_values}) + add_and_overwrite(event, fields={target_field: source_fields_values}) return case _: diff --git a/logprep/processor/generic_adder/processor.py b/logprep/processor/generic_adder/processor.py index 8f9b2b37c..0dd8a0574 100644 --- a/logprep/processor/generic_adder/processor.py +++ b/logprep/processor/generic_adder/processor.py @@ -48,7 +48,7 @@ from logprep.factory_error import InvalidConfigurationError from logprep.processor.generic_adder.mysql_connector import MySQLConnector from logprep.processor.generic_adder.rule import GenericAdderRule -from logprep.util.helper import add_batch_to, get_dotted_field_value +from logprep.util.helper import add_field_to, get_dotted_field_value def sql_config_validator(_, attribute, value): @@ -230,7 +230,7 @@ def _apply_rules(self, event: dict, rule: GenericAdderRule): self._update_db_table() items_to_add = self._get_items_to_add_from_db(event, rule) if items_to_add: - add_batch_to(event, items_to_add, rule.extend_target_list, rule.overwrite_target) + add_field_to(event, items_to_add, rule.extend_target_list, rule.overwrite_target) def _get_items_to_add_from_db(self, event: dict, rule: GenericAdderRule) -> dict | None: """Get the sub part of the value from the event using a regex pattern""" diff --git a/logprep/processor/generic_resolver/processor.py b/logprep/processor/generic_resolver/processor.py index fce8b2373..aee580b25 100644 --- a/logprep/processor/generic_resolver/processor.py +++ b/logprep/processor/generic_resolver/processor.py @@ -61,7 +61,7 @@ def _apply_rules(self, event, rule): try: add_field_to( event, - field={target_field: content}, + fields={target_field: content}, extends_lists=rule.extend_target_list, overwrite_target_field=rule.overwrite_target, ) diff --git a/logprep/processor/geoip_enricher/processor.py b/logprep/processor/geoip_enricher/processor.py index 9374b8b45..c3cad0745 100644 --- a/logprep/processor/geoip_enricher/processor.py +++ b/logprep/processor/geoip_enricher/processor.py @@ -41,7 +41,7 @@ from logprep.processor.field_manager.processor import FieldManager from logprep.processor.geoip_enricher.rule import GEOIP_DATA_STUBS, GeoipEnricherRule from logprep.util.getter import GetterFactory -from logprep.util.helper import add_batch_to, get_dotted_field_value +from logprep.util.helper import add_field_to, get_dotted_field_value logger = logging.getLogger("GeoipEnricher") @@ -132,7 +132,7 @@ def _apply_rules(self, event, rule): rule.customize_target_subfields.get(target, f"{rule.target_field}.{target}"): value for target, value in geoip_data.items() } - add_batch_to( + add_field_to( event, fields, extends_lists=False, diff --git a/logprep/processor/grokker/processor.py b/logprep/processor/grokker/processor.py index dac33017e..b2c830367 100644 --- a/logprep/processor/grokker/processor.py +++ b/logprep/processor/grokker/processor.py @@ -42,7 +42,7 @@ from logprep.processor.field_manager.processor import FieldManager from logprep.processor.grokker.rule import GrokkerRule from logprep.util.getter import GetterFactory -from logprep.util.helper import add_batch_to, get_dotted_field_value +from logprep.util.helper import add_field_to, get_dotted_field_value logger = logging.getLogger("Grokker") @@ -85,7 +85,7 @@ def _apply_rules(self, event: dict, rule: GrokkerRule): if result is None or result == {}: continue matches.append(True) - add_batch_to( + add_field_to( event, result, extends_lists=rule.extend_target_list, diff --git a/logprep/processor/hyperscan_resolver/processor.py b/logprep/processor/hyperscan_resolver/processor.py index 49a05b92c..81a4b89ee 100644 --- a/logprep/processor/hyperscan_resolver/processor.py +++ b/logprep/processor/hyperscan_resolver/processor.py @@ -121,7 +121,7 @@ def _apply_rules(self, event: dict, rule: HyperscanResolverRule): try: add_field_to( event, - field={resolve_target: dest_val}, + fields={resolve_target: dest_val}, extends_lists=rule.extend_target_list, overwrite_target_field=rule.overwrite_target, ) diff --git a/logprep/processor/labeler/processor.py b/logprep/processor/labeler/processor.py index 2c878e4d6..6b2d47a6f 100644 --- a/logprep/processor/labeler/processor.py +++ b/logprep/processor/labeler/processor.py @@ -33,7 +33,7 @@ from logprep.abc.processor import Processor from logprep.processor.labeler.labeling_schema import LabelingSchema from logprep.processor.labeler.rule import LabelerRule -from logprep.util.helper import add_batch_to, get_dotted_field_value +from logprep.util.helper import add_field_to, get_dotted_field_value class Labeler(Processor): @@ -74,10 +74,10 @@ def setup(self): def _apply_rules(self, event, rule): """Applies the rule to the current event""" fields = {key: value for key, value in rule.prefixed_label.items()} - add_batch_to(event, fields, extends_lists=True) + add_field_to(event, fields, extends_lists=True) # convert sets into sorted lists fields = { key: sorted(set(get_dotted_field_value(event, key))) for key, _ in rule.prefixed_label.items() } - add_batch_to(event, fields, overwrite_target_field=True) + add_field_to(event, fields, overwrite_target_field=True) diff --git a/logprep/processor/list_comparison/processor.py b/logprep/processor/list_comparison/processor.py index ae488f956..4d1adc2da 100644 --- a/logprep/processor/list_comparison/processor.py +++ b/logprep/processor/list_comparison/processor.py @@ -73,8 +73,8 @@ def _apply_rules(self, event, rule): """ comparison_result, comparison_key = self._list_comparison(rule, event) if comparison_result is not None: - field = {f"{rule.target_field}.{comparison_key}": comparison_result} - add_field_to(event, field, extends_lists=True) + fields = {f"{rule.target_field}.{comparison_key}": comparison_result} + add_field_to(event, fields, extends_lists=True) def _list_comparison(self, rule: ListComparisonRule, event: dict): """ diff --git a/logprep/processor/pseudonymizer/processor.py b/logprep/processor/pseudonymizer/processor.py index 78b13dccb..b324e8134 100644 --- a/logprep/processor/pseudonymizer/processor.py +++ b/logprep/processor/pseudonymizer/processor.py @@ -264,7 +264,7 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule): ] else: field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value) - add_field_to(event, field={dotted_field: field_value}, overwrite_target_field=True) + add_field_to(event, fields={dotted_field: field_value}, overwrite_target_field=True) if "@timestamp" in event: for pseudonym, _ in self.result.data: pseudonym["@timestamp"] = event["@timestamp"] diff --git a/logprep/processor/requester/processor.py b/logprep/processor/requester/processor.py index 8de56e3d3..dc56a8d8c 100644 --- a/logprep/processor/requester/processor.py +++ b/logprep/processor/requester/processor.py @@ -44,7 +44,7 @@ from logprep.processor.base.exceptions import FieldExistsWarning from logprep.processor.field_manager.processor import FieldManager from logprep.processor.requester.rule import RequesterRule -from logprep.util.helper import add_batch_to, add_field_to, get_source_fields_dict +from logprep.util.helper import add_field_to, get_source_fields_dict TEMPLATE_KWARGS = ("url", "json", "data", "params") @@ -72,7 +72,7 @@ def _handle_response(self, event, rule, response): try: add_field_to( event, - field={rule.target_field: self._get_result(response)}, + fields={rule.target_field: self._get_result(response)}, extends_lists=rule.extend_target_list, overwrite_target_field=rule.overwrite_target, ) @@ -83,7 +83,7 @@ def _handle_response(self, event, rule, response): contents = self._get_field_values(self._get_result(response), source_fields) targets = rule.target_field_mapping.values() try: - add_batch_to( + add_field_to( event, dict(zip(targets, contents)), rule.extend_target_list, diff --git a/logprep/processor/selective_extractor/processor.py b/logprep/processor/selective_extractor/processor.py index b0b7e58dc..fee75a67f 100644 --- a/logprep/processor/selective_extractor/processor.py +++ b/logprep/processor/selective_extractor/processor.py @@ -31,7 +31,7 @@ from logprep.processor.field_manager.processor import FieldManager from logprep.processor.selective_extractor.rule import SelectiveExtractorRule -from logprep.util.helper import add_batch_to, get_source_fields_dict +from logprep.util.helper import add_field_to, get_source_fields_dict class SelectiveExtractor(FieldManager): @@ -64,5 +64,5 @@ def _apply_rules(self, event: dict, rule: SelectiveExtractorRule): } if flattened_fields: filtered_event = {} - add_batch_to(filtered_event, flattened_fields) + add_field_to(filtered_event, flattened_fields) self.result.data.append((filtered_event, rule.outputs)) diff --git a/logprep/processor/template_replacer/processor.py b/logprep/processor/template_replacer/processor.py index 0a4fdd725..9b17fbcfa 100644 --- a/logprep/processor/template_replacer/processor.py +++ b/logprep/processor/template_replacer/processor.py @@ -115,7 +115,7 @@ def _perform_replacement(self, event: dict, replacement: str, rule: TemplateRepl """ overwrite = get_dotted_field_value(event, self._target_field) is not None add_field_to( - event, field={self._target_field: replacement}, overwrite_target_field=overwrite + event, fields={self._target_field: replacement}, overwrite_target_field=overwrite ) def setup(self): diff --git a/logprep/util/helper.py b/logprep/util/helper.py index f4fafda6b..26ad4178c 100644 --- a/logprep/util/helper.py +++ b/logprep/util/helper.py @@ -59,9 +59,9 @@ def _add_and_not_overwrite_key(sub_dict, key): return sub_dict.get(key) -def add_field_to( +def _add_one_field_to( event: dict, - field: dict, + field: tuple, extends_lists: bool = False, overwrite_target_field: bool = False, ) -> None: @@ -72,7 +72,7 @@ def add_field_to( ---------- event: dict Original log-event that logprep is currently processing - field: dict + field: tuple A key value pair describing the field that should be added. The key is the dotted subfield string indicating the target. The value is the content that should be added to the named target. The content can be of type str, float, int, list, dict. @@ -90,8 +90,6 @@ def add_field_to( """ if extends_lists and overwrite_target_field: raise ValueError("An output field can't be overwritten and extended at the same time") - if isinstance(field, dict): - field = list(field.items())[0] target_field, content = field field_path = [event, *get_dotted_field_list(target_field)] target_key = field_path.pop() @@ -116,7 +114,7 @@ def add_field_to( target_parent[target_key].append(content) -def _add_field_to_silent_fail(*args, **kwargs) -> None | str: +def _add_one_field_to_silent_fail(*args, **kwargs) -> None | str: """ Adds a field to an object, ignoring the FieldExistsWarning if the field already exists. Is only needed in the add_batch_to map function. Without this the map would terminate early. @@ -134,22 +132,24 @@ def _add_field_to_silent_fail(*args, **kwargs) -> None | str: FieldExistsWarning: If the field already exists, but this warning is caught and ignored. """ try: - add_field_to(*args, **kwargs) + _add_one_field_to(*args, **kwargs) except FieldExistsWarning as error: return error.skipped_fields[0] -def add_batch_to(event, fields, extends_lists=False, overwrite_target_field=False) -> None: +def add_field_to( + event: dict, fields: dict, extends_lists: bool = False, overwrite_target_field: bool = False +) -> None: """ Handles the batch addition operation while raising a FieldExistsWarning with all unsuccessful targets. Parameters: event: dict The event object to which fields are to be added. - targets: list - A list of target field names where the contents will be added. - contents: list - A list of contents corresponding to each target field. + fields: dict + A dicht with key value pairs describing the fields that should be added. The key is the dotted subfield + string indicating the target. The value is the content that should be added to the named target. The + content can be of type: str, float, int, list, dict. extends_lists: bool A boolean indicating whether to extend lists if the target field already exists. overwrite_target_field: bool @@ -159,10 +159,14 @@ def add_batch_to(event, fields, extends_lists=False, overwrite_target_field=Fals FieldExistsWarning: If there are targets to which the content could not be added due to field existence restrictions. """ + # filter out None values fields = {key: value for key, value in fields.items() if value is not None} number_fields = len(dict(fields)) + if number_fields == 1: + _add_one_field_to(event, list(fields.items())[0], extends_lists, overwrite_target_field) + return unsuccessful_targets = map( - _add_field_to_silent_fail, + _add_one_field_to_silent_fail, itertools.repeat(event, number_fields), fields.items(), itertools.repeat(extends_lists, number_fields), @@ -342,9 +346,9 @@ def snake_to_camel(snake: str) -> str: append_as_list = partial(add_field_to, extends_lists=True) -def add_and_overwrite(event, field, *_): +def add_and_overwrite(event, fields, *_): """wrapper for add_field_to""" - add_field_to(event, field, overwrite_target_field=True) + add_field_to(event, fields, overwrite_target_field=True) def append(event, field, separator): @@ -354,7 +358,7 @@ def append(event, field, separator): if not isinstance(target_value, list): target_value = "" if target_value is None else target_value target_value = f"{target_value}{separator}{content}" - add_and_overwrite(event, field={target_field: target_value}) + add_and_overwrite(event, fields={target_field: target_value}) else: append_as_list(event, field)