fix http connector only first process working (#566)
* add test, fix and update changelog
* set queue size in pipeline manager
ekneg54 authored Apr 17, 2024
1 parent ab8db50 commit 18594fb
Showing 7 changed files with 100 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,8 @@
 
 ### Bugfix
 
+* fix a bug in http connector leading to only first process working
+
 ## 11.0.1
 ### Bugfix
 
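Note on the root cause: the connector previously created its message backlog as a queue.Queue, which exists only in the memory of the process that created it. When Logprep forks several pipeline processes, each fork gets an independent copy of that queue, so only the first process — the one that also runs the uvicorn server — ever saw incoming events. A multiprocessing.Queue is backed by an OS pipe and is visible to all forked workers. A minimal sketch, not taken from this commit, of the behavior the fix relies on:

    import multiprocessing as mp

    shared_queue = mp.Queue()  # backed by an OS pipe, shared across processes

    def worker(q):
        # a child process puts an event; the parent reads it back
        q.put("hello from child")

    if __name__ == "__main__":
        proc = mp.Process(target=worker, args=(shared_queue,))
        proc.start()
        proc.join()
        print(shared_queue.get())  # -> "hello from child"

With a plain queue.Queue in place of mp.Queue, a put in a forked child would never become visible to the parent.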
47 changes: 28 additions & 19 deletions logprep/connector/http/input.py
@@ -26,18 +26,24 @@
 """
 
 import inspect
+import logging
+import multiprocessing as mp
+import queue
+import re
+import threading
 from abc import ABC
 from logging import Logger
-import logging
-import re
-from typing import Mapping, Tuple, Union, Callable
-from attrs import define, field, validators
+from typing import Callable, Mapping, Tuple, Union
 
+import falcon.asgi
 import msgspec
 import uvicorn
-import falcon.asgi
-from falcon import HTTPTooManyRequests, HTTPMethodNotAllowed  # pylint: disable=no-name-in-module
+from attrs import define, field, validators
+from falcon import (  # pylint: disable=no-name-in-module
+    HTTPMethodNotAllowed,
+    HTTPTooManyRequests,
+)
 
 from logprep.abc.input import FatalInputError, Input
 from logprep.util import defaults
 
@@ -113,15 +119,15 @@ class HttpEndpoint(ABC):
     Parameters
     ----------
-    messages: queue.Queue
+    messages: mp.Queue
         Input Events are put here
     collect_meta: bool
         Collects Metadata on True (default)
     metafield_name: str
         Defines key name for metadata
     """
 
-    def __init__(self, messages: queue.Queue, collect_meta: bool, metafield_name: str) -> None:
+    def __init__(self, messages: mp.Queue, collect_meta: bool, metafield_name: str) -> None:
         self.messages = messages
         self.collect_meta = collect_meta
         self.metafield_name = metafield_name
@@ -288,12 +294,6 @@ def shut_down(self):
 class HttpConnector(Input):
     """Connector to accept log messages as http post requests"""
 
-    _endpoint_registry: Mapping[str, HttpEndpoint] = {
-        "json": JSONHttpEndpoint,
-        "plaintext": PlaintextHttpEndpoint,
-        "jsonl": JSONLHttpEndpoint,
-    }
-
     @define(kw_only=True)
     class Config(Input.Config):
         """Config for HTTPInput"""
@@ -351,6 +351,14 @@ class Config(Input.Config):
 
     __slots__ = []
 
+    messages: mp.Queue = None
+
+    _endpoint_registry: Mapping[str, HttpEndpoint] = {
+        "json": JSONHttpEndpoint,
+        "plaintext": PlaintextHttpEndpoint,
+        "jsonl": JSONLHttpEndpoint,
+    }
+
     def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Logger) -> None:
         super().__init__(name, configuration, logger)
         internal_uvicorn_config = {
@@ -359,13 +367,9 @@ def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Log
             "timeout_graceful_shutdown": 0,
         }
         self._config.uvicorn_config.update(internal_uvicorn_config)
-        self.logger = logger
         self.port = self._config.uvicorn_config["port"]
         self.host = self._config.uvicorn_config["host"]
-        self.target = "http://" + self.host + ":" + str(self.port)
-        self.messages = queue.Queue(
-            self._config.message_backlog_size
-        )  # pylint: disable=attribute-defined-outside-init
+        self.target = f"http://{self.host}:{self.port}"
 
     def setup(self):
         """setup starts the actual functionality of this connector.
@@ -378,6 +382,11 @@ def setup(self):
             raise FatalInputError(
                 self, "Necessary instance attribute `pipeline_index` could not be found."
             )
+        self._logger.debug(
+            f"HttpInput Connector started on target {self.target} and "
+            f"queue {id(self.messages)} "
+            f"with queue_size: {self.messages._maxsize}"
+        )
         # Start HTTP Input only when in first process
         if self.pipeline_index != 1:
             return
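The connector now gates the server start on pipeline_index while leaving consumption to every process. A hedged sketch of the resulting pattern (simplified, with hypothetical helper names — not the actual connector code):

    import multiprocessing as mp

    class SketchHttpInput:
        # class-level queue, assigned once by the pipeline manager before forking
        messages: mp.Queue = None

        def __init__(self, pipeline_index: int):
            self.pipeline_index = pipeline_index

        def setup(self):
            if self.pipeline_index != 1:
                return  # non-first processes only consume from the shared queue
            self._start_http_server()  # hypothetical stand-in for the uvicorn startup

        def _start_http_server(self):
            pass  # placeholder: the real connector spins up uvicorn here

        def get_next(self, timeout: float):
            # every pipeline process reads from the same underlying queue
            return self.messages.get(timeout=timeout)

Only one process binds the port, but draining and backpressure behave identically in all of them because the queue object is shared.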
4 changes: 2 additions & 2 deletions logprep/framework/pipeline.py
@@ -11,7 +11,7 @@
 import multiprocessing
 
 # pylint: disable=logging-fstring-interpolation
-import queue
+import multiprocessing.queues
 import warnings
 from ctypes import c_bool
 from functools import cached_property, partial
@@ -307,7 +307,7 @@ def _shut_down(self) -> None:
     def _drain_input_queues(self) -> None:
         if not hasattr(self._input, "messages"):
             return
-        if isinstance(self._input.messages, queue.Queue):
+        if isinstance(self._input.messages, multiprocessing.queues.Queue):
             while self._input.messages.qsize():
                 self.process_pipeline()
 
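A small but necessary detail in this hunk: multiprocessing.Queue is a factory function bound to the default context, not a class, so it cannot serve as an isinstance target; the class itself lives in multiprocessing.queues. A quick check of that distinction:

    import multiprocessing
    import multiprocessing.queues

    q = multiprocessing.Queue()
    # the factory returns an instance of the class from multiprocessing.queues
    assert isinstance(q, multiprocessing.queues.Queue)
    # isinstance(q, multiprocessing.Queue) would raise TypeError,
    # because multiprocessing.Queue is a function, not a type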
8 changes: 8 additions & 0 deletions logprep/framework/pipeline_manager.py
@@ -10,6 +10,7 @@
 from attr import define, field
 
 from logprep.abc.component import Component
+from logprep.connector.http.input import HttpConnector
 from logprep.framework.pipeline import Pipeline
 from logprep.metrics.exporter import PrometheusExporter
 from logprep.metrics.metrics import CounterMetric
@@ -76,6 +77,13 @@ def __init__(self, configuration: Configuration):
             self.prometheus_exporter = PrometheusExporter(prometheus_config)
         else:
             self.prometheus_exporter = None
+        input_config = next(iter(configuration.input.values()))
+        if input_config.get("type") == "http_input":
+            # this workaround has to be done because the queue size is not configurable
+            # after initialization and the queue has to be shared between the multiple processes
+            if HttpConnector.messages is None:
+                message_backlog_size = input_config.get("message_backlog_size", 15000)
+                HttpConnector.messages = multiprocessing.Queue(maxsize=message_backlog_size)
 
     def get_count(self) -> int:
         """Get the pipeline count.
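Creating the queue here, before any pipeline process exists, is what makes the sharing work: under the fork start method the children inherit the already-assigned class attribute, so every process's HttpConnector.messages points at the same underlying queue. A hedged sketch of the mechanism, assuming fork (under spawn the child would re-import the module and see None):

    import multiprocessing

    class Connector:
        messages = None  # filled in by the manager before forking

    def child():
        # under fork, the child inherits the already-assigned class attribute
        Connector.messages.put("event from child")

    if __name__ == "__main__":
        Connector.messages = multiprocessing.Queue(maxsize=100)
        proc = multiprocessing.Process(target=child)
        proc.start()
        proc.join()
        print(Connector.messages.get())  # -> "event from child"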
5 changes: 4 additions & 1 deletion quickstart/exampledata/config/http_pipeline.yml
@@ -1,13 +1,16 @@
 version: 1
 process_count: 2
 
+logger:
+  level: DEBUG
+
 metrics:
   enabled: true
   port: 8003
 input:
   httpinput:
     type: http_input
-    message_backlog_size: 1500000
+    message_backlog_size: 1500
     collect_meta: True
     metafield_name: "@metadata"
     uvicorn_config:
53 changes: 36 additions & 17 deletions tests/unit/connector/test_http_input.py
@@ -1,26 +1,28 @@
 # pylint: disable=missing-docstring
 # pylint: disable=protected-access
 # pylint: disable=attribute-defined-outside-init
+import multiprocessing
 from copy import deepcopy
-from concurrent.futures import ThreadPoolExecutor
 
+import falcon
 import requests
 import uvicorn
-import falcon
 
+from logprep.abc.input import FatalInputError
 from logprep.connector.http.input import HttpConnector
 from logprep.factory import Factory
-from logprep.abc.input import FatalInputError
 from tests.unit.connector.base import BaseInputTestCase
 
 
 class TestHttpConnector(BaseInputTestCase):
 
     def setup_method(self):
+        HttpConnector.messages = multiprocessing.Queue(
+            maxsize=self.CONFIG.get("message_backlog_size")
+        )
         super().setup_method()
-        # we have to empty the queue for testing
-        while not self.object.messages.empty():
-            self.object.messages.get(timeout=0.001)
+        self.object.pipeline_index = 1
+        self.object.setup()
         self.target = self.object.target
 
     CONFIG: dict = {
@@ -40,6 +42,8 @@ def setup_method(self):
     }
 
     def teardown_method(self):
+        while not self.object.messages.empty():
+            self.object.messages.get(timeout=0.001)
         self.object.shut_down()
 
     def test_create_connector(self):
@@ -71,18 +75,11 @@ def test_get_error_code_on_get(self):
     def test_get_error_code_too_many_requests(self):
         data = {"message": "my log message"}
         session = requests.Session()
-        session.mount(
-            "http://",
-            requests.adapters.HTTPAdapter(pool_maxsize=20, max_retries=3, pool_block=True),
-        )
-
-        def get_url(url):
-            for _ in range(100):
-                _ = session.post(url, json=data)
-
-        with ThreadPoolExecutor(max_workers=100) as executor:
-            executor.submit(get_url, f"{self.target}/json")
+        for i in range(100):
+            resp = session.post(url=f"{self.target}/json", json=data, timeout=0.5)
         assert self.object.messages.qsize() == 100
+        resp = requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
+        assert self.object.messages._maxsize == 100
         assert resp.status_code == 429
 
     def test_json_endpoint_accepts_post_request(self):
@@ -278,3 +275,25 @@ def test_get_next_with_hmac_of_raw_message(self):
         }
         connector_next_msg, _ = connector.get_next(1)
         assert connector_next_msg == expected_event, "Output event with hmac is not as expected"
+
+    def test_two_connector_instances_share_the_same_queue(self):
+        new_connector = Factory.create({"test connector": self.CONFIG}, logger=self.logger)
+        assert self.object.messages is new_connector.messages
+
+    def test_messages_is_multiprocessing_queue(self):
+        assert isinstance(self.object.messages, multiprocessing.queues.Queue)
+
+    def test_all_endpoints_share_the_same_queue(self):
+        data = {"message": "my log message"}
+        requests.post(url=f"{self.target}/json", json=data, timeout=0.5)
+        assert self.object.messages.qsize() == 1
+        data = "my log message"
+        requests.post(url=f"{self.target}/plaintext", json=data, timeout=0.5)
+        assert self.object.messages.qsize() == 2
+        data = """
+        {"message": "my first log message"}
+        {"message": "my second log message"}
+        {"message": "my third log message"}
+        """
+        requests.post(url=f"{self.target}/jsonl", data=data, timeout=0.5)
+        assert self.object.messages.qsize() == 5
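The reworked 429 test fills the queue to its maxsize of 100 and expects the next request to be rejected. The endpoint side is not part of this diff; the observed behavior is consistent with a responder that translates a full queue into HTTPTooManyRequests, roughly like this hedged sketch (an assumption about the endpoint internals, not code from this commit):

    from falcon import HTTPTooManyRequests  # pylint: disable=no-name-in-module

    class SketchJSONEndpoint:
        def __init__(self, messages):
            self.messages = messages  # the shared multiprocessing queue

        async def on_post(self, req, resp):
            if self.messages.full():  # backlog at maxsize -> push back on clients
                raise HTTPTooManyRequests(description="Message backlog is full.")
            event = await req.get_media()  # falcon.asgi parses the JSON body
            self.messages.put(event)

The last new test also pins down the counting semantics: one event per /json post, one per /plaintext post, and one per line for /jsonl, all landing in the same shared queue.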
20 changes: 20 additions & 0 deletions tests/unit/framework/test_pipeline_manager.py
@@ -5,6 +5,8 @@
 from logging import Logger
 from unittest import mock
 
+from logprep.connector.http.input import HttpConnector
+from logprep.factory import Factory
 from logprep.framework.pipeline_manager import PipelineManager
 from logprep.metrics.exporter import PrometheusExporter
 from logprep.util.configuration import Configuration, MetricsConfig
@@ -197,3 +199,21 @@ def test_restart_failed_pipelines_sets_old_pipeline_index(self):
         with mock.patch.object(pipeline_manager, "_create_pipeline") as mock_create_pipeline:
             pipeline_manager.restart_failed_pipeline()
             mock_create_pipeline.assert_called_once_with(1)
+
+    def test_pipeline_manager_sets_queue_size_for_http_input(self):
+        config = deepcopy(self.config)
+        config.input = {
+            "http": {
+                "type": "http_input",
+                "message_backlog_size": 100,
+                "collect_meta": False,
+                "uvicorn_config": {"port": 9000, "host": "127.0.0.1"},
+                "endpoints": {
+                    "/json": "json",
+                },
+            }
+        }
+        PipelineManager(config)
+        assert HttpConnector.messages._maxsize == 100
+        http_input = Factory.create(config.input, mock.MagicMock())
+        assert http_input.messages._maxsize == 100
