From 74623ce3adf4970841d3d187a1fc50a6769ffbc1 Mon Sep 17 00:00:00 2001
From: ppcad <45867125+ppcad@users.noreply.github.com>
Date: Mon, 8 Jan 2024 14:59:46 +0100
Subject: [PATCH] Adapt s3 connector for kafka fix (#499)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Make s3 connector use max_retries parameter

* Make number_of_warnings metric work in s3 connector

* Add number_of_successful_writes metric to s3 connector

* Make s3 connector support _write_backlog and refactor code

* Update changelog

* Optimize checking if s3 should be written and add tests

* Make s3 connector blocking and make it raise FatalOutputError instead of warnings

---------

Co-authored-by: Jörg Zimmermann <101292599+ekneg54@users.noreply.github.com>
---
 CHANGELOG.md                           |  8 +++
 logprep/connector/s3/output.py         | 94 ++++++++++++++------
 tests/unit/connector/test_s3_output.py | 64 ++++++++++++++----
 3 files changed, 106 insertions(+), 60 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 08ee3fd52..73f9a8dd9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,12 +5,20 @@
 ### Features
 
+
+* add a `number_of_successful_writes` metric to the s3 connector, which counts how many events were successfully written to s3
+* make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0
 * add option to Opensearch Output Connector to use parallel bulk implementation (default is True)
 
+
 ### Improvements
 
+* make the s3 connector raise `FatalOutputError` instead of warnings
+* make the s3 connector blocking by removing threading
+
 ### Bugfix
 
+* make the s3 connector actually use the `max_retries` parameter
 
 ## v9.0.3
 ### Breaking
diff --git a/logprep/connector/s3/output.py b/logprep/connector/s3/output.py
index 2ddea6907..b0ca34127 100644
--- a/logprep/connector/s3/output.py
+++ b/logprep/connector/s3/output.py
@@ -4,9 +4,6 @@
 
 This section contains the connection settings for the AWS s3 output connector.
 
-This connector is non-blocking and may skip sending data if previous data has not finished sending.
-It doesn't crash if a connection couldn't be established, but sends a warning.
-
 The target bucket is defined by the :code:`bucket` configuration parameter.
 The prefix is defined by the value in the field :code:`prefix_field` in the document.
@@ -42,9 +39,7 @@
 """
 import json
 import re
-import threading
 from collections import defaultdict
-from copy import deepcopy
 from functools import cached_property
 from logging import Logger
 from time import time
@@ -62,8 +57,8 @@
     EndpointConnectionError,
 )
 
-from logprep.abc.output import Output
-from logprep.metrics.metrics import Metric
+from logprep.abc.output import Output, FatalOutputError
+from logprep.metrics.metrics import Metric, CounterMetric
 from logprep.util.helper import get_dotted_field_value
 from logprep.util.time import TimeParser
@@ -117,13 +112,21 @@ class Config(Output.Config):
         """The input callback is called after the maximum backlog size has been reached
         if this is set to True (optional)"""
 
-    __slots__ = ["_message_backlog", "_current_backlog_count", "_index_cache"]
+    @define(kw_only=True)
+    class Metrics(Output.Metrics):
+        """Tracks statistics about this output"""
 
-    _message_backlog: DefaultDict
+        number_of_successful_writes: CounterMetric = field(
+            factory=lambda: CounterMetric(
+                description="Number of events that were successfully written to s3",
+                name="number_of_successful_writes",
+            )
+        )
+        """Number of events that were successfully written to s3"""
 
-    _current_backlog_count: int
+    __slots__ = ["_message_backlog", "_index_cache"]
 
-    _writing_thread: Optional[threading.Thread]
+    _message_backlog: DefaultDict
 
     _s3_resource: Optional["boto3.resources.factory.s3.ServiceResource"]
@@ -134,7 +137,6 @@ class Config(Output.Config):
     def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger):
         super().__init__(name, configuration, logger)
         self._message_backlog = defaultdict(list)
-        self._current_backlog_count = 0
         self._writing_thread = None
         self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else ""
         self._s3_resource = None
@@ -147,7 +149,8 @@ def _setup_s3_resource(self):
             region_name=self._config.region_name,
         )
         config = boto3.session.Config(
-            connect_timeout=self._config.connect_timeout, retries={"max_attempts": 0}
+            connect_timeout=self._config.connect_timeout,
+            retries={"max_attempts": self._config.max_retries},
         )
         self._s3_resource = session.resource(
             "s3",
@@ -162,6 +165,10 @@ def s3_resource(self):
         """Return s3 resource"""
         return self._s3_resource
 
+    @property
+    def _backlog_size(self):
+        return sum(map(len, self._message_backlog.values()))
+
     @cached_property
     def _replace_pattern(self):
         return re.compile(r"%{\S+?}")
@@ -195,52 +202,51 @@ def _write_to_s3_resource(self, document: dict, prefix: str):
         ----------
         document : dict
             Document to store.
-
-        Returns
-        -------
-        Returns True to inform the pipeline to call the batch_finished_callback method in the
-        configured input
         """
         prefix = self._add_dates(prefix)
         prefix = f"{self._base_prefix}{prefix}"
         self._message_backlog[prefix].append(document)
-        backlog_count = self._current_backlog_count + 1
-        if backlog_count == self._config.message_backlog_size:
-            if self._writing_thread is None or not self._writing_thread.is_alive():
-                message_backlog = deepcopy(self._message_backlog)
-                self._writing_thread = threading.Thread(
-                    target=self._write_document_batches, args=(message_backlog,)
-                )
-                self._writing_thread.start()
-                return True
-        self._current_backlog_count = backlog_count
-        return False
-
-    def _write_document_batches(self, message_backlog):
-        self._logger.info(f"Writing {self._current_backlog_count + 1} documents to s3")
-        for prefix_mb, document_batch in message_backlog.items():
+        if self._backlog_size >= self._config.message_backlog_size:
+            self._write_backlog()
+
+    def _write_backlog(self):
+        """Write the message backlog to s3 if it contains documents."""
+        if not self._message_backlog:
+            return
+
+        self._bulk()
+
+    def _bulk(self):
+        self._logger.info("Writing %s documents to s3", self._backlog_size)
+        for prefix_mb, document_batch in self._message_backlog.items():
             self._write_document_batch(document_batch, f"{prefix_mb}/{time()}-{uuid4()}")
         self._message_backlog.clear()
-        self._current_backlog_count = 0
+
+        if not self._config.call_input_callback:
+            return
+
+        if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"):
+            self.input_connector.batch_finished_callback()
 
     def _write_document_batch(self, document_batch: dict, identifier: str):
         try:
             self._write_to_s3(document_batch, identifier)
-        except EndpointConnectionError:
-            self._logger.warning(f"{self.describe()}: Could not connect to the endpoint URL")
-        except ConnectionClosedError:
-            self._logger.warning(
-                f"{self.describe()}: "
-                f"Connection was closed before we received a valid response from endpoint URL"
-            )
+        except EndpointConnectionError as error:
+            raise FatalOutputError(self, "Could not connect to the endpoint URL") from error
+        except ConnectionClosedError as error:
+            raise FatalOutputError(
+                self,
+                "Connection was closed before we received a valid response from endpoint URL",
+            ) from error
         except (BotoCoreError, ClientError) as error:
-            self._logger.warning(f"{self.describe()}: {error}")
+            raise FatalOutputError(self, str(error)) from error
 
     def _write_to_s3(self, document_batch: dict, identifier: str):
         self._logger.debug(f'Writing "{identifier}" to s3 bucket "{self._config.bucket}"')
         s3_obj = self.s3_resource.Object(self._config.bucket, identifier)
         s3_obj.put(Body=self._encoder.encode(document_batch), ContentType="application/json")
+        self.metrics.number_of_successful_writes += len(document_batch)
 
     def store(self, document: dict):
         """Store a document into s3 bucket.
@@ -259,9 +265,7 @@ def store(self, document: dict):
             )
             prefix_value = self._config.default_prefix
 
-        batch_finished = self._write_to_s3_resource(document, prefix_value)
-        if self._config.call_input_callback and batch_finished and self.input_connector:
-            self.input_connector.batch_finished_callback()
+        self._write_to_s3_resource(document, prefix_value)
 
     @staticmethod
     def _build_no_prefix_document(message_document: dict, reason: str):
diff --git a/tests/unit/connector/test_s3_output.py b/tests/unit/connector/test_s3_output.py
index 6d76c26a9..c0c4a6776 100644
--- a/tests/unit/connector/test_s3_output.py
+++ b/tests/unit/connector/test_s3_output.py
@@ -4,11 +4,9 @@
 # pylint: disable=wrong-import-order
 # pylint: disable=attribute-defined-outside-init
 import logging
-import re
 from copy import deepcopy
 from datetime import datetime
 from math import isclose
-from time import sleep
 from unittest import mock
 
 import pytest
@@ -19,6 +17,7 @@
     EndpointConnectionError,
 )
 
+from logprep.abc.output import FatalOutputError
 from logprep.factory import Factory
 from logprep.util.time import TimeParser
 from tests.unit.connector.base import BaseOutputTestCase
@@ -41,6 +40,15 @@ class TestS3Output(BaseOutputTestCase):
         "message_backlog_size": 1,
     }
 
+    expected_metrics = [
+        "logprep_processing_time_per_event",
+        "logprep_number_of_processed_events",
+        "logprep_number_of_failed_events",
+        "logprep_number_of_warnings",
+        "logprep_number_of_errors",
+        "logprep_number_of_successful_writes",
+    ]
+
     def test_describe_returns_s3_output(self):
         assert (
             self.object.describe() == "S3Output (Test Instance Name) - S3 Output: http://host:123"
         )
@@ -173,18 +181,18 @@ def test_write_document_batch_calls_handles_errors(self, caplog, error, message)
             "logprep.connector.s3.output.S3Output._write_to_s3",
             side_effect=error,
         ):
-            self.object._write_document_batch({"dummy": "event"}, "dummy_identifier")
-        assert re.match(message, caplog.text)
+            with pytest.raises(FatalOutputError, match=message):
+                self.object._write_document_batch({"dummy": "event"}, "dummy_identifier")
 
     def test_write_to_s3_resource_sets_current_backlog_count_and_below_max_backlog(self):
         s3_config = deepcopy(self.CONFIG)
         message_backlog_size = 5
         s3_config.update({"message_backlog_size": message_backlog_size})
         s3_output = Factory.create({"s3": s3_config}, self.logger)
-        assert s3_output._current_backlog_count == 0
+        assert self._calculate_backlog_size(s3_output) == 0
         for idx in range(1, message_backlog_size):
             s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-            assert s3_output._current_backlog_count == idx
+            assert self._calculate_backlog_size(s3_output) == idx
 
     def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self):
         s3_config = deepcopy(self.CONFIG)
@@ -198,28 +206,28 @@ def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self
         # Backlog not full
         for idx in range(message_backlog_size - 1):
             s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-            sleep(0.1)  # nosemgrep
-            assert s3_output._current_backlog_count == idx + 1
+            self._wait_for_writing_thread(s3_output)
+            assert self._calculate_backlog_size(s3_output) == idx + 1
         s3_output._write_document_batch.assert_not_called()
 
         # Backlog full then cleared
         s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-        sleep(0.1)  # nosemgrep
+        self._wait_for_writing_thread(s3_output)
         s3_output._write_document_batch.assert_called_once()
-        assert s3_output._current_backlog_count == 0
+        assert self._calculate_backlog_size(s3_output) == 0
 
         # Backlog not full
         for idx in range(message_backlog_size - 1):
             s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-            sleep(0.1)  # nosemgrep
-            assert s3_output._current_backlog_count == idx + 1
+            self._wait_for_writing_thread(s3_output)
+            assert self._calculate_backlog_size(s3_output) == idx + 1
         s3_output._write_document_batch.assert_called_once()
 
         # Backlog full then cleared
         s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
-        sleep(0.1)  # nosemgrep
+        self._wait_for_writing_thread(s3_output)
         assert s3_output._write_document_batch.call_count == 2
-        assert s3_output._current_backlog_count == 0
+        assert self._calculate_backlog_size(s3_output) == 0
 
     def test_store_counts_processed_events(self):
         self.object._s3_resource = mock.MagicMock()
@@ -227,7 +235,10 @@ def test_store_counts_processed_events(self):
 
     def test_store_calls_batch_finished_callback(self):
         self.object._s3_resource = mock.MagicMock()
-        super().test_store_calls_batch_finished_callback()
+        self.object.input_connector = mock.MagicMock()
+        self.object.store({"message": "my event message"})
+        self._wait_for_writing_thread(self.object)
+        self.object.input_connector.batch_finished_callback.assert_called()
 
     def test_store_does_not_call_batch_finished_callback_if_disabled(self):
         s3_config = deepcopy(self.CONFIG)
@@ -240,7 +251,30 @@ def test_store_does_not_call_batch_finished_callback_if_disabled(self):
 
     def test_write_to_s3_resource_replaces_dates(self):
         expected_prefix = f'base_prefix/prefix-{TimeParser.now().strftime("%y:%m:%d")}'
+        self.object._write_backlog = mock.MagicMock()
         self.object._write_to_s3_resource({"foo": "bar"}, "base_prefix/prefix-%{%y:%m:%d}")
         resulting_prefix = next(iter(self.object._message_backlog.keys()))
 
         assert expected_prefix == resulting_prefix
+
+    def test_message_backlog_is_not_written_if_message_backlog_size_not_reached(self):
+        self.object._config.message_backlog_size = 2
+        assert len(self.object._message_backlog) == 0
+        with mock.patch(
+            "logprep.connector.s3.output.S3Output._write_backlog"
+        ) as mock_write_backlog:
+            self.object.store({"test": "event"})
+            mock_write_backlog.assert_not_called()
+
+    def test_store_failed_counts_failed_events(self):
+        self.object._write_backlog = mock.MagicMock()
+        super().test_store_failed_counts_failed_events()
+
+    @staticmethod
+    def _wait_for_writing_thread(s3_output):
+        if s3_output._writing_thread is not None:
+            s3_output._writing_thread.join()
+
+    @staticmethod
+    def _calculate_backlog_size(s3_output):
+        return sum(len(values) for values in s3_output._message_backlog.values())
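
For orientation, below is a minimal, self-contained sketch of the blocking backlog-flush pattern this patch introduces. It is not logprep code: `S3BatchWriter` and `flush_backlog` are illustrative stand-ins, and only the control flow (per-prefix backlog, flush once the total backlog reaches `message_backlog_size`, boto3 retries driven by `max_retries`) mirrors the patched `S3Output`.

# Illustrative sketch only; class and method names are hypothetical, not logprep APIs.
import json
from collections import defaultdict
from time import time
from uuid import uuid4

import boto3


class S3BatchWriter:
    """Buffer documents per prefix and flush them to S3 once the backlog is full."""

    def __init__(self, bucket: str, message_backlog_size: int, max_retries: int = 5):
        self._bucket = bucket
        self._message_backlog_size = message_backlog_size
        self._message_backlog = defaultdict(list)
        # max_retries is handed straight to boto3, as the patched _setup_s3_resource does.
        config = boto3.session.Config(retries={"max_attempts": max_retries})
        self._s3 = boto3.session.Session().resource("s3", config=config)

    @property
    def _backlog_size(self) -> int:
        # Total number of buffered documents across all prefixes.
        return sum(map(len, self._message_backlog.values()))

    def store(self, document: dict, prefix: str) -> None:
        # Mirrors _write_to_s3_resource: append first, then flush synchronously once
        # the total backlog reaches the configured size (no background thread).
        self._message_backlog[prefix].append(document)
        if self._backlog_size >= self._message_backlog_size:
            self.flush_backlog()

    def flush_backlog(self) -> None:
        # Mirrors _write_backlog/_bulk: one S3 object per prefix, keyed with a
        # timestamp and uuid, then the backlog is cleared.
        for prefix, batch in self._message_backlog.items():
            key = f"{prefix}/{time()}-{uuid4()}"
            s3_object = self._s3.Object(self._bucket, key)
            s3_object.put(Body=json.dumps(batch), ContentType="application/json")
        self._message_backlog.clear()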