Skip to content

Commit

Permalink
refactor http generator output to http_output connector (#591)
Browse files Browse the repository at this point in the history
------

Co-authored-by: dtrai2 <[email protected]>
  • Loading branch information
ekneg54 and dtrai2 authored Jun 5, 2024
1 parent 856ceaf commit b8353ff
Show file tree
Hide file tree
Showing 23 changed files with 499 additions and 563 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
* `requester` now writes `_requester_missing_field_warning` tag to event tags instead of `_requester_failure` in case of missing fields
* `timestamp_differ` now writes `_timestamp_differ_missing_field_warning` tag to event tags instead of `_timestamp_differ_failure` in case of missing fields
* `timestamper` now writes `_timestamper_missing_field_warning` tag to event tags instead of `_timestamper_failure` in case of missing fields
* rename `--thread_count` parameter to `--thread-count` in http generator
* removed `--report` parameter and feature from http generator

### Features

* add UCL into the Quickstart Setup
* add logprep http output connector

### Improvements

Expand All @@ -32,6 +35,11 @@
* handle missing fields in processors via `_handle_missing_fields` from the field_manager
* add `LogprepMPQueueListener` to outsource logging to a separate process
* add a single `QueueHandler` to root logger to ensure all logs are handled by `LogprepMPQueueListener`
* refactor `http_generator` to use a logprep http output connector

### Bugfix

* make `--username` and `--password` parameters optional in http generator

## 11.3.0

Expand Down
2 changes: 1 addition & 1 deletion doc/source/user_manual/configuration/input.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Input
:noindex:

.. automodule:: logprep.connector.http.input
.. autoclass:: logprep.connector.http.input.HttpConnector.Config
.. autoclass:: logprep.connector.http.input.HttpInput.Config
:members:
:undoc-members:
:inherited-members:
Expand Down
7 changes: 7 additions & 0 deletions doc/source/user_manual/configuration/output.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,10 @@ logprep only guaranties that one output has received data by calling the
:undoc-members:
:inherited-members:
:noindex:

.. automodule:: logprep.connector.http.output
.. autoclass:: logprep.connector.http.output.HttpOutput.Config
:members:
:undoc-members:
:inherited-members:
:noindex:
8 changes: 4 additions & 4 deletions logprep/abc/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ class Metrics(Component.Metrics):

number_of_processed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of events that were processed",
description="Number of successfull events",
name="number_of_processed_events",
)
)
"""Number of events that were processed"""
"""Number of successfull events"""

number_of_failed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of events that were send to error output",
description="Number of failed events",
name="number_of_failed_events",
)
)
"""Number of events that were send to error output"""
"""Number of failed events"""

processing_time_per_event: HistogramMetric = field(
factory=lambda: HistogramMetric(
Expand Down
6 changes: 3 additions & 3 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def __init__(
collect_meta: bool,
metafield_name: str,
credentials: dict,
metrics: "HttpConnector.Metrics",
metrics: "HttpInput.Metrics",
) -> None:
self.messages = messages
self.collect_meta = collect_meta
Expand Down Expand Up @@ -300,7 +300,7 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff
self.messages.put({**event, **metadata}, block=False)


class HttpConnector(Input):
class HttpInput(Input):
"""Connector to accept log messages as http post requests"""

@define(kw_only=True)
Expand Down Expand Up @@ -413,7 +413,7 @@ class Config(Input.Config):
"jsonl": JSONLHttpEndpoint,
}

def __init__(self, name: str, configuration: "HttpConnector.Config") -> None:
def __init__(self, name: str, configuration: "HttpInput.Config") -> None:
super().__init__(name, configuration)
port = self._config.uvicorn_config["port"]
host = self._config.uvicorn_config["host"]
Expand Down
218 changes: 218 additions & 0 deletions logprep/connector/http/output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
"""
HTTPOutput
==========
A http output connector that sends http post requests to paths under a given endpoint
HTTP Output Connector Config Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
An example config file would look like:
.. code-block:: yaml
:linenos:
output:
myhttpoutput:
type: http_output
target_url: http://the.target.url:8080
user: user
password: password
The :code:`store` method of this connector can be fed with a :code:`dictionary` or a :code:`tuple`.
If a :code:`tuple` is passed, the first element is the target path and
the second element is the event or a list of events.
If a :code:`dictionary` is passed, the event will be sent to the configured root
of the :code:`target_url`.
.. security-best-practice::
:title: Http Output Connector - Usage
This Connector is currently only used in the log generator and does not have a stable interface.
Do not use this in production.
.. security-best-practice::
:title: Http Output Connector - SSL
This connector does not verify the SSL Context, which could lead to exposing sensitive data.
.. warning::
The :code:`store_failed` method only counts the number of failed events and does not send them
to a dead letter queue.
"""

import json
import logging
from functools import cached_property

import requests
from attrs import define, field, validators

from logprep.abc.output import Output
from logprep.metrics.metrics import CounterMetric

# Module-level logger shared by all HttpOutput instances; it is registered under
# the fixed name "HttpOutput" rather than the module's import path.
logger = logging.getLogger("HttpOutput")


class HttpOutput(Output):
    """Output that sends http post requests to paths under a given endpoint
    with configured credentials.

    Events are serialized with the connector's encoder and posted with basic
    auth to ``target_url`` (optionally extended by a per-call path). The
    request is sent with ``verify=False``, i.e. TLS certificates are not
    verified.
    """

    @define(kw_only=True)
    class Metrics(Output.Metrics):
        """Tracks statistics about this connector"""

        number_of_http_requests: CounterMetric = field(
            factory=lambda: CounterMetric(
                description="Requests total",
                name="number_of_http_requests",
            )
        )
        """Requests total"""

        status_codes: CounterMetric = field(
            factory=lambda: CounterMetric(
                description="Requests http status",
                name="status_codes",
                inject_label_values=False,
            ),
        )
        """Requests http status"""

        connection_errors: CounterMetric = field(
            factory=lambda: CounterMetric(
                description="Requests Connection Errors",
                name="connection_errors",
            ),
        )
        """Requests Connection Errors"""

        timeouts: CounterMetric = field(
            factory=lambda: CounterMetric(
                description="Requests Timeouts",
                name="timeouts",
            ),
        )
        """Requests Timeouts"""

    @define(kw_only=True)
    class Config(Output.Config):
        """Configuration for the HttpOutput."""

        user: str = field(
            validator=validators.instance_of(str),
            default="",
            # a explicitly configured `None` is treated like "not set"
            converter=lambda x: "" if x is None else x,
        )
        """User that is used for the basic auth http request"""
        password: str = field(
            validator=validators.instance_of(str),
            default="",
            # a explicitly configured `None` is treated like "not set"
            converter=lambda x: "" if x is None else x,
        )
        """Password that is used for the basic auth http request"""
        target_url: str
        """URL of the endpoint that receives the events"""

    @property
    def user(self):
        """Return the user that is used for the http request"""
        return self._config.user

    @property
    def password(self):
        """Return the password that is used for the http request"""
        return self._config.password

    @cached_property
    def _headers(self):
        """Return the static request headers (newline delimited json, utf-8)."""
        return {"Content-Type": "application/x-ndjson; charset=utf-8"}

    @property
    def statistics(self) -> str:
        """Return the statistics of this connector as a formatted string.

        Collects every ``*_total`` sample of all public metric attributes
        (except warning and error counters) and returns them as a sorted,
        indented json object. Samples are keyed by the metric description;
        samples of ``status_codes`` are keyed by their ``description`` label
        so that every status code gets its own entry.
        """
        stats: dict = {}
        for attribute in self.metrics.__attrs_attrs__:
            if attribute.name.startswith("_"):
                continue
            metric = getattr(self.metrics, attribute.name)
            for sample in metric.tracker.collect()[0].samples:
                if not sample.name.endswith("_total"):
                    continue
                if "number_of_warnings" in sample.name:  # blocklisted metric
                    continue
                if "number_of_errors" in sample.name:  # blocklisted metric
                    continue
                if attribute.name != "status_codes":
                    key = metric.description
                else:
                    key = sample.labels.get("description")
                stats[key] = int(sample.value)
        return json.dumps(stats, sort_keys=True, indent=4, separators=(",", ": "))

    def store(self, document: tuple[str, dict | list[dict]] | dict) -> None:
        """Send the given document(s) to the configured endpoint.

        Parameters
        ----------
        document
            Either a single event (sent to ``target_url`` itself) or a tuple
            of ``(path, event_or_events)``, in which case ``path`` is appended
            to ``target_url`` without any separator handling.
        """
        if isinstance(document, tuple):
            target, document = document
            target = f"{self._config.target_url}{target}"
        else:
            target = self._config.target_url
        self.store_custom(document, target)

    def store_failed(self, error_message, document_received, document_processed) -> None:
        """Count a failed event; the arguments are not used and nothing is sent."""
        self.metrics.number_of_failed_events += 1

    def store_custom(self, document: dict | tuple | list, target: str) -> None:
        """Send a post request with given data to the specified endpoint

        Parameters
        ----------
        document
            A single event (``dict``) or a sequence of events (``list`` or
            ``tuple``). Any other type is logged, counted as one failed event
            and dropped without raising.
        target
            The complete url the request is sent to.

        Raises
        ------
        ConnectionError
            (builtin) if ``target`` has no url schema.
        requests.RequestException
            for request errors that are neither http status errors,
            connection errors nor timeouts (they are re-raised unchanged).
        """
        if isinstance(document, (tuple, list)):
            request_data = self._encoder.encode_lines(document)
            document_count = len(document)
        elif isinstance(document, dict):
            request_data = self._encoder.encode(document)
            document_count = 1
        else:
            error = TypeError(f"Document type {type(document)} is not supported")
            self.metrics.number_of_failed_events += 1
            logger.error(str(error))
            return
        # Two stage error handling: the inner handler does the bookkeeping that
        # is common to every failed request and swallows http status errors,
        # the outer handlers classify everything that was re-raised.
        try:
            try:
                logger.debug(request_data)
                response = requests.post(
                    url=target,
                    headers=self._headers,
                    verify=False,
                    auth=(self.user, self.password),
                    timeout=2,
                    data=request_data,
                )
                logger.debug("Servers response code is: %i", response.status_code)
                # the status code is counted before `raise_for_status`, so
                # error responses show up in the statistics as well
                self.metrics.status_codes.add_with_labels(
                    1,
                    {
                        "description": f"{self.metrics.status_codes.description} {response.status_code}"
                    },
                )
                response.raise_for_status()
                self.metrics.number_of_processed_events += document_count
                self.metrics.number_of_http_requests += 1
                if self.input_connector is not None:
                    self.input_connector.batch_finished_callback()
            except requests.RequestException as error:
                logger.error("Failed to send event: %s", str(error))
                logger.debug("Failed event: %s", document)
                self.metrics.number_of_failed_events += document_count
                self.metrics.number_of_http_requests += 1
                # http status errors are fully handled here, everything else
                # is classified by the outer handlers
                if not isinstance(error, requests.exceptions.HTTPError):
                    raise error
        except requests.exceptions.ConnectionError as error:
            logger.error(error)
            self.metrics.connection_errors += 1
            # a connect timeout is a connection error and a timeout at once
            if isinstance(error, requests.exceptions.Timeout):
                self.metrics.timeouts += 1
        except requests.exceptions.MissingSchema as error:
            # The config is an attrs object without a dict style `get`, so the
            # url is read as attribute like in `store`. Otherwise building
            # this message fails and hides the intended ConnectionError.
            raise ConnectionError(
                f"No schema set in target-url: {self._config.target_url}"
            ) from error
        except requests.exceptions.Timeout as error:
            # other timeouts than connection timeouts are handled here
            logger.error(error)
            self.metrics.timeouts += 1
1 change: 0 additions & 1 deletion logprep/factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""This module contains a factory to create connectors and processors."""

import copy
import logging
from typing import TYPE_CHECKING

from logprep.configuration import Configuration
Expand Down
6 changes: 3 additions & 3 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from attr import define, field

from logprep.abc.component import Component
from logprep.connector.http.input import HttpConnector
from logprep.connector.http.input import HttpInput
from logprep.framework.pipeline import Pipeline
from logprep.metrics.exporter import PrometheusExporter
from logprep.metrics.metrics import CounterMetric
Expand Down Expand Up @@ -81,10 +81,10 @@ def _set_http_input_queue(self, configuration):
"""
input_config = next(iter(configuration.input.values()))
is_http_input = input_config.get("type") == "http_input"
if not is_http_input and HttpConnector.messages is not None:
if not is_http_input and HttpInput.messages is not None:
return
message_backlog_size = input_config.get("message_backlog_size", 15000)
HttpConnector.messages = multiprocessing.Queue(maxsize=message_backlog_size)
HttpInput.messages = multiprocessing.Queue(maxsize=message_backlog_size)

def set_count(self, count: int):
"""Set the pipeline count.
Expand Down
Loading

0 comments on commit b8353ff

Please sign in to comment.