From 96f9a26a178654b0578ed62a5207e72e3da9de3a Mon Sep 17 00:00:00 2001 From: dtrai2 <95028228+dtrai2@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:38:37 +0100 Subject: [PATCH] Improve test_http_input execution speed (#720) * rename test_http_input acceptance test * improve test speed by mocking the threaded http server * remove `simulate` from client calls --- logprep/connector/file/input.py | 1 - logprep/connector/http/input.py | 27 ++-- ...ut.py => test_http_input_with_requests.py} | 0 tests/unit/connector/test_http_input.py | 143 ++++++++---------- 4 files changed, 79 insertions(+), 92 deletions(-) rename tests/acceptance/{test_http_input.py => test_http_input_with_requests.py} (100%) diff --git a/logprep/connector/file/input.py b/logprep/connector/file/input.py index 3c5a74897..1d6e439de 100644 --- a/logprep/connector/file/input.py +++ b/logprep/connector/file/input.py @@ -23,7 +23,6 @@ import queue import threading import zlib -from logging import Logger from typing import Callable, TextIO from attrs import define, field, validators diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index 57436d58b..2f97f8d9a 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -23,10 +23,10 @@ host: 0.0.0.0 port: 9000 endpoints: - /firstendpoint: json + /firstendpoint: json /second*: plaintext /(third|fourth)/endpoint: jsonl - + The endpoint config supports regex and wildcard patterns: * :code:`/second*`: matches everything after asterisk * :code:`/(third|fourth)/endpoint` matches either third or forth in the first part @@ -38,7 +38,7 @@ add basic authentication for a specific endpoint. The format of this file would look like: .. code-block:: yaml - :caption: Example for credentials file + :caption: Example for credentials file :linenos: input: @@ -49,7 +49,7 @@ /second*: username: user password: secret_password - + You can choose between a plain secret with the key :code:`password` or a filebased secret with the key :code:`password_file`. @@ -60,20 +60,20 @@ - basic auth must only be used with strong passwords - basic auth must only be used with TLS encryption - avoid to reveal your plaintext secrets in public repositories - + Behaviour of HTTP Requests ^^^^^^^^^^^^^^^^^^^^^^^^^^ * :code:`GET`: - + * Responds always with 200 (ignores configured Basic Auth) * When Messages Queue is full, it responds with 429 * :code:`POST`: - + * Responds with 200 on non-Basic Auth Endpoints * Responds with 401 on Basic Auth Endpoints (and 200 with appropriate credentials) * When Messages Queue is full, it responds wiht 429 * :code:`ALL OTHER`: - + * Responds with 405 """ @@ -353,7 +353,7 @@ class Config(Input.Config): :title: Uvicorn Webserver Configuration :location: uvicorn_config :suggested-value: uvicorn_config.access_log: true, uvicorn_config.server_header: false, uvicorn_config.data_header: false - + Additionally to the below it is recommended to configure `ssl on the metrics server endpoint `_ @@ -378,8 +378,8 @@ class Config(Input.Config): """Configure endpoint routes with a Mapping of a path to an endpoint. Possible endpoints are: :code:`json`, :code:`jsonl`, :code:`plaintext`. It's possible to use wildcards and regexps for pattern matching. - - + + .. autoclass:: logprep.connector.http.input.PlaintextHttpEndpoint :noindex: .. autoclass:: logprep.connector.http.input.JSONLHttpEndpoint @@ -430,6 +430,7 @@ def __init__(self, name: str, configuration: "HttpInput.Config") -> None: ) schema = "https" if ssl_options else "http" self.target = f"{schema}://{host}:{port}" + self.app = None self.http_server = None def setup(self): @@ -462,9 +463,9 @@ def setup(self): self.metrics, ) - app = self._get_asgi_app(endpoints_config) + self.app = self._get_asgi_app(endpoints_config) self.http_server = http.ThreadingHTTPServer( - self._config.uvicorn_config, app, daemon=False, logger_name="HTTPServer" + self._config.uvicorn_config, self.app, daemon=False, logger_name="HTTPServer" ) self.http_server.start() diff --git a/tests/acceptance/test_http_input.py b/tests/acceptance/test_http_input_with_requests.py similarity index 100% rename from tests/acceptance/test_http_input.py rename to tests/acceptance/test_http_input_with_requests.py diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index 66828c08f..e9ea6f5e4 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -13,10 +13,8 @@ import pytest import requests import responses -import uvicorn -from requests.auth import HTTPBasicAuth - from falcon import testing +from requests.auth import _basic_auth_str from logprep.abc.input import FatalInputError from logprep.connector.http.input import HttpInput @@ -50,19 +48,9 @@ def create_credentials(tmp_path): return str(credential_file_path) +@mock.patch("logprep.connector.http.input.http.ThreadingHTTPServer", new=mock.MagicMock()) class TestHttpConnector(BaseInputTestCase): - def setup_method(self): - HttpInput.messages = ThrottlingQueue( - ctx=multiprocessing.get_context(), maxsize=self.CONFIG.get("message_backlog_size") - ) - super().setup_method() - self.object.pipeline_index = 1 - self.object.setup() - self.app = self.object.http_server.server.config.app - self.target = self.object.target - self.client = testing.TestClient(self.app) - CONFIG: dict = { "type": "http_input", "message_backlog_size": 100, @@ -88,6 +76,19 @@ def setup_method(self): "logprep_number_of_http_requests", ] + def setup_method(self): + HttpInput.messages = ThrottlingQueue( + ctx=multiprocessing.get_context(), maxsize=self.CONFIG.get("message_backlog_size") + ) + super().setup_method() + self.object.pipeline_index = 1 + with mock.patch( + "logprep.connector.http.input.http.ThreadingHTTPServer", new=mock.MagicMock() + ): + self.object.setup() + self.target = self.object.target + self.client = testing.TestClient(self.object.app) + def teardown_method(self): while not self.object.messages.empty(): self.object.messages.get(timeout=0.001) @@ -113,43 +114,43 @@ def test_not_first_pipeline(self): assert connector.http_server is None def test_get_method_returns_200(self): - resp = self.client.simulate_get("/json") + resp = self.client.get("/json") assert resp.status_code == 200 def test_get_method_returns_200_with_authentication(self): - resp = self.client.simulate_get("/auth-json-secret") + resp = self.client.get("/auth-json-secret") assert resp.status_code == 200 def test_get_method_returns_429_if_queue_is_full(self): self.object.messages.full = mock.MagicMock() self.object.messages.full.return_value = True - resp = self.client.simulate_get("/json") + resp = self.client.get("/json") assert resp.status_code == 429 def test_get_error_code_too_many_requests(self): data = {"message": "my log message"} self.object.messages.put = mock.MagicMock() self.object.messages.put.side_effect = queue.Full() - resp = self.client.simulate_post("/json", json=data) + resp = self.client.post("/json", json=data) assert resp.status_code == 429 def test_json_endpoint_accepts_post_request(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/json", json=data) + resp = self.client.post("/json", json=data) assert resp.status_code == 200 def test_json_endpoint_match_wildcard_route(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/json", json=data) + resp = self.client.post("/json", json=data) assert resp.status_code == 200 def test_json_endpoint_not_match_wildcard_route(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/api/wildcard_path/json/another_path", json=data) + resp = self.client.post("/api/wildcard_path/json/another_path", json=data) assert resp.status_code == 404 data = {"message": "my log message"} - resp = self.client.simulate_post("/json", json=data) + resp = self.client.post("/json", json=data) assert resp.status_code == 200 event_from_queue = self.object.messages.get(timeout=0.001) @@ -157,39 +158,39 @@ def test_json_endpoint_not_match_wildcard_route(self): def test_plaintext_endpoint_accepts_post_request(self): data = "my log message" - resp = self.client.simulate_post("/plaintext", json=data) + resp = self.client.post("/plaintext", json=data) assert resp.status_code == 200 def test_plaintext_message_is_put_in_queue(self): data = "my log message" - resp = self.client.simulate_post("/plaintext", body=data) + resp = self.client.post("/plaintext", body=data) assert resp.status_code == 200 event_from_queue = self.object.messages.get(timeout=0.001) assert event_from_queue.get("message") == data def test_jsonl_endpoint_match_regex_route(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/first/jsonl", json=data) + resp = self.client.post("/first/jsonl", json=data) assert resp.status_code == 200 def test_jsonl_endpoint_not_match_regex_route(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/firs/jsonl", json=data) + resp = self.client.post("/firs/jsonl", json=data) assert resp.status_code == 404 def test_jsonl_endpoint_not_match_before_start_regex(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/api/first/jsonl", json=data) + resp = self.client.post("/api/first/jsonl", json=data) assert resp.status_code == 404 def test_jsonl_endpoint_match_wildcard_regex_mix_route(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/third/jsonl/another_path/last_path", json=data) + resp = self.client.post("/third/jsonl/another_path/last_path", json=data) assert resp.status_code == 200 def test_jsonl_endpoint_not_match_wildcard_regex_mix_route(self): data = {"message": "my log message"} - resp = self.client.simulate_post("/api/third/jsonl/another_path", json=data) + resp = self.client.post("/api/third/jsonl/another_path", json=data) assert resp.status_code == 404 def test_jsonl_messages_are_put_in_queue(self): @@ -198,7 +199,7 @@ def test_jsonl_messages_are_put_in_queue(self): {"message": "my second log message"} {"message": "my third log message"} """ - resp = self.client.simulate_post("/jsonl", body=data) + resp = self.client.post("/jsonl", body=data) assert resp.status_code == 200 assert self.object.messages.qsize() == 3 event_from_queue = self.object.messages.get(timeout=1) @@ -210,7 +211,7 @@ def test_jsonl_messages_are_put_in_queue(self): def test_get_next_returns_message_from_queue(self): data = {"message": "my log message"} - self.client.simulate_post("/json", json=data) + self.client.post("/json", json=data) assert self.object.get_next(0.001) == data def test_get_next_returns_first_in_first_out(self): @@ -220,7 +221,7 @@ def test_get_next_returns_first_in_first_out(self): {"message": "third message"}, ] for message in data: - requests.post(url=self.target + "/json", json=message, timeout=0.5) + self.client.post("/json", json=message) assert self.object.get_next(0.001) == data[0] assert self.object.get_next(0.001) == data[1] assert self.object.get_next(0.001) == data[2] @@ -234,9 +235,9 @@ def test_get_next_returns_first_in_first_out_for_mixed_endpoints(self): for message in data: endpoint, post_data = message.values() if endpoint == "json": - self.client.simulate_post("/json", json=post_data) + self.client.post("/json", json=post_data) if endpoint == "plaintext": - self.client.simulate_post("/plaintext", body=post_data) + self.client.post("/plaintext", body=post_data) assert self.object.get_next(0.001) == data[0].get("data") assert self.object.get_next(0.001) == {"message": data[1].get("data")} assert self.object.get_next(0.001) == data[2].get("data") @@ -244,15 +245,12 @@ def test_get_next_returns_first_in_first_out_for_mixed_endpoints(self): def test_get_next_returns_none_for_empty_queue(self): assert self.object.get_next(0.001) is None - def test_server_returns_uvicorn_server_instance(self): - assert isinstance(self.object.http_server.server, uvicorn.Server) - def test_server_starts_threaded_server(self): message = {"message": "my message"} - for i in range(100): + for i in range(90): message["message"] = f"message number {i}" - self.client.simulate_post("/json", json=message) - assert self.object.messages.qsize() == 100, "messages are put to queue" + self.client.post("/json", json=message) + assert self.object.messages.qsize() == 90, "messages are put to queue" def test_get_metadata(self): message = {"message": "my message"} @@ -262,11 +260,11 @@ def test_get_metadata(self): connector = Factory.create({"test connector": connector_config}) connector.pipeline_index = 1 connector.setup() - target = connector.target - resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) + client = testing.TestClient(connector.app) + resp = client.post("/json", json=message) assert resp.status_code == 200 message = connector.messages.get(timeout=0.5) - assert message["custom"]["url"] == target + "/json" + assert message["custom"]["url"].endswith("/json") assert re.search(r"\d+\.\d+\.\d+\.\d+", message["custom"]["remote_addr"]) assert isinstance(message["custom"]["user_agent"], str) @@ -277,20 +275,14 @@ def test_server_multiple_config_changes(self): connector = Factory.create({"test connector": connector_config}) connector.pipeline_index = 1 connector.setup() - target = connector.target - resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) + client = testing.TestClient(connector.app) + resp = client.post("/json", json=message) assert resp.status_code == 200 - target = target.replace(":9001", ":9000") - try: - resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) - except requests.exceptions.ConnectionError as e: - assert e.response is None connector_config = deepcopy(self.CONFIG) connector = Factory.create({"test connector": connector_config}) connector.pipeline_index = 1 connector.setup() - target = connector.target - resp = requests.post(url=f"{target}/json", json=message, timeout=0.5) + resp = client.post("/json", json=message) assert resp.status_code == 200 def test_get_next_with_hmac_of_raw_message(self): @@ -310,7 +302,7 @@ def test_get_next_with_hmac_of_raw_message(self): connector.pipeline_index = 1 connector.setup() test_event = "the content" - self.client.simulate_post("/plaintext", body=test_event) + self.client.post("/plaintext", body=test_event) expected_event = { "message": "the content", @@ -329,9 +321,8 @@ def test_endpoint_returns_401_if_authorization_not_provided(self, credentials_fi new_connector = Factory.create({"test connector": self.CONFIG}) new_connector.pipeline_index = 1 new_connector.setup() - resp = requests.post( - url=f"{self.target}/auth-json-file", timeout=0.5, data=json.dumps(data) - ) + client = testing.TestClient(new_connector.app) + resp = client.post("/auth-json-file", body=json.dumps(data)) assert resp.status_code == 401 def test_endpoint_returns_401_on_wrong_authorization(self, credentials_file_path): @@ -341,10 +332,9 @@ def test_endpoint_returns_401_on_wrong_authorization(self, credentials_file_path new_connector = Factory.create({"test connector": self.CONFIG}) new_connector.pipeline_index = 1 new_connector.setup() - basic = HTTPBasicAuth("wrong", "credentials") - resp = requests.post( - url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5, json=data - ) + headers = {"Authorization": _basic_auth_str("wrong", "credentials")} + client = testing.TestClient(new_connector.app, headers=headers) + resp = client.post("/auth-json-file", body=json.dumps(data)) assert resp.status_code == 401 def test_endpoint_returns_200_on_correct_authorization_with_password_from_file( @@ -356,10 +346,9 @@ def test_endpoint_returns_200_on_correct_authorization_with_password_from_file( new_connector = Factory.create({"test connector": self.CONFIG}) new_connector.pipeline_index = 1 new_connector.setup() - basic = HTTPBasicAuth("user", "file_password") - resp = requests.post( - url=f"{self.target}/auth-json-file", auth=basic, timeout=0.5, json=data - ) + headers = {"Authorization": _basic_auth_str("user", "file_password")} + client = testing.TestClient(new_connector.app, headers=headers) + resp = client.post("/auth-json-file", body=json.dumps(data)) assert resp.status_code == 200 def test_endpoint_returns_200_on_correct_authorization_with_password_within_credentials_file( @@ -371,10 +360,9 @@ def test_endpoint_returns_200_on_correct_authorization_with_password_within_cred new_connector = Factory.create({"test connector": self.CONFIG}) new_connector.pipeline_index = 1 new_connector.setup() - basic = HTTPBasicAuth("user", "secret_password") - resp = requests.post( - url=f"{self.target}/auth-json-secret", auth=basic, timeout=0.5, json=data - ) + headers = {"Authorization": _basic_auth_str("user", "secret_password")} + client = testing.TestClient(new_connector.app, headers=headers) + resp = client.post("/auth-json-secret", body=json.dumps(data)) assert resp.status_code == 200 def test_endpoint_returns_200_on_correct_authorization_for_subpath(self, credentials_file_path): @@ -384,10 +372,9 @@ def test_endpoint_returns_200_on_correct_authorization_for_subpath(self, credent new_connector = Factory.create({"test connector": self.CONFIG}) new_connector.pipeline_index = 1 new_connector.setup() - basic = HTTPBasicAuth("user", "password") - resp = requests.post( - url=f"{self.target}/auth-json-secret/AB/json", auth=basic, timeout=0.5, json=data - ) + headers = {"Authorization": _basic_auth_str("user", "password")} + client = testing.TestClient(new_connector.app, headers=headers) + resp = client.post("/auth-json-secret/AB/json", body=json.dumps(data)) assert resp.status_code == 200 def test_two_connector_instances_share_the_same_queue(self): @@ -399,17 +386,17 @@ def test_messages_is_multiprocessing_queue(self): def test_all_endpoints_share_the_same_queue(self): data = {"message": "my log message"} - self.client.simulate_post("/json", json=data) + self.client.post("/json", json=data) assert self.object.messages.qsize() == 1 data = "my log message" - self.client.simulate_post("/plaintext", json=data) + self.client.post("/plaintext", json=data) assert self.object.messages.qsize() == 2 data = """ {"message": "my first log message"} {"message": "my second log message"} {"message": "my third log message"} """ - self.client.simulate_post("/jsonl", body=data) + self.client.post("/jsonl", body=data) assert self.object.messages.qsize() == 5 def test_sets_target_to_https_schema_if_ssl_options(self): @@ -436,7 +423,7 @@ def test_enpoints_count_requests(self): self.object.setup() random_number = random.randint(1, 100) for number in range(random_number): - self.client.simulate_post("/json", json={"message": f"my message{number}"}) + self.client.post("/json", json={"message": f"my message{number}"}) assert self.object.metrics.number_of_http_requests == random_number @pytest.mark.parametrize("endpoint", ["json", "plaintext", "jsonl"]) @@ -444,13 +431,13 @@ def test_endpoint_handles_gzip_compression(self, endpoint): data = {"message": "my log message"} data = gzip.compress(json.dumps(data).encode()) headers = {"Content-Encoding": "gzip"} - resp = self.client.simulate_post(f"/{endpoint}", body=data, headers=headers) + resp = self.client.post(f"/{endpoint}", body=data, headers=headers) assert resp.status_code == 200 @pytest.mark.parametrize("endpoint", ["json", "jsonl"]) def test_raises_http_bad_request_on_decode_error(self, endpoint): data = "this is not a valid json nor jsonl" - resp = self.client.simulate_post(f"/{endpoint}", body=data) + resp = self.client.post(f"/{endpoint}", body=data) assert resp.status_code == 400 @responses.activate