diff --git a/.gitignore b/.gitignore
index 9dd6bb758..2fe9ccc0e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,7 +17,6 @@ coverage.xml
 cov.xml
-
 logs
-sql_db_table.json
 build/
 dist/
 error_file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e82beaadf..4f6a2ba8c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
 * fix `delimiter` typo in `StringSplitterRule` configuration
 * removed the configuration `tld_lists` in `domain_resolver`, `domain_label_extractor` and `pseudonymizer` as the list is now fixed inside the packaged logprep
+* removed the SQL feature from the `generic_adder`; fields can now only be added from the rule configuration or from a file
 
 ### Features
diff --git a/logprep/processor/generic_adder/mysql_connector.py b/logprep/processor/generic_adder/mysql_connector.py
deleted file mode 100644
index f425996a8..000000000
--- a/logprep/processor/generic_adder/mysql_connector.py
+++ /dev/null
@@ -1,186 +0,0 @@
-"""This module is used to connect to a MySQL database and to retrieve data from a SQL table."""
-
-import logging
-import time
-from typing import Optional
-
-import mysql
-import mysql.connector as db
-
-logger = logging.getLogger("MySQLConnector")
-
-
-class MySQLConnector:
-    """Used to connect to a MySQL database and to retrieve data from a table if it has changed."""
-
-    connection: Optional[mysql.connector.MySQLConnection]
-
-    target_column: str
-    """The name of the column whose values are being matched against a value from an event"""
-
-    _add_target_column: bool
-    """Determines if the target column itself will be added to the event"""
-
-    table_name: str
-    """The table name to use when connecting to the MySQL database"""
-
-    _db_check_interval: float
-    """Time that has to pass for the database to be checked"""
-
-    _last_check: float
-    """Last time the database has been checked"""
-
-    _last_table_checksum: Optional[int]
-    """Checksum of the database table that was obtained on the last update check"""
-
-    _cursor: mysql.connector.connection.MySQLCursor
-
-    def __init__(self, sql_config: dict):
-        """Initialize the MySQLConnector.
-
-        Parameters
-        ----------
-        sql_config : dict
-            SQL configuration dictionary.
-        logger : logging.Logger
-            Logger to use.
-
-        Returns
-        -------
-        bool
-            True if the SQL table has changed, False otherwise.
-
-        """
-
-        self.connection = None
-        self.cursor = None
-
-        self._sql_config = sql_config
-
-        self.target_column = sql_config["target_column"]
-        self._add_target_column = sql_config.get("add_target_column", False)
-
-        self.table_name = sql_config["table"]
-
-        self._db_check_interval = sql_config.get("timer", 60 * 3)
-        self._last_check = 0
-        self._last_table_checksum = None
-
-    def connect(self):
-        """Get connection to SQL database and cursor for database table"""
-        self.connection = db.connect(
-            user=self._sql_config["user"],
-            password=self._sql_config["password"],
-            host=self._sql_config["host"],
-            database=self._sql_config["database"],
-            port=self._sql_config.get("port", 3306),
-        )
-        self.cursor = self.connection.cursor()
-
-    def disconnect(self):
-        """Close connection to SQL database"""
-        self.connection.close()
-
-    def time_to_check_for_change(self) -> bool:
-        """Check if enough time has passed to check for a SQL table change.
-
-        Update the timer if it is time to check for a change.
-
-        Returns
-        -------
-        bool
-            True if a check should be performed, False otherwise.
- - """ - check_change = time.time() - self._last_check >= self._db_check_interval - if check_change: - self._last_check = time.time() - return check_change - - def has_changed(self) -> bool: - """Check if a configured SQL table has changed. - - The checksum of the table is used to check if a table has changed. The check is only - performed if a specified time has passed since the last check. - - Returns - ------- - bool - True if the SQL table has changed, False otherwise. - - """ - checksum = self._get_checksum() - if self._last_table_checksum == checksum: - return False - self._last_table_checksum = checksum - return True - - def _get_checksum(self) -> int: - """Get the checksum a configured SQL table. - - The checksum is used to check if a table has changed. - - Returns - ------- - int - The checksum of a SQL table. - - This value changes if the table or it's contents change. - - """ - self.cursor.execute(f"CHECKSUM TABLE {self.table_name}") # nosemgrep - checksum = next(self.cursor)[-1] - self.connection.commit() - return checksum - - def get_data(self) -> dict: - """Get addition data from a configured SQL table. - - Returns - ------- - dict - A dict containing a mapping to rows that can be added by the generic adder. - - The keys of the dict are the values in the SQL table that are being compared to a value - in the event. The values in the dict are lists containing keys and values that can be - added by the generic adder if there is a match. - - """ - self._last_table_checksum = self._get_checksum() - - table = {} - target_col = 0 - - try: - self.cursor.execute(f"desc {self.table_name}") # nosemgrep - col_names = [] - for idx, column_desc in enumerate(self.cursor): - col_names.append(column_desc[0]) - if column_desc[0] == self.target_column: - target_col = idx - - self.cursor.execute(f"SELECT * FROM {self.table_name}") # nosemgrep - - for row_vals in self.cursor: - if self._add_target_column: - column_dict = tuple( - ( - [col_names[idx], col] - for idx, col in enumerate(row_vals) - if col_names[idx].upper() != "ID" - ) - ) - else: - column_dict = tuple( - ( - [col_names[idx], col] - for idx, col in enumerate(row_vals) - if idx != target_col and col_names[idx].upper() != "ID" - ) - ) - table[row_vals[target_col].upper()] = column_dict - - return table - except db.Error as error: - logger.warning(f"Error retrieving entry from database: {error}") - return {} diff --git a/logprep/processor/generic_adder/processor.py b/logprep/processor/generic_adder/processor.py index 2e9677894..6517fd420 100644 --- a/logprep/processor/generic_adder/processor.py +++ b/logprep/processor/generic_adder/processor.py @@ -2,7 +2,7 @@ GenericAdder ============ The `generic_adder` is a processor that adds new fields and values to documents based on a list. -The list can reside inside a rule, inside a file or retrieved from an sql database. +The list resides inside a rule and/or inside a file. Processor Configuration @@ -16,15 +16,6 @@ - tests/testdata/rules/specific/ generic_rules: - tests/testdata/rules/generic/ - sql_config: - user: example_user - password: example_password - host: "127.0.0.1" - database: example_db - table: example_table - target_column: example_column - add_target_column: True - timer: 0.1 .. autoclass:: logprep.processor.generic_adder.processor.GenericAdder.Config :members: @@ -35,221 +26,17 @@ .. 
automodule:: logprep.processor.generic_adder.rule """ -import json -import os -import re -import time -from typing import Optional - -from attr import define, field, validators -from filelock import FileLock - from logprep.abc.processor import Processor -from logprep.factory_error import InvalidConfigurationError -from logprep.processor.generic_adder.mysql_connector import MySQLConnector from logprep.processor.generic_adder.rule import GenericAdderRule -from logprep.util.helper import add_fields_to, get_dotted_field_value - - -def sql_config_validator(_, attribute, value): - """validate if a subfield of a dict is valid""" - if attribute.name == "sql_config" and isinstance(value, dict): - table = value.get("table") - if table and re.search(r"\s", table): - raise InvalidConfigurationError("Table in 'sql_config' contains whitespaces!") - if table and not re.search(r"^[a-zA-Z0-9_]+$", table): - raise InvalidConfigurationError( - "Table in 'sql_config' may only contain alphanumeric characters and underscores!" - ) +from logprep.util.helper import add_fields_to class GenericAdder(Processor): """Resolve values in documents by referencing a mapping list.""" - @define(kw_only=True) - class Config(Processor.Config): - """GenericAdder config""" - - sql_config: Optional[dict] = field( - default=None, - validator=[ - validators.optional(validator=validators.instance_of(dict)), - sql_config_validator, - ], - ) - """ - Configuration of the connection to a MySQL database and settings on how to add data from - the database. - This field is optional. The database feature will not be used if `sql_config` is omitted. - Has following subfields: - - - `user` - The user to use when connecting to the MySQL database. - - `password` - The password to use when connecting to the MySQL database. - - `host` - The host to use when connecting to the MySQL database. - - `database` - The database name to use when connecting to the MySQL database. - - `table` - The table name to use when connecting to the MySQL database. - - `target_column` - The name of the column whose values are being matched against a value - from an event. - If a value matches, the remaining values of the row with the match are being added to - the event. - - `add_target_column` - Determines if the target column itself will be added to the event. - This is set to false per default. - - `timer` - Period how long to wait (in seconds) before the database table is being checked - for changes. - If there is a change, the table is reloaded by Logprep. - - `file_lock_path` - Path to a file lock used by the adder when updating the SQL table - (default: ./sql_update.lock). - - `db_file_path` - Path to a file used to store the SQL table obtained by the generic adder - (default: ./sql_db_table.json). - - .. security-best-practice:: - :title: Processor - GenericAdder - - When using a sql database to enrich events, ensure that it is a database which is - protected with a user credentials. 
- """ - rule_class = GenericAdderRule - __slots__ = [ - "_db_connector", - "_db_table", - "_file_lock_path", - "_db_file_path", - "_file_check_interval", - ] - - _db_table: Optional[dict] - """Dict containing table from SQL database""" - - _db_connector: MySQLConnector - """Connector for MySQL database""" - - _file_check_interval: Optional[float] - """Time that has to pass for the database file to become stale""" - - _file_lock_path: Optional[str] - """Path to file lock for database update""" - - _db_file_path: Optional[str] - """Path to file containing table from SQL database""" - - def __init__(self, name: str, configuration: Processor.Config): - """Initialize a generic adder instance. - Performs a basic processor initialization. Furthermore, a SQL database and a SQL table are - being initialized if a SQL configuration exists. - Parameters - ---------- - name : str - Name for the generic adder. - configuration : Processor.Config - Configuration for SQL adding and rule loading. - """ - super().__init__(name, configuration) - - self._db_table = None - sql_config = configuration.sql_config - if sql_config: - self._initialize_sql(sql_config) - - def _initialize_sql(self, sql_config): - self._db_connector = MySQLConnector(sql_config) if sql_config else None - if self._db_connector: - self._file_lock_path = sql_config.get("file_lock_path", "sql_update.lock") - self._db_file_path = sql_config.get("db_file_path", "sql_db_table.json") - self._file_check_interval = sql_config.get("timer", 60 * 3) - - self._check_connection() - self._update_db_table() - - def _check_connection(self): - """Check connections with lock to prevent a sudden spike of connections to the database. - - Checking at initialization prevents error documents caused by a wrong connection - configuration when starting Logprep""" - with FileLock(self._file_lock_path): # pylint: disable=abstract-class-instantiated - self._db_connector.connect() - self._db_connector.disconnect() - - def _update_db_table(self): - if self._db_connector.time_to_check_for_change(): - with FileLock(self._file_lock_path): # pylint: disable=abstract-class-instantiated - now = time.time() - if self._check_if_file_not_exists_or_stale(now): - self._update_from_db_and_write_to_file() - else: - self._load_from_file() - - def _load_from_file(self): - with open(self._db_file_path, "r", encoding="utf8") as db_file: - self._db_table = json.load(db_file) - - def _update_from_db_and_write_to_file(self): - self._db_connector.connect() - try: - if self._db_connector.has_changed(): - self._db_table = self._db_connector.get_data() - with open(self._db_file_path, "w", encoding="utf8") as db_file: - json.dump(self._db_table, db_file) - finally: - self._db_connector.disconnect() - - def _check_if_file_not_exists_or_stale(self, now): - if not os.path.isfile(self._db_file_path): - return True - if now - os.path.getmtime(self._db_file_path) > self._file_check_interval: - return True - return False - def _apply_rules(self, event: dict, rule: GenericAdderRule): - """Apply a matching generic adder rule to the event. - Add fields and values to the event according to the rules it matches for. - Additions can come from the rule definition, from a file or from a SQL table. - The SQL table is initially loaded from the database and then reloaded if it changes. - At first it checks if a SQL table exists and if it will be used. If it does, it adds all - values from a matching row in the table to the event. 
To determine if a row matches, a - pattern is used on a defined value of the event to extract a subvalue that is then matched - against a value in a defined column of the SQL table. A dotted path prefix can be applied to - add the new fields into a shared nested location. - If no table exists, fields defined withing the rule itself or in a rule file are being added - to the event. - Parameters - ---------- - event : dict - Name of the event to add keys and values to. - rule : GenericAdderRule - A matching generic adder rule. - Raises - ------ - FieldExistsWarning - Raises if an addition would overwrite an existing field or value. - """ items_to_add = rule.add - use_db = rule.db_target and self._db_table - if use_db: - self._update_db_table() - items_to_add = self._get_items_to_add_from_db(event, rule) if items_to_add: add_fields_to(event, items_to_add, rule, rule.extend_target_list, rule.overwrite_target) - - def _get_items_to_add_from_db(self, event: dict, rule: GenericAdderRule) -> dict | None: - """Get the sub part of the value from the event using a regex pattern""" - if not rule.db_pattern: - return - value_to_check_in_db = get_dotted_field_value(event, rule.db_target) - match_with_value_in_db = rule.db_pattern.match(value_to_check_in_db) - if match_with_value_in_db: - # Get values to add from db table using the sub part - value_to_map = match_with_value_in_db.group(1).upper() - add_from_db = self._db_table.get(value_to_map, []) - if rule.db_destination_prefix: - add_from_db = [ - (self._add_prefix_if_not_present(key, rule), value) - for key, value in add_from_db - ] - return dict(add_from_db) - - def _add_prefix_if_not_present(self, key: str, rule: "GenericAdderRule") -> str: - if not key.startswith(rule.db_destination_prefix): - return f"{rule.db_destination_prefix}.{key}" - return key diff --git a/logprep/processor/generic_adder/rule.py b/logprep/processor/generic_adder/rule.py index 6ed9a714f..73844114d 100644 --- a/logprep/processor/generic_adder/rule.py +++ b/logprep/processor/generic_adder/rule.py @@ -71,20 +71,6 @@ - PATH_TO_FILE_WITH_LIST description: '...' -It is also possible to use a table from a MySQL database to add fields to an event. - -.. code-block:: yaml - :linenos: - :caption: Example with a MySQL Table - - filter: '*' - generic_adder: - sql_table: - event_source_field: source - pattern: '([a-zA-Z0-9]+)_\S+' - destination_field_prefix: nested.dict - description: '...' - .. autoclass:: logprep.processor.generic_adder.rule.GenericAdderRule.Config :members: :undoc-members: @@ -93,8 +79,6 @@ """ # pylint: enable=anomalous-backslash-in-string -import re -from typing import Any from attrs import define, field, validators @@ -143,44 +127,6 @@ class Config(FieldManagerRule.Config): """If a list is used, it is possible to tell the generic adder to only use the first existing file by setting :code:`generic_adder.only_first_existing_file: true`. In that case, only one file must exist.""" - sql_table: dict = field( - validator=[ - validators.instance_of(dict), - validators.deep_mapping( - key_validator=validators.in_( - ["pattern", "event_source_field", "destination_field_prefix"] - ), - value_validator=validators.instance_of(str), - ), - ], - factory=dict, - ) - # pylint: disable=anomalous-backslash-in-string - """ sql config for generic adder (Optional) - If a specified field in the table matches a condition, the remaining fields, - except for the ID field, will be added to the event. - The names of the new fields correspond to the column names in the MySQL table. 
- This is mutually exclusive with the addition from a list. - - It can be defined via :code:`generic_adder.sql_table`. - There :code:`generic_adder.sql_table.event_source_field` defines a field in the event that - is being compared with values in the column of the MySQL table defined - in the processor config. However, only a part of :code:`event_source_field` will - be compared. - Which part this is can be configured via :code:`generic_adder.sql_table.pattern`. - This is a regex pattern with a capture group. - The value in the capture group is being extracted and used for the comparison. - :code:`generic_adder.sql_table.destination_field_prefix` can be used to prefix all added - fields with a dotted path, creating a nested dictionary. - - In the following example the value of the field :code:`source` is being parsed - with :code:`pattern: ([a-zA-Z0-9]+)_\S+`. - It extracts the first alphanumerical string delimited by :code:`_`. - I.e., :code:`Test0_foobarbaz` would extract :code:`test0`, which would be - used for the comparison in the MySQL table. - Since :code:`destination_field_prefix: nested.dict` is set, - a newly added field :code:`FOO_NEW` would be placed under :code:`nested.dict.FOO_NEW`. - """ # pylint: enable=anomalous-backslash-in-string @@ -217,19 +163,3 @@ def _add_from_path(self): def add(self) -> dict: """Returns the fields to add""" return self._config.add - - @property - def db_target(self) -> str: - """Returns the db target""" - return self._config.sql_table.get("event_source_field") - - @property - def db_pattern(self) -> Any: - """Returns the db pattern""" - raw_db_pattern = self._config.sql_table.get("pattern") - return re.compile(raw_db_pattern) if raw_db_pattern else None - - @property - def db_destination_prefix(self) -> str: - """Returns the destination prefix""" - return self._config.sql_table.get("destination_field_prefix", "") diff --git a/pyproject.toml b/pyproject.toml index 89402b270..fd45c4188 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,7 +68,6 @@ dependencies = [ "jsonref", "luqum", "more-itertools==8.10.0", - "mysql-connector-python>=9.1.0", # CVE-2024-21272 "numpy>=1.26.0", "opensearch-py", "prometheus_client", diff --git a/tests/testdata/unit/generic_adder/rules/generic/rules.json b/tests/testdata/unit/generic_adder/rules/generic/rules.json index ef4cf6ee5..7699e942d 100644 --- a/tests/testdata/unit/generic_adder/rules/generic/rules.json +++ b/tests/testdata/unit/generic_adder/rules/generic/rules.json @@ -1,38 +1,39 @@ -[{ - "filter": "add_generic_test", - "generic_adder": {"add": { - "some_added_field": "some value", - "another_added_field": "another_value", - "dotted.added.field": "yet_another_value" - }}, - "description": "" -}, -{ - "filter": "add_list_generic_test", - "generic_adder": {"add_from_file": "tests/testdata/unit/generic_adder/additions_file.yml"}, - "description": "" -}, -{ - "filter": "add_lists_one_generic_test", - "generic_adder": {"add_from_file": ["tests/testdata/unit/generic_adder/additions_file.yml"]}, - "description": "" -}, -{ - "filter": "add_lists_two_generic_test", - "generic_adder": {"add_from_file": [ - "tests/testdata/unit/generic_adder/additions_file.yml", - "tests/testdata/unit/generic_adder/additions_file_2.yml" - ]}, - "description": "" -}, -{ - "filter": "add_from_sql_db_table", - "generic_adder": { - "sql_table": { - "event_source_field": "source", - "pattern": "([a-zA-Z0-9_]+)\\.\\S+", - "destination_field_prefix": "db.test" - } +[ + { + "filter": "add_generic_test", + "generic_adder": { + "add": { + 
"some_added_field": "some value", + "another_added_field": "another_value", + "dotted.added.field": "yet_another_value" + } + }, + "description": "" }, - "description": "" -}] \ No newline at end of file + { + "filter": "add_list_generic_test", + "generic_adder": { + "add_from_file": "tests/testdata/unit/generic_adder/additions_file.yml" + }, + "description": "" + }, + { + "filter": "add_lists_one_generic_test", + "generic_adder": { + "add_from_file": [ + "tests/testdata/unit/generic_adder/additions_file.yml" + ] + }, + "description": "" + }, + { + "filter": "add_lists_two_generic_test", + "generic_adder": { + "add_from_file": [ + "tests/testdata/unit/generic_adder/additions_file.yml", + "tests/testdata/unit/generic_adder/additions_file_2.yml" + ] + }, + "description": "" + } +] diff --git a/tests/unit/processor/generic_adder/test_generic_adder.py b/tests/unit/processor/generic_adder/test_generic_adder.py index f792741ed..b25bd541d 100644 --- a/tests/unit/processor/generic_adder/test_generic_adder.py +++ b/tests/unit/processor/generic_adder/test_generic_adder.py @@ -2,22 +2,13 @@ # pylint: disable=missing-docstring # pylint: disable=unused-argument # pylint: disable=too-many-arguments -import json -import os import re -import tempfile -import time from copy import deepcopy -from unittest import mock import pytest from logprep.factory import Factory -from logprep.factory_error import InvalidConfigurationError -from logprep.processor.base.exceptions import ( - FieldExistsWarning, - InvalidRuleDefinitionError, -) +from logprep.processor.base.exceptions import InvalidRuleDefinitionError from tests.unit.processor.base import BaseProcessorTestCase RULES_DIR_MISSING = "tests/testdata/unit/generic_adder/rules_missing" @@ -25,67 +16,6 @@ RULES_DIR_FIRST_EXISTING = "tests/testdata/unit/generic_adder/rules_first_existing" -BASE_TABLE_RESULTS = [ - [0, "TEST_0", "foo", "bar"], - [1, "TEST_1", "uuu", "vvv"], - [2, "TEST_2", "123", "456"], -] - - -def mock_simulate_table_change(): - DBMock.Cursor.checksum += 1 - DBMock.Cursor.table_result[0] = [0, "TEST_0", "fi", "fo"] - - -class DBMock(mock.MagicMock): - class Cursor: - table_result = deepcopy(BASE_TABLE_RESULTS) - checksum = 0 - - def __init__(self): - self._data = [] - - def execute(self, statement): - if statement == "CHECKSUM TABLE test_table": - self._data = [self.checksum] - elif statement == "desc test_table": - self._data = [["id"], ["a"], ["b"], ["c"]] - elif statement == "SELECT * FROM test_table": - self._data = self.table_result - else: - self._data = [] - - def mock_clear_all(self): - self.checksum = 0 - self._data = [] - self.table_result = [] - - def __next__(self): - return self._data - - def __iter__(self): - return iter(self._data) - - def cursor(self): - return self.Cursor() - - def commit(self): - pass - - -class DBMockNeverEmpty(DBMock): - class Cursor(DBMock.Cursor): - def execute(self, statement): - if statement.startswith("CHECKSUM TABLE "): - self._data = [self.checksum] - elif statement.startswith("desc "): - self._data = [["id"], ["a"], ["b"], ["c"]] - elif statement.startswith("SELECT * FROM "): - self._data = self.table_result - else: - self._data = [] - - class TestGenericAdder(BaseProcessorTestCase): test_cases = [ # testcase, rule, event, expected ( @@ -392,9 +322,6 @@ def generic_rules_dirs(self): def specific_rules_dirs(self): return self.CONFIG.get("specific_rules") - def test_db_table_is_none(self): - assert self.object._db_table is None - @pytest.mark.parametrize("testcase, rule, event, expected", test_cases) def 
test_generic_adder_testcases( self, testcase, rule, event, expected @@ -429,390 +356,3 @@ def test_add_generic_fields_from_file_invalid(self): config["generic_rules"] = [RULES_DIR_INVALID] configuration = {"test processor": config} Factory.create(configuration) - - -class BaseTestGenericAdderSQLTestCase(BaseProcessorTestCase): - def setup_method(self): - super().setup_method() - DBMock.Cursor.table_result = deepcopy(BASE_TABLE_RESULTS) - if os.path.isfile(self.object._db_file_path): - os.remove(self.object._db_file_path) - self.object._initialize_sql(self.CONFIG["sql_config"]) - - @property - def generic_rules_dirs(self): - return self.CONFIG.get("generic_rules") - - @property - def specific_rules_dirs(self): - return self.CONFIG.get("specific_rules") - - -class TestGenericAdderProcessorSQLWithoutAddedTarget(BaseTestGenericAdderSQLTestCase): - mocks = {"mysql.connector.connect": {"return_value": DBMock()}} - - CONFIG = { - "type": "generic_adder", - "generic_rules": ["tests/testdata/unit/generic_adder/rules/generic"], - "specific_rules": ["tests/testdata/unit/generic_adder/rules/specific"], - "sql_config": { - "user": "test_user", - "password": "foo_bar_baz", - "host": "127.0.0.1", - "database": "test_db", - "table": "test_table", - "target_column": "a", - "timer": 0.1, - }, - } - - def test_load_from_file(self): - expected_db = { - "TEST_0": (["b", "foo"], ["c", "bar"]), - "TEST_1": (["b", "uuu"], ["c", "vvv"]), - "TEST_2": (["b", "123"], ["c", "456"]), - } - - assert self.object._db_table == expected_db - - _, temp_path = tempfile.mkstemp() - self.object._db_file_path = temp_path - - db_file_content = {"FOO": "BAR"} - with open(temp_path, "w", encoding="utf8") as db_table: - json.dump(db_file_content, db_table) - self.object._load_from_file() - - assert self.object._db_table == db_file_content - - def test_check_if_file_not_stale_after_initialization_of_the_generic_adder(self): - assert not self.object._check_if_file_not_exists_or_stale(time.time()) - - def test_check_if_file_stale_after_enough_time_has_passed(self): - time.sleep(0.2) - assert self.object._check_if_file_not_exists_or_stale(time.time()) - - def test_check_if_file_not_stale_after_enough_time_has_passed_but_file_has_been_changed(self): - time.sleep(0.2) - with open(self.object._db_file_path, "r", encoding="utf-8") as db_file: - file_temp = db_file.read() - now = time.time() - with open(self.object._db_file_path, "w", encoding="utf-8") as db_file: - db_file.write(file_temp) - assert not self.object._check_if_file_not_exists_or_stale(now) - - def test_check_if_file_stale_after_removing_it_when_it_was_not_stale(self): - assert not self.object._check_if_file_not_exists_or_stale(time.time()) - os.remove(self.object._db_file_path) - assert self.object._check_if_file_not_exists_or_stale(time.time()) - - def test_sql_database_enriches_via_table(self): - expected = { - "add_from_sql_db_table": "Test", - "source": "TEST_0.test.123", - "db": {"test": {"b": "foo", "c": "bar"}}, - } - document = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - - self.object.process(document) - - assert document == expected - - def test_sql_database_enriches_via_table_ignore_case(self): - expected = { - "add_from_sql_db_table": "Test", - "source": "test_0.test.123", - "db": {"test": {"b": "foo", "c": "bar"}}, - } - document = {"add_from_sql_db_table": "Test", "source": "test_0.test.123"} - - self.object.process(document) - - assert document == expected - - def test_sql_database_does_not_enrich_via_table_if_value_does_not_exist(self): - 
expected = {"add_from_sql_db_table": "Test", "source": "TEST_I_DO_NOT_EXIST.test.123"} - document = {"add_from_sql_db_table": "Test", "source": "TEST_I_DO_NOT_EXIST.test.123"} - - self.object.process(document) - - assert document == expected - - def test_sql_database_does_not_enrich_via_table_if_pattern_does_not_match(self): - expected = {"add_from_sql_db_table": "Test", "source": "TEST_0%FOO"} - document = {"add_from_sql_db_table": "Test", "source": "TEST_0%FOO"} - - self.object.process(document) - - assert document == expected - - def test_sql_database_reloads_table_on_change_after_wait(self): - expected_1 = { - "add_from_sql_db_table": "Test", - "source": "TEST_0.test.123", - "db": {"test": {"b": "foo", "c": "bar"}}, - } - expected_2 = { - "add_from_sql_db_table": "Test", - "source": "TEST_0.test.123", - "db": {"test": {"b": "fi", "c": "fo"}}, - } - document_1 = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - document_2 = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - - self.object.process(document_1) - time.sleep(0.2) - mock_simulate_table_change() - self.object.process(document_2) - - assert document_1 == expected_1 - assert document_2 == expected_2 - - def test_sql_database_with_empty_table_load_after_change(self): - expected = { - "add_from_sql_db_table": "Test", - "source": "TEST_0.test.123", - "db": {"test": {"b": "fi", "c": "fo"}}, - } - document = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - - self.object._db_table = {} - self.object._initialize_sql(self.CONFIG["sql_config"]) - mock_simulate_table_change() - time.sleep(0.2) - self.object.process(document) - - assert document == expected - - def test_sql_database_does_not_reload_table_on_change_if_no_wait(self): - expected = { - "add_from_sql_db_table": "Test", - "source": "TEST_0.test.123", - "db": {"test": {"b": "foo", "c": "bar"}}, - } - document_1 = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - document_2 = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - - self.object.process(document_1) - mock_simulate_table_change() - self.object.process(document_2) - - assert document_1 == expected - assert document_2 == expected - - def test_sql_database_raises_exception_on_duplicate(self, caplog): - expected = { - "add_from_sql_db_table": "Test", - "source": "TEST_0.test.123", - "db": {"test": {"b": "foo", "c": "bar"}}, - "tags": ["_generic_adder_failure"], - } - document = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - - self.object.process(document) - result = self.object.process(document) - assert len(result.warnings) == 1 - assert isinstance(result.warnings[0], FieldExistsWarning) - - assert document == expected - - def test_time_to_check_for_change_not_read_for_change(self): - self.object._file_check_interval = 9999999 - assert self.object._db_connector.time_to_check_for_change() is False - - def test_time_to_check_for_change_read_for_change(self): - time.sleep(self.object._file_check_interval) - assert self.object._db_connector.time_to_check_for_change() is True - - def test_update_from_db_and_write_to_file_change_and_stale(self): - assert os.path.isfile(self.object._db_file_path) - last_file_change = os.path.getmtime(self.object._db_file_path) - mock_simulate_table_change() - time.sleep(self.object._file_check_interval) - self.object._update_from_db_and_write_to_file() - assert self.object._db_table == { - "TEST_0": (["b", "fi"], ["c", "fo"]), - "TEST_1": (["b", "uuu"], ["c", "vvv"]), - "TEST_2": (["b", "123"], ["c", 
"456"]), - } - assert os.path.getmtime(self.object._db_file_path) > last_file_change - - def test_update_from_db_and_write_to_file_no_change_and_stale(self): - assert os.path.isfile(self.object._db_file_path) - last_file_change = os.path.getmtime(self.object._db_file_path) - time.sleep(self.object._file_check_interval) - self.object._update_from_db_and_write_to_file() - assert self.object._db_table == { - "TEST_0": (["b", "foo"], ["c", "bar"]), - "TEST_1": (["b", "uuu"], ["c", "vvv"]), - "TEST_2": (["b", "123"], ["c", "456"]), - } - assert os.path.getmtime(self.object._db_file_path) == last_file_change - - def test_update_from_db_and_write_to_file_change_and_not_stale(self): - assert os.path.isfile(self.object._db_file_path) - last_file_change = os.path.getmtime(self.object._db_file_path) - self.object._file_check_interval = 9999999 - mock_simulate_table_change() - time.sleep(0.01) - self.object._update_from_db_and_write_to_file() - assert self.object._db_table == { - "TEST_0": (["b", "fi"], ["c", "fo"]), - "TEST_1": (["b", "uuu"], ["c", "vvv"]), - "TEST_2": (["b", "123"], ["c", "456"]), - } - assert os.path.getmtime(self.object._db_file_path) > last_file_change - - def test_update_from_db_and_write_to_file_no_change_and_not_stale(self): - assert os.path.isfile(self.object._db_file_path) - last_file_change = os.path.getmtime(self.object._db_file_path) - self.object._file_check_interval = 9999999 - time.sleep(0.01) - self.object._update_from_db_and_write_to_file() - assert self.object._db_table == { - "TEST_0": (["b", "foo"], ["c", "bar"]), - "TEST_1": (["b", "uuu"], ["c", "vvv"]), - "TEST_2": (["b", "123"], ["c", "456"]), - } - assert os.path.getmtime(self.object._db_file_path) == last_file_change - - def test_update_from_db_and_write_to_file_no_existing_file_stale(self): - assert os.path.isfile(self.object._db_file_path) - os.remove(self.object._db_file_path) - time.sleep(self.object._file_check_interval) - self.object._db_connector._last_table_checksum = None - self.object._update_from_db_and_write_to_file() - assert self.object._db_table == { - "TEST_0": (["b", "foo"], ["c", "bar"]), - "TEST_1": (["b", "uuu"], ["c", "vvv"]), - "TEST_2": (["b", "123"], ["c", "456"]), - } - assert os.path.isfile(self.object._db_file_path) - - def test_update_from_db_and_write_to_file_no_existing_file_not_stale(self): - assert os.path.isfile(self.object._db_file_path) - os.remove(self.object._db_file_path) - self.object._file_check_interval = 9999999 - self.object._db_connector._last_table_checksum = None - self.object._update_from_db_and_write_to_file() - assert self.object._db_table == { - "TEST_0": (["b", "foo"], ["c", "bar"]), - "TEST_1": (["b", "uuu"], ["c", "vvv"]), - "TEST_2": (["b", "123"], ["c", "456"]), - } - assert os.path.isfile(self.object._db_file_path) - - -class TestGenericAdderProcessorSQLWithoutAddedTargetAndTableNeverEmpty( - BaseTestGenericAdderSQLTestCase -): - mocks = {"mysql.connector.connect": {"return_value": DBMockNeverEmpty()}} - - CONFIG = TestGenericAdderProcessorSQLWithoutAddedTarget.CONFIG - - def test_sql_database_no_enrichment_with_empty_table(self): - expected = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - document = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - - self.object._db_connector.cursor.mock_clear_all() - self.object._db_table = {} - self.object.process(document) - - assert document == expected - - @pytest.mark.parametrize( - "test_case, table, raised_error", - [ - ( - "valid table name only alpha", - "table", - None, - ), - 
( - "valid table name only numeric", - "0", - None, - ), - ( - "valid table name only alphanumeric", - "0a1b", - None, - ), - ( - "valid table name alphanumeric and underscore", - "0a_1b", - None, - ), - ( - "not alphanumeric", - "table!", - ( - InvalidConfigurationError, - "Table in 'sql_config' may only contain " - + "alphanumeric characters and underscores!", - ), - ), - ( - "whitespace", - "tab le", - ( - InvalidConfigurationError, - "Table in 'sql_config' contains whitespaces!", - ), - ), - ( - "not alphanumeric and whitespace", - "tab le!", - ( - InvalidConfigurationError, - "Table in 'sql_config' contains whitespaces!", - ), - ), - ], - ) - def test_sql_table_must_contain_only_alphanumeric_or_underscore( - self, test_case, table, raised_error - ): - config = deepcopy(self.CONFIG) - config["sql_config"]["table"] = table - - if raised_error: - with pytest.raises(raised_error[0], match=raised_error[1]): - Factory.create({"Test Instance Name": config}) - else: - Factory.create({"Test Instance Name": config}) - - -class TestGenericAdderProcessorSQLWithAddedTarget(BaseTestGenericAdderSQLTestCase): - mocks = {"mysql.connector.connect": {"return_value": DBMock()}} - - CONFIG = { - "type": "generic_adder", - "generic_rules": ["tests/testdata/unit/generic_adder/rules/generic"], - "specific_rules": ["tests/testdata/unit/generic_adder/rules/specific"], - "sql_config": { - "user": "test_user", - "password": "foo_bar_baz", - "host": "127.0.0.1", - "database": "test_db", - "table": "test_table", - "target_column": "a", - "add_target_column": True, - "timer": 0.1, - }, - } - - def test_db_table_is_not_none(self): - assert self.object._db_table is not None - - def test_sql_database_adds_target_field(self): - expected = { - "add_from_sql_db_table": "Test", - "source": "TEST_0.test.123", - "db": {"test": {"a": "TEST_0", "b": "foo", "c": "bar"}}, - } - document = {"add_from_sql_db_table": "Test", "source": "TEST_0.test.123"} - - self.object.process(document) - - assert document == expected
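With the SQL path removed, the only remaining enrichment sources for the `generic_adder` are the rule's own `add` mapping and `add_from_file`; `_apply_rules` now reduces to reading `rule.add` and a single guarded `add_fields_to(event, items_to_add, rule, rule.extend_target_list, rule.overwrite_target)` call. As a minimal sketch, the two remaining rule variants look as follows (field names, values and file paths mirror the illustrative test data above):

.. code-block:: yaml

    filter: add_generic_test
    generic_adder:
      add:
        some_added_field: some value
        another_added_field: another_value
        dotted.added.field: yet_another_value
    description: ''

.. code-block:: yaml

    filter: add_list_generic_test
    generic_adder:
      add_from_file:
        - tests/testdata/unit/generic_adder/additions_file.yml
    description: ''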