diff --git a/.vscode/settings.json b/.vscode/settings.json index 4681437a4..3d4b83d9b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,7 @@ "python.testing.pytestEnabled": true, "python.testing.unittestEnabled": false, "python.analysis.importFormat": "absolute", + "python.testing.pytestPath": "pytest", "editor.formatOnSave": true, "editor.rulers": [ 80, @@ -9,7 +10,9 @@ 120 ], "editor.codeActionsOnSave": { - "source.organizeImports": "explicit" + "source.organizeImports": "explicit", + "source.unusedImports": "always", + "source.convertImportFormat": "always" }, "autoDocstring.docstringFormat": "numpy", "files.exclude": { diff --git a/logprep/metrics/exporter.py b/logprep/metrics/exporter.py index 411a47c7e..69c38aa04 100644 --- a/logprep/metrics/exporter.py +++ b/logprep/metrics/exporter.py @@ -52,6 +52,7 @@ def __init__(self, configuration: MetricsConfig): self.server = None self.healthcheck_functions = None self._multiprocessing_prepared = False + self.app = None def prepare_multiprocessing(self): """ @@ -99,10 +100,12 @@ def run(self, daemon=True): def init_server(self, daemon=True) -> None: """Initializes the server""" + if not self.app: + self.app = make_patched_asgi_app(self.healthcheck_functions) port = self.configuration.port self.server = http.ThreadingHTTPServer( self.configuration.uvicorn_config | {"port": port, "host": "0.0.0.0"}, - make_patched_asgi_app(self.healthcheck_functions), + self.app, daemon=daemon, logger_name="Exporter", ) @@ -116,6 +119,7 @@ def restart(self): def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None: """Updates the healthcheck functions""" self.healthcheck_functions = healthcheck_functions + self.app = make_patched_asgi_app(self.healthcheck_functions) if self.server and self.server.thread and self.server.thread.is_alive(): self.server.shut_down() self.init_server(daemon=daemon) diff --git a/pyproject.toml b/pyproject.toml index 620a42f5a..103fa1ed7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,10 +56,10 @@ keywords = [ "logdata", ] dependencies = [ - "aiohttp>=3.9.2", # CVE-2024-23334 + "aiohttp>=3.9.2", # CVE-2024-23334 "attrs", - "certifi>=2023.7.22", # CVE-2023-37920 - "ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11 + "certifi>=2023.7.22", # CVE-2023-37920 + "ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11 "colorama", "confluent-kafka>2", "geoip2", @@ -67,7 +67,7 @@ dependencies = [ "jsonref", "luqum", "more-itertools==8.10.0", - "mysql-connector-python>=9.1.0", # CVE-2024-21272 + "mysql-connector-python>=9.1.0", # CVE-2024-21272 "numpy>=1.26.0", "opensearch-py", "prometheus_client", @@ -84,7 +84,7 @@ dependencies = [ "schedule", "tldextract", "urlextract", - "urllib3>=1.26.17", # CVE-2023-43804 + "urllib3>=1.26.17", # CVE-2023-43804 "uvicorn", "deepdiff", "msgspec", @@ -113,7 +113,9 @@ dev = [ "jinja2", "maturin", "cibuildwheel", - "pre-commit" + "asgiref", + "pytest-asyncio", + "pre-commit", ] doc = [ diff --git a/tests/acceptance/test_full_configuration.py b/tests/acceptance/test_full_configuration.py index 5ccf14a09..aee28e09f 100644 --- a/tests/acceptance/test_full_configuration.py +++ b/tests/acceptance/test_full_configuration.py @@ -1,4 +1,6 @@ # pylint: disable=missing-docstring +# pylint: disable=line-too-long +# pylint: disable=too-many-locals import os import re import tempfile @@ -115,7 +117,7 @@ def test_start_of_logprep_from_http_with_templated_url_and_config(): output = proc.stdout.readline().decode("utf8") -def test_logprep_exposes_prometheus_metrics(tmp_path): +def test_logprep_exposes_prometheus_metrics_and_healthchecks(tmp_path): temp_dir = tempfile.gettempdir() input_file_path = Path(os.path.join(temp_dir, "input.txt")) input_file_path.touch() @@ -246,4 +248,9 @@ def test_logprep_exposes_prometheus_metrics(tmp_path): len(re.findall(both_calculators, metrics)) == 4 ), "More or less than 4 rules were found for both calculator" + # check health endpoint + response = requests.get("http://127.0.0.1:8003/health", timeout=7) + response.raise_for_status() + assert "OK" == response.text + proc.kill() diff --git a/tests/unit/metrics/test_exporter.py b/tests/unit/metrics/test_exporter.py index 4ff498888..cb0a5f455 100644 --- a/tests/unit/metrics/test_exporter.py +++ b/tests/unit/metrics/test_exporter.py @@ -2,14 +2,15 @@ # pylint: disable=protected-access # pylint: disable=attribute-defined-outside-init # pylint: disable=line-too-long +import asyncio import os.path from unittest import mock import pytest -import requests -from prometheus_client import REGISTRY +from asgiref.testing import ApplicationCommunicator +from prometheus_client import CollectorRegistry -from logprep.metrics.exporter import PrometheusExporter +from logprep.metrics.exporter import PrometheusExporter, make_patched_asgi_app from logprep.util import http from logprep.util.configuration import MetricsConfig @@ -20,7 +21,6 @@ ) class TestPrometheusExporter: def setup_method(self): - REGISTRY.__init__() self.metrics_config = MetricsConfig(enabled=True, port=8000) def test_correct_setup(self): @@ -106,80 +106,103 @@ def test_is_running_returns_true_when_server_thread_is_alive(self): assert exporter.is_running +@mock.patch( + "logprep.util.http.ThreadingHTTPServer", new=mock.create_autospec(http.ThreadingHTTPServer) +) @mock.patch( "logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing", new=lambda *args, **kwargs: None, ) class TestHealthEndpoint: + """These tests uses the `asgiref.testing.ApplicationCommunicator` to test the ASGI app itself + For more information see: https://dokk.org/documentation/django-channels/2.4.0/topics/testing/ + """ + def setup_method(self): - REGISTRY.__init__() self.metrics_config = MetricsConfig(enabled=True, port=8000) + self.registry = CollectorRegistry() + self.captured_status = None + self.captured_headers = None + # Setup ASGI scope + self.scope = { + "client": ("127.0.0.1", 32767), + "headers": [], + "http_version": "1.0", + "method": "GET", + "path": "/", + "query_string": b"", + "scheme": "http", + "server": ("127.0.0.1", 80), + "type": "http", + } + self.communicator = None + + def teardown_method(self): + if self.communicator: + asyncio.get_event_loop().run_until_complete(self.communicator.wait()) + + def seed_app(self, app): + self.communicator = ApplicationCommunicator(app, self.scope) - def test_health_endpoint_returns_503_as_default_health_state(self): - exporter = PrometheusExporter(self.metrics_config) - exporter.run(daemon=False) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.status_code == 503 - exporter.server.shut_down() - - def test_health_endpoint_calls_health_check_functions(self): + @pytest.mark.parametrize( + "functions, expected_status, expected_body", + [ + ([lambda: True], 200, b"OK"), + ([lambda: True, lambda: True], 200, b"OK"), + ([lambda: False], 503, b"FAIL"), + ([lambda: False, lambda: False], 503, b"FAIL"), + ([lambda: False, lambda: True, lambda: True], 503, b"FAIL"), + ], + ) + @pytest.mark.asyncio + async def test_asgi_app(self, functions, expected_status, expected_body): + app = make_patched_asgi_app(functions) + self.scope["path"] = "/health" + self.seed_app(app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == expected_status + event = await self.communicator.receive_output(timeout=1) + assert expected_body in event["body"] + + @pytest.mark.asyncio + async def test_health_endpoint_calls_health_check_functions(self): exporter = PrometheusExporter(self.metrics_config) function_mock = mock.Mock(return_value=True) exporter.healthcheck_functions = [function_mock] exporter.run(daemon=False) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.status_code == 200 - assert function_mock.call_count == 1 - - exporter.server.shut_down() - - def test_health_endpoint_calls_updated_functions(self): + self.scope["path"] = "/health" + self.seed_app(exporter.app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == 200 + event = await self.communicator.receive_output(timeout=1) + assert b"OK" in event["body"] + function_mock.assert_called_once() + + @pytest.mark.asyncio + async def test_update_health_checks_injects_new_functions(self): exporter = PrometheusExporter(self.metrics_config) function_mock = mock.Mock(return_value=True) exporter.healthcheck_functions = [function_mock] exporter.run(daemon=False) - requests.get("http://localhost:8000/health", timeout=0.5) + exporter.server.thread = None + self.scope["path"] = "/health" + self.seed_app(exporter.app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == 200 + event = await self.communicator.receive_output(timeout=1) + assert b"OK" in event["body"] assert function_mock.call_count == 1, "initial function should be called" new_function_mock = mock.Mock(return_value=True) exporter.update_healthchecks([new_function_mock]) - requests.get("http://localhost:8000/health", timeout=0.5) + self.scope["path"] = "/health" + self.seed_app(exporter.app) + await self.communicator.send_input({"type": "http.request"}) + event = await self.communicator.receive_output(timeout=1) + assert event["status"] == 200 + event = await self.communicator.receive_output(timeout=1) + assert b"OK" in event["body"] assert new_function_mock.call_count == 1, "New function should be called" assert function_mock.call_count == 1, "Old function should not be called" - - exporter.server.shut_down() - - @pytest.mark.parametrize( - "functions, expected", - [ - ([lambda: True], 200), - ([lambda: True, lambda: True], 200), - ([lambda: False], 503), - ([lambda: False, lambda: False], 503), - ([lambda: False, lambda: True, lambda: True], 503), - ], - ) - def test_health_check_returns_status_code(self, functions, expected): - exporter = PrometheusExporter(self.metrics_config) - exporter.run(daemon=False) - exporter.update_healthchecks(functions) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.status_code == expected - exporter.server.shut_down() - - @pytest.mark.parametrize( - "functions, expected", - [ - ([lambda: True], "OK"), - ([lambda: True, lambda: True], "OK"), - ([lambda: False], "FAIL"), - ([lambda: False, lambda: False], "FAIL"), - ([lambda: False, lambda: True, lambda: True], "FAIL"), - ], - ) - def test_health_check_returns_body(self, functions, expected): - exporter = PrometheusExporter(self.metrics_config) - exporter.run(daemon=False) - exporter.update_healthchecks(functions) - resp = requests.get("http://localhost:8000/health", timeout=0.5) - assert resp.content.decode() == expected - exporter.server.shut_down()