diff --git a/CHANGELOG.md b/CHANGELOG.md
index 76c493acc..decc97519 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,8 @@
 
 ### Bugfix
 
+* fix a bug in the http connector that led to only the first process working
+
 ## 11.0.1
 
 ### Bugfix
diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py
index f9988a7df..fe56ab444 100644
--- a/logprep/connector/http/input.py
+++ b/logprep/connector/http/input.py
@@ -26,18 +26,24 @@
 """
 
 import inspect
+import logging
+import multiprocessing as mp
 import queue
+import re
 import threading
 from abc import ABC
 from logging import Logger
-import logging
-import re
-from typing import Mapping, Tuple, Union, Callable
-from attrs import define, field, validators
+from typing import Callable, Mapping, Tuple, Union
+
+import falcon.asgi
 import msgspec
 import uvicorn
-import falcon.asgi
-from falcon import HTTPTooManyRequests, HTTPMethodNotAllowed  # pylint: disable=no-name-in-module
+from attrs import define, field, validators
+from falcon import (  # pylint: disable=no-name-in-module
+    HTTPMethodNotAllowed,
+    HTTPTooManyRequests,
+)
+
 from logprep.abc.input import FatalInputError, Input
 from logprep.util import defaults
@@ -113,7 +119,7 @@ class HttpEndpoint(ABC):
 
     Parameters
     ----------
-    messages: queue.Queue
+    messages: mp.Queue
         Input Events are put here
     collect_meta: bool
         Collects Metadata on True (default)
@@ -121,7 +127,7 @@
         Defines key name for metadata
     """
 
-    def __init__(self, messages: queue.Queue, collect_meta: bool, metafield_name: str) -> None:
+    def __init__(self, messages: mp.Queue, collect_meta: bool, metafield_name: str) -> None:
         self.messages = messages
         self.collect_meta = collect_meta
         self.metafield_name = metafield_name
@@ -288,12 +294,6 @@ def shut_down(self):
 class HttpConnector(Input):
     """Connector to accept log messages as http post requests"""
 
-    _endpoint_registry: Mapping[str, HttpEndpoint] = {
-        "json": JSONHttpEndpoint,
-        "plaintext": PlaintextHttpEndpoint,
-        "jsonl": JSONLHttpEndpoint,
-    }
-
     @define(kw_only=True)
     class Config(Input.Config):
         """Config for HTTPInput"""
@@ -351,6 +351,14 @@ class Config(Input.Config):
 
     __slots__ = []
 
+    messages: mp.Queue = None
+
+    _endpoint_registry: Mapping[str, HttpEndpoint] = {
+        "json": JSONHttpEndpoint,
+        "plaintext": PlaintextHttpEndpoint,
+        "jsonl": JSONLHttpEndpoint,
+    }
+
    def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Logger) -> None:
        super().__init__(name, configuration, logger)
        internal_uvicorn_config = {
@@ -359,13 +367,9 @@ def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Log
             "timeout_graceful_shutdown": 0,
         }
         self._config.uvicorn_config.update(internal_uvicorn_config)
-        self.logger = logger
         self.port = self._config.uvicorn_config["port"]
         self.host = self._config.uvicorn_config["host"]
-        self.target = "http://" + self.host + ":" + str(self.port)
-        self.messages = queue.Queue(
-            self._config.message_backlog_size
-        )  # pylint: disable=attribute-defined-outside-init
+        self.target = f"http://{self.host}:{self.port}"
 
     def setup(self):
         """setup starts the actual functionality of this connector.
@@ -378,6 +382,11 @@ def setup(self):
             raise FatalInputError(
                 self, "Necessary instance attribute `pipeline_index` could not be found."
             )
+        self._logger.debug(
+            f"HttpInput Connector started on target {self.target} and "
+            f"queue {id(self.messages)} "
+            f"with queue_size: {self.messages._maxsize}"
+        )
         # Start HTTP Input only when in first process
         if self.pipeline_index != 1:
             return
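
Note on the change above: `messages` moves from a per-instance `queue.Queue` to a class-level `multiprocessing.Queue` that is assigned from outside, before the pipeline processes are created. A minimal sketch of why this fixes the "only first process working" bug, assuming the fork start method (the Linux default); `Connector` and `produce` are illustrative names, not logprep code:

```python
import multiprocessing as mp


class Connector:
    # class attribute: every process forked after the assignment below
    # inherits a handle to the same underlying queue
    messages: mp.Queue = None


def produce(value):
    Connector.messages.put(value)


if __name__ == "__main__":
    Connector.messages = mp.Queue(maxsize=10)  # created once, before forking
    workers = [mp.Process(target=produce, args=(i,)) for i in range(3)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(sorted(Connector.messages.get() for _ in range(3)))  # [0, 1, 2]
```

With the old per-instance `queue.Queue`, each forked pipeline got its own copy, so events posted to the HTTP server in process 1 were invisible to all other processes.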
diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py
index 98c1b8f7a..4b58b89a1 100644
--- a/logprep/framework/pipeline.py
+++ b/logprep/framework/pipeline.py
@@ -11,7 +11,7 @@
 import multiprocessing
 
 # pylint: disable=logging-fstring-interpolation
-import queue
+import multiprocessing.queues
 import warnings
 from ctypes import c_bool
 from functools import cached_property, partial
@@ -307,7 +307,7 @@ def _shut_down(self) -> None:
     def _drain_input_queues(self) -> None:
         if not hasattr(self._input, "messages"):
             return
-        if isinstance(self._input.messages, queue.Queue):
+        if isinstance(self._input.messages, multiprocessing.queues.Queue):
             while self._input.messages.qsize():
                 self.process_pipeline()
diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py
index 6567b4a5a..902f111e9 100644
--- a/logprep/framework/pipeline_manager.py
+++ b/logprep/framework/pipeline_manager.py
@@ -10,6 +10,7 @@
 from attr import define, field
 
 from logprep.abc.component import Component
+from logprep.connector.http.input import HttpConnector
 from logprep.framework.pipeline import Pipeline
 from logprep.metrics.exporter import PrometheusExporter
 from logprep.metrics.metrics import CounterMetric
@@ -76,6 +77,13 @@ def __init__(self, configuration: Configuration):
             self.prometheus_exporter = PrometheusExporter(prometheus_config)
         else:
             self.prometheus_exporter = None
+        input_config = next(iter(configuration.input.values()))
+        if input_config.get("type") == "http_input":
+            # this workaround is needed because the queue size is not configurable
+            # after initialization and the queue has to be shared between the processes
+            if HttpConnector.messages is None:
+                message_backlog_size = input_config.get("message_backlog_size", 15000)
+                HttpConnector.messages = multiprocessing.Queue(maxsize=message_backlog_size)
 
     def get_count(self) -> int:
         """Get the pipeline count.
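
One caveat worth flagging on the `_drain_input_queues` change: `multiprocessing.Queue.qsize()` relies on `sem_getvalue()` and raises `NotImplementedError` on macOS, so draining by `qsize()` is effectively Linux-only. A hedged, platform-neutral alternative sketch; `drain` and `handle` are illustrative names, not logprep APIs:

```python
import queue


def drain(messages, handle) -> None:
    # drain without qsize(): take events until the queue reports empty;
    # multiprocessing.Queue.get_nowait() raises queue.Empty when exhausted
    while True:
        try:
            event = messages.get_nowait()
        except queue.Empty:
            return
        handle(event)
```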
diff --git a/quickstart/exampledata/config/http_pipeline.yml b/quickstart/exampledata/config/http_pipeline.yml
index b65afac40..569d1ee23 100644
--- a/quickstart/exampledata/config/http_pipeline.yml
+++ b/quickstart/exampledata/config/http_pipeline.yml
@@ -1,13 +1,16 @@
 version: 1
 process_count: 2
 
+logger:
+  level: DEBUG
+
 metrics:
   enabled: true
   port: 8003
 
 input:
   httpinput:
     type: http_input
-    message_backlog_size: 1500000
+    message_backlog_size: 1500
     collect_meta: True
     metafield_name: "@metadata"
     uvicorn_config:
diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py
index 610d46210..ed9c96cec 100644
--- a/tests/unit/connector/test_http_input.py
+++ b/tests/unit/connector/test_http_input.py
@@ -1,26 +1,28 @@
 # pylint: disable=missing-docstring
 # pylint: disable=protected-access
 # pylint: disable=attribute-defined-outside-init
+import multiprocessing
 from copy import deepcopy
-from concurrent.futures import ThreadPoolExecutor
+
+import falcon
 import requests
 import uvicorn
-import falcon
+
+from logprep.abc.input import FatalInputError
 from logprep.connector.http.input import HttpConnector
 from logprep.factory import Factory
-from logprep.abc.input import FatalInputError
 from tests.unit.connector.base import BaseInputTestCase
 
 
 class TestHttpConnector(BaseInputTestCase):
 
     def setup_method(self):
+        HttpConnector.messages = multiprocessing.Queue(
+            maxsize=self.CONFIG.get("message_backlog_size")
+        )
         super().setup_method()
         self.object.pipeline_index = 1
         self.object.setup()
-        # we have to empty the queue for testing
-        while not self.object.messages.empty():
-            self.object.messages.get(timeout=0.001)
         self.target = self.object.target
 
     CONFIG: dict = {
@@ -40,6 +42,8 @@ def setup_method(self):
     }
 
     def teardown_method(self):
+        while not self.object.messages.empty():
+            self.object.messages.get(timeout=0.001)
         self.object.shut_down()
 
     def test_create_connector(self):
@@ -71,18 +75,11 @@ def test_get_error_code_on_get(self):
     def test_get_error_code_too_many_requests(self):
         data = {"message": "my log message"}
         session = requests.Session()
-        session.mount(
-            "http://",
-            requests.adapters.HTTPAdapter(pool_maxsize=20, max_retries=3, pool_block=True),
-        )
-
-        def get_url(url):
-            for _ in range(100):
-                _ = session.post(url, json=data)
-
-        with ThreadPoolExecutor(max_workers=100) as executor:
-            executor.submit(get_url, f"{self.target}/json")
+        for i in range(100):
+            resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5)
+        assert self.object.messages.qsize() == 100
         resp = requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
+        assert self.object.messages._maxsize == 100
         assert resp.status_code == 429
 
     def test_json_endpoint_accepts_post_request(self):
@@ -278,3 +275,25 @@ def test_get_next_with_hmac_of_raw_message(self):
         }
         connector_next_msg, _ = connector.get_next(1)
         assert connector_next_msg == expected_event, "Output event with hmac is not as expected"
+
+    def test_two_connector_instances_share_the_same_queue(self):
+        new_connector = Factory.create({"test connector": self.CONFIG}, logger=self.logger)
+        assert self.object.messages is new_connector.messages
+
+    def test_messages_is_multiprocessing_queue(self):
+        assert isinstance(self.object.messages, multiprocessing.queues.Queue)
+
+    def test_all_endpoints_share_the_same_queue(self):
+        data = {"message": "my log message"}
+        requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
+        assert self.object.messages.qsize() == 1
+        data = "my log message"
+        requests.post(url=f"{self.target}/plaintext", json=data, timeout=0.5)
+        assert self.object.messages.qsize() == 2
+        data = """
+        {"message": "my first log message"}
+        {"message": "my second log message"}
+        {"message": "my third log message"}
+        """
+        requests.post(url=f"{self.target}/jsonl", data=data, timeout=0.5)
+        assert self.object.messages.qsize() == 5
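
For context on the last assertion above: a single POST to `/jsonl` is expected to grow the queue by three, one event per non-empty line of the body. A rough sketch of line-wise decoding with msgspec (the library the connector imports); this is an illustration of the expected behavior, not necessarily the endpoint's exact implementation:

```python
import msgspec

decoder = msgspec.json.Decoder()


def parse_jsonl(body: str) -> list:
    # one event per non-empty line; blank lines are skipped
    return [decoder.decode(line.encode()) for line in body.splitlines() if line.strip()]


events = parse_jsonl('{"a": 1}\n{"b": 2}\n{"c": 3}\n')
assert len(events) == 3
```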
requests.post(url=f"{self.target}/plaintext", json=data, timeout=0.5) + assert self.object.messages.qsize() == 2 + data = """ + {"message": "my first log message"} + {"message": "my second log message"} + {"message": "my third log message"} + """ + requests.post(url=f"{self.target}/jsonl", data=data, timeout=0.5) + assert self.object.messages.qsize() == 5 diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index b8148efca..141f97568 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -5,6 +5,8 @@ from logging import Logger from unittest import mock +from logprep.connector.http.input import HttpConnector +from logprep.factory import Factory from logprep.framework.pipeline_manager import PipelineManager from logprep.metrics.exporter import PrometheusExporter from logprep.util.configuration import Configuration, MetricsConfig @@ -197,3 +199,21 @@ def test_restart_failed_pipelines_sets_old_pipeline_index(self): with mock.patch.object(pipeline_manager, "_create_pipeline") as mock_create_pipeline: pipeline_manager.restart_failed_pipeline() mock_create_pipeline.assert_called_once_with(1) + + def test_pipeline_manager_sets_queue_size_for_http_input(self): + config = deepcopy(self.config) + config.input = { + "http": { + "type": "http_input", + "message_backlog_size": 100, + "collect_meta": False, + "uvicorn_config": {"port": 9000, "host": "127.0.0.1"}, + "endpoints": { + "/json": "json", + }, + } + } + PipelineManager(config) + assert HttpConnector.messages._maxsize == 100 + http_input = Factory.create(config.input, mock.MagicMock()) + assert http_input.messages._maxsize == 100