From 8c484b939001cad20a1764b6225cc01c3d205ea0 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Thu, 1 Feb 2024 12:15:08 +0100 Subject: [PATCH 1/9] Make kafka output store offset for successfully delivered events --- CHANGELOG.md | 1 + logprep/abc/output.py | 4 +- logprep/connector/confluent_kafka/input.py | 72 ++++++++++-- logprep/connector/confluent_kafka/output.py | 111 +++++++++++++++--- tests/unit/connector/base.py | 3 +- .../connector/test_confluent_kafka_input.py | 6 + .../connector/test_confluent_kafka_output.py | 39 +++++- 7 files changed, 208 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0836533a7..f38cc51a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * add a `number_of_successful_writes` metric to the s3 connector, which counts how many events were successfully written to s3 * make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0 * add option to Opensearch Output Connector to use parallel bulk implementation (default is True) +* make confluent kafka output store offsets only for successfully delivered events ### Improvements diff --git a/logprep/abc/output.py b/logprep/abc/output.py index 18d771f4c..24469e4a2 100644 --- a/logprep/abc/output.py +++ b/logprep/abc/output.py @@ -53,9 +53,10 @@ class Config(Connector.Config): But this output can be called as output for extra_data. """ - __slots__ = {"input_connector"} + __slots__ = {"input_connector", "handles_backlog"} input_connector: Optional[Input] + handles_backlog: bool @property def default(self): @@ -75,6 +76,7 @@ def metric_labels(self) -> dict: def __init__(self, name: str, configuration: "Connector.Config", logger: Logger): super().__init__(name, configuration, logger) self.input_connector = None + self.handles_backlog = True @abstractmethod def store(self, document: dict) -> Optional[bool]: diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 9565c6a8f..5ed83ae41 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -31,7 +31,7 @@ from functools import cached_property, partial from logging import Logger from socket import getfqdn -from typing import Callable, Optional, Tuple, Union +from typing import Callable, Optional, Tuple, Union, Iterable import msgspec from attrs import define, field, validators @@ -43,6 +43,7 @@ Consumer, KafkaException, TopicPartition, + Message, ) from logprep.abc.connector import Connector @@ -234,12 +235,20 @@ class Config(Input.Config): """ _last_valid_records: dict + _last_valid_record: Optional[Message] + _last_delivered_records: dict - __slots__ = ["_last_valid_records"] + __slots__ = [ + "_last_valid_records", + "_last_valid_record", + "_last_delivered_records", + ] def __init__(self, name: str, configuration: "Connector.Config", logger: Logger) -> None: super().__init__(name, configuration, logger) + self._last_valid_record = None self._last_valid_records = {} + self._last_delivered_records = {} @cached_property def _consumer(self) -> Consumer: @@ -356,7 +365,7 @@ def describe(self) -> str: base_description = super().describe() return f"{base_description} - Kafka Input: {self._config.kafka_config['bootstrap.servers']}" - def _get_raw_event(self, timeout: float) -> bytearray: + def _get_raw_event(self, timeout: float) -> Optional[bytearray]: """Get next raw Message from Kafka. 
Parameters @@ -385,11 +394,29 @@ def _get_raw_event(self, timeout: float) -> bytearray: raise CriticalInputError( self, "A confluent-kafka record contains an error code", kafka_error ) - self._last_valid_records[message.partition()] = message + self._update_last_valid_records(message) labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"} self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels) return message.value() + def _update_last_valid_records(self, message: Message): + """Update last valid records. + + It updates a dict with multiple partitions if the output connector handles its own backlog. + It stores just the last message if the output connector does not handle its own backlog and + just needs the last valid record. + + Parameters + ------- + message : Message + Message to update the last valid records with. + + """ + if self.output_connector.handles_backlog: + self._last_valid_records[message.partition()] = message + else: + self._last_valid_record = message + def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dict]]: """Parse the raw document from Kafka into a json. @@ -429,6 +456,10 @@ def _enable_auto_offset_store(self) -> bool: def _enable_auto_commit(self) -> bool: return self._config.kafka_config.get("enable.auto.commit") == "true" + @property + def last_valid_record(self) -> Optional[dict]: + return self._last_valid_record + def batch_finished_callback(self) -> None: """Store offsets for each kafka partition in `self._last_valid_records` and if configured commit them. Should be called by output connectors if @@ -439,15 +470,39 @@ def batch_finished_callback(self) -> None: self._handle_offsets(self._consumer.store_offsets) if not self._enable_auto_commit: self._handle_offsets(self._consumer.commit) + self._clear_records() + + def update_last_delivered_records(self, message: Message): + """Update last delivered records. + + Update a dict with the last record for each partition that has been successfully delivered. + + Parameters + ------- + message : Message + Message to update the last delivered records with. 
+ + """ + self._last_delivered_records[message.partition()] = message + + def _clear_records(self): self._last_valid_records.clear() + self._last_valid_record = None + self._last_delivered_records.clear() def _handle_offsets(self, offset_handler: Callable) -> None: - for message in self._last_valid_records.values(): + for message in self._get_offset_messages(): try: - offset_handler(message=message) + if message: + offset_handler(message=message) except KafkaException as error: raise InputWarning(self, f"{error}, {message}") from error + def _get_offset_messages(self) -> Iterable[Message]: + if self.output_connector.handles_backlog: + return self._last_valid_records.values() + return self._last_delivered_records.values() + def _assign_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: offset, partition = topic_partition.offset, topic_partition.partition @@ -471,8 +526,9 @@ def _revoke_callback(self, consumer, topic_partitions): f"topic: {topic_partition.topic} | " f"partition {topic_partition.partition}" ) - self.output_connector._write_backlog() - self.batch_finished_callback() + if hasattr(self.output_connector, "_write_backlog"): + self.output_connector._write_backlog() + self.batch_finished_callback() def _lost_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 1b950ab01..053735ea3 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -24,18 +24,18 @@ request.required.acks: -1 queue.buffering.max.ms: 0.5 """ - import json from datetime import datetime from functools import cached_property, partial +from logging import Logger from socket import getfqdn from typing import Optional from attrs import define, field, validators -from confluent_kafka import KafkaException, Producer +from confluent_kafka import KafkaException, Producer, Message from logprep.abc.output import CriticalOutputError, FatalOutputError, Output -from logprep.metrics.metrics import GaugeMetric, Metric +from logprep.metrics.metrics import GaugeMetric, Metric, CounterMetric from logprep.util.validators import keys_in_validator DEFAULTS = { @@ -138,6 +138,13 @@ class Metrics(Output.Metrics): ) """Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers""" + number_of_successfully_delivered_events: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of events that were successfully delivered to Kafka", + name="number_of_successfully_delivered_events", + ) + ) + """Number of events that were successfully delivered to Kafka""" @define(kw_only=True, slots=False) class Config(Output.Config): @@ -147,6 +154,8 @@ class Config(Output.Config): error_topic: str flush_timeout: float send_timeout: int = field(validator=validators.instance_of(int), default=0) + delivered_callback_frequency: int = field(validator=validators.instance_of(int), default=1) + """ (Optional) count of delivered messages before batch_finished_callback is called.""" kafka_config: Optional[dict] = field( validator=[ validators.instance_of(dict), @@ -171,6 +180,11 @@ class Config(Output.Config): """ + def __init__(self, name: str, configuration: "ConfluentKafkaOutput.Config", logger: Logger): + super().__init__(name, configuration, logger) + self.handles_backlog = False + self._delivered_cnt = 0 + @cached_property def 
_producer(self): injected_config = { @@ -233,25 +247,20 @@ def describe(self) -> str: return f"{base_description} - Kafka Output: {self._config.kafka_config.get('bootstrap.servers')}" def store(self, document: dict) -> Optional[bool]: - """Store a document in the producer topic. + """Store a document in the configured producer topic. Parameters ---------- document : dict Document to store. - Returns - ------- - Returns True to inform the pipeline to call the batch_finished_callback method in the - configured input """ - self.store_custom(document, self._config.topic) - if self.input_connector: - self.input_connector.batch_finished_callback() + self.metrics.number_of_processed_events += 1 + self._send_to_kafka(document, self._config.topic) @Metric.measure_time() def store_custom(self, document: dict, target: str) -> None: - """Write document to Kafka into target topic. + """Store a document in the target topic. Parameters ---------- @@ -259,6 +268,25 @@ def store_custom(self, document: dict, target: str) -> None: Document to be stored in target topic. target : str Topic to store document in. + + """ + self._send_to_kafka(document, target, set_offsets=False) + + def _send_to_kafka(self, document, target, set_offsets=True): + """Send document to target Kafka topic. + + Documents are sent asynchronously via "produce". + A callback method is used to set the offset for each message once it has been + successfully delivered. + The callback specified for "produce" is called for each document that has been delivered to + Kafka whenever the "poll" method is called (directly or by "flush"). + + Parameters + ---------- + document : dict + Document to be sent to target topic. + target : str + Topic to send document to. Raises ------ CriticalOutputError @@ -266,9 +294,13 @@ def store_custom(self, document: dict, target: str) -> None: """ try: - self._producer.produce(target, value=self._encoder.encode(document)) + callback = self.delivered_callback() if set_offsets else None + self._producer.produce( + target, + value=self._encoder.encode(document), + on_delivery=callback, + ) self._producer.poll(self._config.send_timeout) - self.metrics.number_of_processed_events += 1 except BufferError: # block program until buffer is empty self._producer.flush(timeout=self._config.flush_timeout) @@ -283,6 +315,12 @@ def store_failed( ) -> None: """Write errors into error topic for documents that failed processing. + Documents are sent asynchronously via "produce". + A callback method is used to set the offset for each message once it has been + successfully delivered. + The callback specified for "produce" is called for each document that has been delivered to + Kafka whenever the "poll" method is called (directly or by "flush"). + Parameters ---------- error_message : str @@ -304,12 +342,57 @@ def store_failed( self._producer.produce( self._config.error_topic, value=json.dumps(value, separators=(",", ":")).encode("utf-8"), + on_delivery=self.delivered_callback(), ) self._producer.poll(self._config.send_timeout) except BufferError: # block program until buffer is empty self._producer.flush(timeout=self._config.flush_timeout) + def delivered_callback(self): + """Callback that can called when a single message has been successfully delivered. + + The callback is called asynchronously, therefore the current message is stored within a + closure. + This message is required to later set the offset. + + Returns + ------- + store_offsets_on_success + This is the callback method that the Kafka producer calls. 
It has access to "message". + + """ + message = self._get_current_message() + + def store_offsets_on_success(error, _): + """Set offset via message stored in closure if no error occurred. + + "delivered_callback_frequency" can be configured to prevent setting the callback for + every single message that was successfully delivered. + Setting this higher than 1 might be useful if auto-commit is disabled. + + """ + if error: + raise FatalOutputError(output=self, message=error) + if message: + self._delivered_cnt += 1 + self.metrics.number_of_successfully_delivered_events += 1 + self.input_connector.update_last_delivered_records(message) + if self._delivered_cnt >= self._config.delivered_callback_frequency: + self.input_connector.batch_finished_callback() + self._delivered_cnt = 0 + + return store_offsets_on_success + + def _get_current_message(self) -> Optional[Message]: + if hasattr(self.input_connector, "last_valid_record"): + return self.input_connector.last_valid_record + return None + + @Metric.measure_time() + def _write_backlog(self): + self._producer.flush(self._config.flush_timeout) + def setup(self): super().setup() try: diff --git a/tests/unit/connector/base.py b/tests/unit/connector/base.py index 9e5c59d6e..b2de2ada8 100644 --- a/tests/unit/connector/base.py +++ b/tests/unit/connector/base.py @@ -555,4 +555,5 @@ def test_store_failed_counts_failed_events(self): def test_store_calls_batch_finished_callback(self): self.object.input_connector = mock.MagicMock() self.object.store({"message": "my event message"}) - self.object.input_connector.batch_finished_callback.assert_called() + if self.object.handles_backlog: + self.object.input_connector.batch_finished_callback.assert_called() diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index f777c7bbf..270a4a10b 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -110,6 +110,7 @@ def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, sett kafka_consumer = kafka_input._consumer message = "test message" kafka_input._last_valid_records = {0: message} + kafka_input.output_connector = mock.MagicMock() kafka_input.batch_finished_callback() if handlers is None: assert kafka_consumer.commit.call_count == 0 @@ -141,6 +142,7 @@ def raise_generator(return_sequence): getattr(kafka_consumer, handler).side_effect = raise_generator(return_sequence) kafka_input._last_valid_records = {0: "message"} + kafka_input.output_connector = mock.MagicMock() with pytest.raises(InputWarning): kafka_input.batch_finished_callback() @@ -152,6 +154,7 @@ def test_get_next_raises_critical_input_error_if_not_a_dict(self, _): self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = '[{"element":"in list"}]'.encode("utf8") + self.object.output_connector = mock.MagicMock() with pytest.raises(CriticalInputError, match=r"not a dict"): self.object.get_next(1) @@ -163,6 +166,7 @@ def test_get_next_raises_critical_input_error_if_unvalid_json(self, _): self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = "I'm not valid json".encode("utf8") + self.object.output_connector = mock.MagicMock() with pytest.raises(CriticalInputError, match=r"not a valid json"): self.object.get_next(1) @@ -174,6 +178,7 @@ def test_get_event_returns_event_and_raw_event(self, _): 
self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = '{"element":"in list"}'.encode("utf8") + self.object.output_connector = mock.MagicMock() event, raw_event = self.object._get_event(0.001) assert event == {"element": "in list"} assert raw_event == '{"element":"in list"}'.encode("utf8") @@ -187,6 +192,7 @@ def test_get_raw_event_is_callable(self, _): # pylint: disable=arguments-differ self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = '{"element":"in list"}'.encode("utf8") + self.object.output_connector = mock.MagicMock() result = self.object._get_raw_event(0.001) assert result diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index 5af0f0c3f..a5a6a456c 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -50,6 +50,7 @@ class TestConfluentKafkaOutput(BaseOutputTestCase, CommonConfluentKafkaTestCase) "logprep_number_of_failed_events", "logprep_number_of_warnings", "logprep_number_of_errors", + "logprep_number_of_successfully_delivered_events", ] @mock.patch("logprep.connector.confluent_kafka.output.Producer", return_value="The Producer") @@ -59,10 +60,13 @@ def test_producer_property_instanciates_kafka_producer(self, _): @mock.patch("logprep.connector.confluent_kafka.output.Producer") def test_store_sends_event_to_expected_topic(self, _): + self.object.delivered_callback = mock.MagicMock() kafka_producer = self.object._producer event = {"field": "content"} event_raw = json.dumps(event, separators=(",", ":")).encode("utf-8") - expected_call = mock.call(self.CONFIG.get("topic"), value=event_raw) + expected_call = mock.call( + self.CONFIG.get("topic"), value=event_raw, on_delivery=self.object.delivered_callback() + ) self.object.store(event) kafka_producer.produce.assert_called() assert expected_call in kafka_producer.produce.mock_calls @@ -72,7 +76,7 @@ def test_store_custom_sends_event_to_expected_topic(self, _): kafka_producer = self.object._producer event = {"field": "content"} event_raw = json.dumps(event, separators=(",", ":")).encode("utf-8") - expected_call = mock.call(self.CONFIG.get("topic"), value=event_raw) + expected_call = mock.call(self.CONFIG.get("topic"), value=event_raw, on_delivery=None) self.object.store_custom(event, self.CONFIG.get("topic")) kafka_producer.produce.assert_called() assert expected_call in kafka_producer.produce.mock_calls @@ -142,11 +146,38 @@ def test_store_counts_processed_events(self, _): # pylint: disable=arguments-di assert self.object.metrics.number_of_processed_events == 1 @mock.patch("logprep.connector.confluent_kafka.output.Producer") - def test_store_calls_batch_finished_callback(self, _): # pylint: disable=arguments-differ + def test_delivered_callback_calls_with_message_calls_callback( + self, _ + ): # pylint: disable=arguments-differ self.object.input_connector = mock.MagicMock() - self.object.store({"message": "my event message"}) + self.object.input_connector.last_valid_record = mock.MagicMock() + callback = self.object.delivered_callback() + callback(None, "msg") + self.object.input_connector.update_last_delivered_records.assert_called() self.object.input_connector.batch_finished_callback.assert_called() + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def 
test_delivered_callback_calls_without_message_does_not_call_callback( + self, _ + ): # pylint: disable=arguments-differ + self.object.input_connector = mock.MagicMock() + self.object.input_connector.last_valid_record = None + callback = self.object.delivered_callback() + callback(None, "msg") + self.object.input_connector.update_last_delivered_records.assert_not_called() + self.object.input_connector.batch_finished_callback.assert_not_called() + + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_delivered_callback_calls_with_error_does_not_call_callback( + self, _ + ): # pylint: disable=arguments-differ + self.object.input_connector = mock.MagicMock() + callback = self.object.delivered_callback() + with pytest.raises(FatalOutputError, match=r"some_error"): + callback(BaseException("some_error"), "msg") + self.object.input_connector.update_last_delivered_records.assert_not_called() + self.object.input_connector.batch_finished_callback.assert_not_called() + def test_setup_raises_fatal_output_error_on_invalid_config(self): config = {"myconfig": "the config", "bootstrap.servers": "testserver:9092"} self.object._config.kafka_config = config From f85eb68bfd670f8fb6011b6af9b9525eb1f5ad91 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Fri, 23 Feb 2024 10:48:23 +0100 Subject: [PATCH 2/9] Add metadata preprocessor and make kafka set/use metadata for offsets --- logprep/abc/input.py | 36 ++++- logprep/abc/output.py | 4 +- logprep/connector/confluent_kafka/input.py | 131 +++++++++--------- logprep/connector/confluent_kafka/output.py | 38 ++--- tests/unit/connector/base.py | 9 +- .../connector/test_confluent_kafka_input.py | 97 ++++++++++++- .../connector/test_confluent_kafka_output.py | 36 +++-- 7 files changed, 233 insertions(+), 118 deletions(-) diff --git a/logprep/abc/input.py b/logprep/abc/input.py index 706c6ec5d..15b068416 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -112,6 +112,7 @@ class Config(Connector.Config): "log_arrival_time_target_field": Optional[str], "log_arrival_timedelta": Optional[TimeDeltaConfig], "enrich_by_env_variables": Optional[dict], + "input_connector_metadata": Optional[bool], }, ), ], @@ -172,6 +173,8 @@ class Config(Connector.Config): - `enrich_by_env_variables` - If required it is possible to automatically enrich incoming events by environment variables. To activate this preprocessor the fields value has to be a mapping from the target field name (key) to the environment variable name (value). + - `input_connector_metadata` - If set to True, metadata will be added by the input connector + if the connector implements `_add_input_connector_metadata_to_event`. 
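As a rough illustration (not part of the patch itself), the new preprocessor could be enabled the same way the unit tests in this series build connectors. The `Factory` import path, the `confluentkafka_input` type name and the Kafka settings below are assumptions and placeholders.

from logging import getLogger

from logprep.factory import Factory

# Sketch: a Kafka input with the metadata preprocessor switched on.
connector_config = {
    "example_kafka_input": {
        "type": "confluentkafka_input",             # assumed registry name
        "topic": "consumer",                        # placeholder topic
        "kafka_config": {
            "bootstrap.servers": "localhost:9092",  # placeholder broker
            "group.id": "example-group",            # placeholder consumer group
        },
        "preprocessing": {"input_connector_metadata": True},
    }
}
kafka_input = Factory.create(connector_config, logger=getLogger("example"))
# The property added by this patch reports that metadata will be attached.
assert kafka_input._add_input_connector_metadata is True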
""" _version_information: dict = field( @@ -186,6 +189,11 @@ class Config(Connector.Config): output_connector: Optional["Output"] __slots__ = ["pipeline_index", "output_connector"] + @property + def _add_input_connector_metadata(self): + """Check and return if input connector metadata should be added or not.""" + return bool(self._config.preprocessing.get("input_connector_metadata")) + @property def _add_hmac(self): """Check and return if a hmac should be added or not.""" @@ -283,6 +291,8 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]: self.metrics.number_of_processed_events += 1 if not isinstance(event, dict): raise CriticalInputError(self, "not a dict", event) + if self._add_input_connector_metadata: + event, non_critical_error_msg = self._add_input_connector_metadata_to_event(event) if self._add_hmac: event, non_critical_error_msg = self._add_hmac_to(event, raw_event) if self._add_version_info: @@ -295,8 +305,30 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]: self._add_env_enrichment_to_event(event) return event, non_critical_error_msg - def batch_finished_callback(self): - """Can be called by output connectors after processing a batch of one or more records.""" + def batch_finished_callback(self, metadata: Optional[dict] = None): + """Can be called by output connectors after processing a batch of one or more records. + + Parameters + ---------- + metadata: dict + Metadata that can be passed by outputs. + """ + + def _add_input_connector_metadata_to_event(self, event: dict) -> Tuple[dict, Optional[str]]: + """Add input connector metadata to the event. + + Does nothing unless implemented by an input connector. + + Parameters + ---------- + event_dict: dict + The event to which the metadata should be added to + + Returns + ------- + event_dict: dict + The original event extended with metadata from the input connector. + """ def _add_env_enrichment_to_event(self, event: dict): """Add the env enrichment information to the event""" diff --git a/logprep/abc/output.py b/logprep/abc/output.py index 24469e4a2..18d771f4c 100644 --- a/logprep/abc/output.py +++ b/logprep/abc/output.py @@ -53,10 +53,9 @@ class Config(Connector.Config): But this output can be called as output for extra_data. 
""" - __slots__ = {"input_connector", "handles_backlog"} + __slots__ = {"input_connector"} input_connector: Optional[Input] - handles_backlog: bool @property def default(self): @@ -76,7 +75,6 @@ def metric_labels(self) -> dict: def __init__(self, name: str, configuration: "Connector.Config", logger: Logger): super().__init__(name, configuration, logger) self.input_connector = None - self.handles_backlog = True @abstractmethod def store(self, document: dict) -> Optional[bool]: diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 5ed83ae41..019bf540e 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -31,7 +31,7 @@ from functools import cached_property, partial from logging import Logger from socket import getfqdn -from typing import Callable, Optional, Tuple, Union, Iterable +from typing import Callable, Optional, Tuple, Union import msgspec from attrs import define, field, validators @@ -210,6 +210,15 @@ class Config(Input.Config): topic: str = field(validator=validators.instance_of(str)) """The topic from which new log messages will be fetched.""" + use_metadata_for_offsets: bool = field( + validator=validators.instance_of(bool), default=False + ) + """Use metadata to set offsets if this is set to True (default is False). + + This must be set appropriately depending on the output connector for the offsets to be set + correctly. + """ + kafka_config: Optional[dict] = field( validator=[ validators.instance_of(dict), @@ -236,19 +245,15 @@ class Config(Input.Config): _last_valid_records: dict _last_valid_record: Optional[Message] - _last_delivered_records: dict + _last_delivered_record: Optional[TopicPartition] - __slots__ = [ - "_last_valid_records", - "_last_valid_record", - "_last_delivered_records", - ] + __slots__ = ["_last_valid_records", "_last_valid_record", "_last_delivered_record"] def __init__(self, name: str, configuration: "Connector.Config", logger: Logger) -> None: super().__init__(name, configuration, logger) - self._last_valid_record = None self._last_valid_records = {} - self._last_delivered_records = {} + self._last_valid_record = None + self._last_delivered_record = None @cached_property def _consumer(self) -> Consumer: @@ -394,29 +399,12 @@ def _get_raw_event(self, timeout: float) -> Optional[bytearray]: raise CriticalInputError( self, "A confluent-kafka record contains an error code", kafka_error ) - self._update_last_valid_records(message) + self._last_valid_records[message.partition()] = message + self._last_valid_record = message labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"} self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels) return message.value() - def _update_last_valid_records(self, message: Message): - """Update last valid records. - - It updates a dict with multiple partitions if the output connector handles its own backlog. - It stores just the last message if the output connector does not handle its own backlog and - just needs the last valid record. - - Parameters - ------- - message : Message - Message to update the last valid records with. - - """ - if self.output_connector.handles_backlog: - self._last_valid_records[message.partition()] = message - else: - self._last_valid_record = message - def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dict]]: """Parse the raw document from Kafka into a json. 
@@ -448,6 +436,19 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic ) from error return event_dict, raw_event + def _add_input_connector_metadata_to_event(self, event_dict) -> Tuple[dict, Optional[str]]: + if "_metadata" in event_dict: + non_critical_error_msg = ( + "Couldn't add metadata to the input event as the field '_metadata' already exist." + ) + return event_dict, non_critical_error_msg + + event_dict["_metadata"] = { + "last_partition": self._last_valid_record.partition(), + "last_offset": self._last_valid_record.offset(), + } + return event_dict, None + @property def _enable_auto_offset_store(self) -> bool: return self._config.kafka_config.get("enable.auto.offset.store") == "true" @@ -456,52 +457,50 @@ def _enable_auto_offset_store(self) -> bool: def _enable_auto_commit(self) -> bool: return self._config.kafka_config.get("enable.auto.commit") == "true" - @property - def last_valid_record(self) -> Optional[dict]: - return self._last_valid_record + def _get_delivered_partition_offset(self, metadata: dict) -> TopicPartition: + if metadata is None: + raise FatalInputError(self, "Metadata for setting offsets can't be 'None'") + try: + last_partition = metadata["last_partition"] + last_offset = metadata["last_offset"] + except KeyError as error: + raise FatalInputError( + self, + "Missing fields in metadata for setting offsets: " + "'last_partition' and 'last_offset' required", + ) from error + return TopicPartition( + self._config.topic, + partition=last_partition, + offset=last_offset, + ) - def batch_finished_callback(self) -> None: + def batch_finished_callback(self, metadata: Optional[dict] = None) -> None: """Store offsets for each kafka partition in `self._last_valid_records` - and if configured commit them. Should be called by output connectors if - they are finished processing a batch of records. + or instead use `metadata` to obtain offsets. If configured commit them. + Should be called by output connectors if they are finished processing a batch of records. """ if self._enable_auto_offset_store: return - self._handle_offsets(self._consumer.store_offsets) + self._handle_offsets(self._consumer.store_offsets, metadata) if not self._enable_auto_commit: - self._handle_offsets(self._consumer.commit) - self._clear_records() - - def update_last_delivered_records(self, message: Message): - """Update last delivered records. - - Update a dict with the last record for each partition that has been successfully delivered. - - Parameters - ------- - message : Message - Message to update the last delivered records with. 
- - """ - self._last_delivered_records[message.partition()] = message - - def _clear_records(self): + self._handle_offsets(self._consumer.commit, metadata) self._last_valid_records.clear() - self._last_valid_record = None - self._last_delivered_records.clear() - def _handle_offsets(self, offset_handler: Callable) -> None: - for message in self._get_offset_messages(): + def _handle_offsets(self, offset_handler: Callable, metadata: Optional[dict]) -> None: + if self._config.use_metadata_for_offsets: + delivered_offset = self._get_delivered_partition_offset(metadata) try: - if message: - offset_handler(message=message) + offset_handler(offsets=[delivered_offset]) except KafkaException as error: - raise InputWarning(self, f"{error}, {message}") from error - - def _get_offset_messages(self) -> Iterable[Message]: - if self.output_connector.handles_backlog: - return self._last_valid_records.values() - return self._last_delivered_records.values() + raise InputWarning(self, f"{error}, {delivered_offset}") from error + else: + records = self._last_valid_records.values() + for record in records: + try: + offset_handler(message=record) + except KafkaException as error: + raise InputWarning(self, f"{error}, {record}") from error def _assign_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: @@ -526,8 +525,8 @@ def _revoke_callback(self, consumer, topic_partitions): f"topic: {topic_partition.topic} | " f"partition {topic_partition.partition}" ) - if hasattr(self.output_connector, "_write_backlog"): - self.output_connector._write_backlog() + self.output_connector._write_backlog() + if not self._config.use_metadata_for_offsets: self.batch_finished_callback() def _lost_callback(self, consumer, topic_partitions): diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 053735ea3..b21c6ccaf 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -27,12 +27,11 @@ import json from datetime import datetime from functools import cached_property, partial -from logging import Logger from socket import getfqdn from typing import Optional from attrs import define, field, validators -from confluent_kafka import KafkaException, Producer, Message +from confluent_kafka import KafkaException, Producer from logprep.abc.output import CriticalOutputError, FatalOutputError, Output from logprep.metrics.metrics import GaugeMetric, Metric, CounterMetric @@ -154,8 +153,6 @@ class Config(Output.Config): error_topic: str flush_timeout: float send_timeout: int = field(validator=validators.instance_of(int), default=0) - delivered_callback_frequency: int = field(validator=validators.instance_of(int), default=1) - """ (Optional) count of delivered messages before batch_finished_callback is called.""" kafka_config: Optional[dict] = field( validator=[ validators.instance_of(dict), @@ -180,11 +177,6 @@ class Config(Output.Config): """ - def __init__(self, name: str, configuration: "ConfluentKafkaOutput.Config", logger: Logger): - super().__init__(name, configuration, logger) - self.handles_backlog = False - self._delivered_cnt = 0 - @cached_property def _producer(self): injected_config = { @@ -294,7 +286,7 @@ def _send_to_kafka(self, document, target, set_offsets=True): """ try: - callback = self.delivered_callback() if set_offsets else None + callback = self.delivered_callback(document) if set_offsets else None self._producer.produce( target, value=self._encoder.encode(document), @@ -342,14 +334,14 
@@ def store_failed( self._producer.produce( self._config.error_topic, value=json.dumps(value, separators=(",", ":")).encode("utf-8"), - on_delivery=self.delivered_callback(), + on_delivery=self.delivered_callback(document_processed), ) self._producer.poll(self._config.send_timeout) except BufferError: # block program until buffer is empty self._producer.flush(timeout=self._config.flush_timeout) - def delivered_callback(self): + def delivered_callback(self, document): """Callback that can called when a single message has been successfully delivered. The callback is called asynchronously, therefore the current message is stored within a @@ -362,7 +354,13 @@ def delivered_callback(self): This is the callback method that the Kafka producer calls. It has access to "message". """ - message = self._get_current_message() + try: + partition_offset = { + "last_partition": document["_metadata"]["last_partition"], + "last_offset": document["_metadata"]["last_offset"], + } + except (TypeError, KeyError): + return lambda *args: None def store_offsets_on_success(error, _): """Set offset via message stored in closure if no error occurred. @@ -374,21 +372,11 @@ def store_offsets_on_success(error, _): """ if error: raise FatalOutputError(output=self, message=error) - if message: - self._delivered_cnt += 1 - self.metrics.number_of_successfully_delivered_events += 1 - self.input_connector.update_last_delivered_records(message) - if self._delivered_cnt >= self._config.delivered_callback_frequency: - self.input_connector.batch_finished_callback() - self._delivered_cnt = 0 + self.metrics.number_of_successfully_delivered_events += 1 + self.input_connector.batch_finished_callback(metadata=partition_offset) return store_offsets_on_success - def _get_current_message(self) -> Optional[Message]: - if hasattr(self.input_connector, "last_valid_record"): - return self.input_connector.last_valid_record - return None - @Metric.measure_time() def _write_backlog(self): self._producer.flush(self._config.flush_timeout) diff --git a/tests/unit/connector/base.py b/tests/unit/connector/base.py index b2de2ada8..4a39ddb7d 100644 --- a/tests/unit/connector/base.py +++ b/tests/unit/connector/base.py @@ -537,6 +537,12 @@ def test_get_next_has_time_measurement(self): # asserts entering context manager in metrics.metrics.Metric.measure_time mock_metric.assert_has_calls([mock.call.tracker.labels().time().__enter__()]) + def test_add_input_connector_metadata_returns_true_if_input_connector_metadata_set(self): + connector_config = deepcopy(self.CONFIG) + connector_config.update({"preprocessing": {"input_connector_metadata": True}}) + connector = Factory.create({"test connector": connector_config}, logger=self.logger) + assert connector._add_input_connector_metadata is True + class BaseOutputTestCase(BaseConnectorTestCase): def test_is_output_instance(self): @@ -555,5 +561,4 @@ def test_store_failed_counts_failed_events(self): def test_store_calls_batch_finished_callback(self): self.object.input_connector = mock.MagicMock() self.object.store({"message": "my event message"}) - if self.object.handles_backlog: - self.object.input_connector.batch_finished_callback.assert_called() + self.object.input_connector.batch_finished_callback.assert_called() diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index 270a4a10b..e3e9fdf12 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -8,7 +8,7 @@ from unittest 
import mock import pytest -from confluent_kafka import OFFSET_BEGINNING, KafkaException +from confluent_kafka import OFFSET_BEGINNING, KafkaException, TopicPartition from logprep.abc.input import ( CriticalInputError, @@ -170,6 +170,44 @@ def test_get_next_raises_critical_input_error_if_unvalid_json(self, _): with pytest.raises(CriticalInputError, match=r"not a valid json"): self.object.get_next(1) + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_get_next_adds_metadata_if_configured(self, _): + input_config = deepcopy(self.CONFIG) + input_config["preprocessing"] = {"input_connector_metadata": True} + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + kafka_input.setup() + mock_record = mock.MagicMock() + mock_record.error = mock.MagicMock() + mock_record.error.return_value = None + kafka_input._consumer.poll = mock.MagicMock(return_value=mock_record) + mock_record.value = mock.MagicMock() + mock_record.value.return_value = '{"foo":"bar"}'.encode("utf8") + event, warning = kafka_input.get_next(1) + assert warning is None + assert event.get("_metadata", {}).get("last_partition") + assert event.get("_metadata", {}).get("last_offset") + del event["_metadata"] + assert event == {"foo": "bar"} + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_get_next_returns_warning_if_metadata_configured_but_field_exists(self, _): + input_config = deepcopy(self.CONFIG) + input_config["preprocessing"] = {"input_connector_metadata": True} + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + kafka_input.setup() + mock_record = mock.MagicMock() + mock_record.error = mock.MagicMock() + mock_record.error.return_value = None + kafka_input._consumer.poll = mock.MagicMock(return_value=mock_record) + mock_record.value = mock.MagicMock() + mock_record.value.return_value = '{"_metadata":"foo"}'.encode("utf8") + event, warning = kafka_input.get_next(1) + assert ( + warning + == "Couldn't add metadata to the input event as the field '_metadata' already exist." 
+ ) + assert event == {"_metadata": "foo"} + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_get_event_returns_event_and_raw_event(self, _): mock_record = mock.MagicMock() @@ -346,12 +384,57 @@ def test_revoke_callback_logs_warning_and_counts(self, mock_consumer): assert self.object.metrics.number_of_warnings == 1 @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback( + def test_revoke_callback_writes_output_backlog_and_does_not_call_batch_finished_callback_if_metadata( self, mock_consumer ): - self.object.output_connector = mock.MagicMock() - self.object.batch_finished_callback = mock.MagicMock() + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input.batch_finished_callback = mock.MagicMock() mock_partitions = [mock.MagicMock()] - self.object._revoke_callback(mock_consumer, mock_partitions) - self.object.output_connector._write_backlog.assert_called() - self.object.batch_finished_callback.assert_called() + kafka_input._revoke_callback(mock_consumer, mock_partitions) + kafka_input.output_connector._write_backlog.assert_called() + assert not kafka_input.batch_finished_callback.called + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback_if_not_metadata( + self, mock_consumer + ): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = False + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input.batch_finished_callback = mock.MagicMock() + mock_partitions = [mock.MagicMock()] + kafka_input._revoke_callback(mock_consumer, mock_partitions) + kafka_input.output_connector._write_backlog.assert_called() + kafka_input.batch_finished_callback.assert_called() + + def test_get_delivered_partition_offset_without_metadata_raises_exception(self): + with pytest.raises(FatalInputError, match="Metadata for setting offsets can't be 'None'"): + self.object._get_delivered_partition_offset(None) + + @pytest.mark.parametrize( + "metadata", + [{}, {"last_offset": 0}, {"last_partition": 0}], + ) + def test_get_delivered_partition_offset_with_missing_metadata_field_raises_exception( + self, metadata + ): + with pytest.raises( + FatalInputError, + match="Missing fields in metadata for setting offsets: " + "'last_partition' and 'last_offset' required", + ): + self.object._get_delivered_partition_offset(metadata) + + def test_get_delivered_partition_offset_with_metadata_returns_topic_partition(self): + topic_partition = self.object._get_delivered_partition_offset( + {"last_partition": 0, "last_offset": 1} + ) + assert isinstance(topic_partition, TopicPartition) + assert topic_partition.partition == 0 + assert topic_partition.offset == 1 diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index a5a6a456c..a08a7939f 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -146,36 +146,43 @@ def test_store_counts_processed_events(self, _): # pylint: disable=arguments-di assert self.object.metrics.number_of_processed_events == 1 @mock.patch("logprep.connector.confluent_kafka.output.Producer") 
- def test_delivered_callback_calls_with_message_calls_callback( + def test_delivered_callback_with_metadata_calls_callback( self, _ ): # pylint: disable=arguments-differ self.object.input_connector = mock.MagicMock() - self.object.input_connector.last_valid_record = mock.MagicMock() - callback = self.object.delivered_callback() + self.object.input_connector._last_delivered_record = mock.MagicMock() + metadata = {"_metadata": {"last_partition": 0, "last_offset": 0}} + callback = self.object.delivered_callback(metadata) callback(None, "msg") - self.object.input_connector.update_last_delivered_records.assert_called() self.object.input_connector.batch_finished_callback.assert_called() + @pytest.mark.parametrize( + "metadata", + [ + {"_metadata": {"last_partition": 0}}, + {"_metadata": {"last_offset": 0}}, + {"_metadata": {}}, + {}, + ], + ) @mock.patch("logprep.connector.confluent_kafka.output.Producer") - def test_delivered_callback_calls_without_message_does_not_call_callback( - self, _ + def test_delivered_callback_without_metadata_doesnt_call_batch_finished_callback( + self, _, metadata ): # pylint: disable=arguments-differ self.object.input_connector = mock.MagicMock() - self.object.input_connector.last_valid_record = None - callback = self.object.delivered_callback() + callback = self.object.delivered_callback(metadata) callback(None, "msg") - self.object.input_connector.update_last_delivered_records.assert_not_called() - self.object.input_connector.batch_finished_callback.assert_not_called() + assert not self.object.input_connector.batch_finished_callback.called @mock.patch("logprep.connector.confluent_kafka.output.Producer") - def test_delivered_callback_calls_with_error_does_not_call_callback( + def test_delivered_callback_calls_with_error_doesnt_call_batch_finished_callback( self, _ ): # pylint: disable=arguments-differ self.object.input_connector = mock.MagicMock() - callback = self.object.delivered_callback() + metadata = {"_metadata": {"last_partition": 0, "last_offset": 0}} + callback = self.object.delivered_callback(metadata) with pytest.raises(FatalOutputError, match=r"some_error"): callback(BaseException("some_error"), "msg") - self.object.input_connector.update_last_delivered_records.assert_not_called() self.object.input_connector.batch_finished_callback.assert_not_called() def test_setup_raises_fatal_output_error_on_invalid_config(self): @@ -190,3 +197,6 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self): expected_error_message = r"keys are missing: {'bootstrap.servers'}" with pytest.raises(InvalidConfigurationError, match=expected_error_message): Factory.create({"test": config}, logger=self.logger) + + def test_store_calls_batch_finished_callback(self): + """Skipped from superclass""" From 7afc92e8c97d9d9e83103cd67db2b164d0f41461 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Mon, 26 Feb 2024 12:23:49 +0100 Subject: [PATCH 3/9] Fix metadata for existing offset fields --- logprep/connector/confluent_kafka/input.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 019bf540e..6bb60b9dc 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -245,15 +245,13 @@ class Config(Input.Config): _last_valid_records: dict _last_valid_record: Optional[Message] - _last_delivered_record: Optional[TopicPartition] - __slots__ = ["_last_valid_records", "_last_valid_record", 
"_last_delivered_record"] + __slots__ = ["_last_valid_records", "_last_valid_record"] def __init__(self, name: str, configuration: "Connector.Config", logger: Logger) -> None: super().__init__(name, configuration, logger) self._last_valid_records = {} self._last_valid_record = None - self._last_delivered_record = None @cached_property def _consumer(self) -> Consumer: @@ -437,7 +435,14 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic return event_dict, raw_event def _add_input_connector_metadata_to_event(self, event_dict) -> Tuple[dict, Optional[str]]: - if "_metadata" in event_dict: + metadata = event_dict.get("_metadata", {}) + for meta_field in ("last_partition", "last_offset"): + try: + del event_dict["_metadata"][meta_field] + except (TypeError, KeyError): + pass + + if metadata: non_critical_error_msg = ( "Couldn't add metadata to the input event as the field '_metadata' already exist." ) From 68e5b6dad54d86cb26d906947994deb5b7c4c9e7 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Mon, 26 Feb 2024 12:32:53 +0100 Subject: [PATCH 4/9] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f38cc51a8..d6ede1f59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ * make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0 * add option to Opensearch Output Connector to use parallel bulk implementation (default is True) * make confluent kafka output store offsets only for successfully delivered events - +* add `input_connector_metadata` preprocessor that allows input connectors to add a `_metadata` field to events. ### Improvements From 53701582b84e42d919913a2a28e19720d44cfb03 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Fri, 1 Mar 2024 07:15:59 +0100 Subject: [PATCH 5/9] Add documentation for kafka output parameters --- logprep/connector/confluent_kafka/output.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index b21c6ccaf..2ab587178 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -150,9 +150,13 @@ class Config(Output.Config): """Confluent Kafka Output Config""" topic: str = field(validator=validators.instance_of(str)) + """The topic to which processed messages will be sent.""" error_topic: str + """The topic to which error messages will be sent.""" flush_timeout: float + """Timeout for sending all messages from the producer queue to kafka.""" send_timeout: int = field(validator=validators.instance_of(int), default=0) + """Timeout for sending messages to kafka. 
Values above 0 make it blocking.""" kafka_config: Optional[dict] = field( validator=[ validators.instance_of(dict), From 90246affde6ef7e43e788ffe133011ac93da9be9 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Fri, 1 Mar 2024 07:17:11 +0100 Subject: [PATCH 6/9] Ensure _metadata in kafka input is not None --- logprep/connector/confluent_kafka/input.py | 3 +-- tests/unit/connector/test_confluent_kafka_input.py | 4 ---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 6bb60b9dc..e6d95e588 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -463,8 +463,6 @@ def _enable_auto_commit(self) -> bool: return self._config.kafka_config.get("enable.auto.commit") == "true" def _get_delivered_partition_offset(self, metadata: dict) -> TopicPartition: - if metadata is None: - raise FatalInputError(self, "Metadata for setting offsets can't be 'None'") try: last_partition = metadata["last_partition"] last_offset = metadata["last_offset"] @@ -485,6 +483,7 @@ def batch_finished_callback(self, metadata: Optional[dict] = None) -> None: or instead use `metadata` to obtain offsets. If configured commit them. Should be called by output connectors if they are finished processing a batch of records. """ + metadata = {} if metadata is None else metadata if self._enable_auto_offset_store: return self._handle_offsets(self._consumer.store_offsets, metadata) diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index e3e9fdf12..4868d07a4 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -413,10 +413,6 @@ def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback kafka_input.output_connector._write_backlog.assert_called() kafka_input.batch_finished_callback.assert_called() - def test_get_delivered_partition_offset_without_metadata_raises_exception(self): - with pytest.raises(FatalInputError, match="Metadata for setting offsets can't be 'None'"): - self.object._get_delivered_partition_offset(None) - @pytest.mark.parametrize( "metadata", [{}, {"last_offset": 0}, {"last_partition": 0}], From 4e60bbfe780349dbef17cfdf4831deb1331d3589 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Fri, 1 Mar 2024 07:18:24 +0100 Subject: [PATCH 7/9] Refactor kafka input --- logprep/connector/confluent_kafka/input.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index e6d95e588..9058e3fb4 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -434,11 +434,11 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic ) from error return event_dict, raw_event - def _add_input_connector_metadata_to_event(self, event_dict) -> Tuple[dict, Optional[str]]: - metadata = event_dict.get("_metadata", {}) + def _add_input_connector_metadata_to_event(self, event: dict) -> Tuple[dict, Optional[str]]: + metadata = event.get("_metadata", {}) for meta_field in ("last_partition", "last_offset"): try: - del event_dict["_metadata"][meta_field] + del event["_metadata"][meta_field] except (TypeError, KeyError): pass @@ -446,13 +446,13 @@ def _add_input_connector_metadata_to_event(self, event_dict) -> Tuple[dict, Opti non_critical_error_msg = ( 
"Couldn't add metadata to the input event as the field '_metadata' already exist." ) - return event_dict, non_critical_error_msg + return event, non_critical_error_msg - event_dict["_metadata"] = { + event["_metadata"] = { "last_partition": self._last_valid_record.partition(), "last_offset": self._last_valid_record.offset(), } - return event_dict, None + return event, None @property def _enable_auto_offset_store(self) -> bool: From 9ef29c49b8645c559833ed375c1d59b2fcc34d34 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Fri, 1 Mar 2024 07:19:11 +0100 Subject: [PATCH 8/9] Refactor preparing _metadata in kafka input --- logprep/connector/confluent_kafka/input.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 9058e3fb4..c5e2f3866 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -435,12 +435,17 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic return event_dict, raw_event def _add_input_connector_metadata_to_event(self, event: dict) -> Tuple[dict, Optional[str]]: + """Add last_partition and last_offset to _metadata. + + Pop previous last_partition and last_offset to ensure no incorrect values are set. + Try for AttributeError, since _metadata could already exist, but not be a dict. + """ metadata = event.get("_metadata", {}) - for meta_field in ("last_partition", "last_offset"): - try: - del event["_metadata"][meta_field] - except (TypeError, KeyError): - pass + try: + metadata.pop("last_partition", None) + metadata.pop("last_offset", None) + except AttributeError: + pass if metadata: non_critical_error_msg = ( From 3ec4e419608f6be295ad150b34b86ad5d653adbf Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Fri, 21 Jun 2024 14:51:22 +0200 Subject: [PATCH 9/9] Add fixes, more tests and add scheduled flush to kafka output --- CHANGELOG.md | 2 +- logprep/connector/confluent_kafka/input.py | 2 +- logprep/connector/confluent_kafka/output.py | 110 ++++++++++++++++-- .../connector/test_confluent_kafka_input.py | 58 ++++++++- .../connector/test_confluent_kafka_output.py | 108 ++++++++++++++++- 5 files changed, 263 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6ede1f59..62077fe6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,8 @@ * add a `number_of_successful_writes` metric to the s3 connector, which counts how many events were successfully written to s3 * make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0 * add option to Opensearch Output Connector to use parallel bulk implementation (default is True) -* make confluent kafka output store offsets only for successfully delivered events * add `input_connector_metadata` preprocessor that allows input connectors to add a `_metadata` field to events. +* make confluent kafka output store offsets only for successfully delivered events if configured for that. 
### Improvements diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index c5e2f3866..442bfab44 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -480,7 +480,7 @@ def _get_delivered_partition_offset(self, metadata: dict) -> TopicPartition: return TopicPartition( self._config.topic, partition=last_partition, - offset=last_offset, + offset=last_offset if isinstance(last_offset, int) else last_offset[0], ) def batch_finished_callback(self, metadata: Optional[dict] = None) -> None: diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 2ab587178..39fee502d 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -25,10 +25,12 @@ queue.buffering.max.ms: 0.5 """ import json +from collections import defaultdict from datetime import datetime from functools import cached_property, partial +from logging import Logger from socket import getfqdn -from typing import Optional +from typing import Optional, DefaultDict from attrs import define, field, validators from confluent_kafka import KafkaException, Producer @@ -153,10 +155,17 @@ class Config(Output.Config): """The topic to which processed messages will be sent.""" error_topic: str """The topic to which error messages will be sent.""" - flush_timeout: float + producer_flush_timeout: float """Timeout for sending all messages from the producer queue to kafka.""" send_timeout: int = field(validator=validators.instance_of(int), default=0) """Timeout for sending messages to kafka. Values above 0 make it blocking.""" + flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60) + """(Optional) Timeout after :code:`_sent_offset_backlog` is flushed if + :code:`sent_offset_backlog_size` is not reached.""" + fire_and_forget: bool = field(validator=validators.instance_of(bool), default=False) + """If True, offsets will be set after sending messages instead of waiting for delivery.""" + sent_offset_backlog_size: int = field(validator=validators.instance_of(int), default=1) + """ (Optional) count of delivered messages before batch_finished_callback is called.""" kafka_config: Optional[dict] = field( validator=[ validators.instance_of(dict), @@ -181,6 +190,23 @@ class Config(Output.Config): """ + __slots__ = [ + "_sent_offset_backlog", + "_delivered_offset_backlog", + ] + + _sent_offset_backlog: DefaultDict[str, list] + _delivered_offset_backlog: DefaultDict[str, list] + + def __init__(self, name: str, configuration: "ConfluentKafkaOutput.Config", logger: Logger): + super().__init__(name, configuration, logger) + self._sent_offset_backlog = defaultdict(list) + self._delivered_offset_backlog = defaultdict(list) + + @property + def _sent_offset_backlog_size(self): + return sum(map(len, self._sent_offset_backlog.values())) + @cached_property def _producer(self): injected_config = { @@ -253,6 +279,7 @@ def store(self, document: dict) -> Optional[bool]: """ self.metrics.number_of_processed_events += 1 self._send_to_kafka(document, self._config.topic) + self._add_offset_to_sent_backlog(document) @Metric.measure_time() def store_custom(self, document: dict, target: str) -> None: @@ -290,6 +317,7 @@ def _send_to_kafka(self, document, target, set_offsets=True): """ try: + set_offsets &= not self._config.fire_and_forget callback = self.delivered_callback(document) if set_offsets else None self._producer.produce( target, @@ -299,12 
+327,18 @@ def _send_to_kafka(self, document, target, set_offsets=True): self._producer.poll(self._config.send_timeout) except BufferError: # block program until buffer is empty - self._producer.flush(timeout=self._config.flush_timeout) + self._producer.flush(timeout=self._config.producer_flush_timeout) except BaseException as error: raise CriticalOutputError( self, f"Error storing output document -> {error}", document ) from error + if self._config.fire_and_forget: + return + + if self._sent_offset_backlog_size >= self._config.sent_offset_backlog_size: + self._write_backlog() + @Metric.measure_time() def store_failed( self, error_message: str, document_received: dict, document_processed: dict @@ -335,15 +369,58 @@ def store_failed( "timestamp": str(datetime.now()), } try: + callback = ( + self.delivered_callback(document_processed) + if not self._config.fire_and_forget + else None + ) self._producer.produce( self._config.error_topic, value=json.dumps(value, separators=(",", ":")).encode("utf-8"), - on_delivery=self.delivered_callback(document_processed), + on_delivery=callback ) self._producer.poll(self._config.send_timeout) except BufferError: # block program until buffer is empty - self._producer.flush(timeout=self._config.flush_timeout) + self._producer.flush(timeout=self._config.producer_flush_timeout) + self._add_offset_to_sent_backlog(document_received) + + def _add_offset_to_sent_backlog(self, document): + if not self._config.fire_and_forget: + metadata = document.get("_metadata", {}) + partition = metadata.get("last_partition", None) + offset = metadata.get("last_offset", None) + if not (partition is None and offset is None): + self._sent_offset_backlog[partition].append(offset) + + @staticmethod + def _get_last_committable_offsets( + sent_offset_backlog: DefaultDict[str, list], + delivered_offset_backlog: DefaultDict[str, list], + ) -> dict: + last_committable = {} + for partition, offsets in delivered_offset_backlog.items(): + if not offsets: + continue + + if len(offsets) == 1: + last_committable[partition] = offsets[0] + continue + + offsets.sort() + prev_offset = offsets[0] + for offset in offsets[1:]: + if offset > prev_offset + 1: + unexpected_gap = False + for missing_offset in range(prev_offset + 1, offset): + if missing_offset in sent_offset_backlog.get(partition, []): + last_committable[partition] = prev_offset + unexpected_gap = True + if unexpected_gap: + break + last_committable[partition] = offset + prev_offset = offset + return last_committable def delivered_callback(self, document): """Callback that can called when a single message has been successfully delivered. @@ -369,7 +446,7 @@ def delivered_callback(self, document): def store_offsets_on_success(error, _): """Set offset via message stored in closure if no error occurred. - "delivered_callback_frequency" can be configured to prevent setting the callback for + "sent_offset_backlog_size" can be configured to prevent setting the callback for every single message that was successfully delivered. Setting this higher than 1 might be useful if auto-commit is disabled. 
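# --- Editor's illustration (not part of the patch) --------------------------
# A sketch of how _get_last_committable_offsets treats gaps in the delivered
# offsets, assuming backlogs shaped {partition: [offsets]} as in the hunk
# above. The expected results mirror the parametrized unit tests added below.
sent = {0: [0, 1, 2]}        # offsets handed to the producer
delivered = {0: [0, 2]}      # offsets confirmed via delivery callbacks
# Offset 1 was sent but not (yet) delivered, so only offset 0 may be committed:
# _get_last_committable_offsets(sent, delivered) -> {0: 0}

sent = {0: [0, 2]}           # offset 1 was never sent, so the gap is expected
delivered = {0: [0, 2]}
# An expected gap does not block committing the highest delivered offset:
# _get_last_committable_offsets(sent, delivered) -> {0: 2}
# -----------------------------------------------------------------------------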
@@ -377,16 +454,31 @@ def store_offsets_on_success(error, _): if error: raise FatalOutputError(output=self, message=error) self.metrics.number_of_successfully_delivered_events += 1 - self.input_connector.batch_finished_callback(metadata=partition_offset) + last_partition = partition_offset["last_partition"] + last_offset = partition_offset["last_offset"] + self._delivered_offset_backlog[last_partition].append(last_offset) return store_offsets_on_success @Metric.measure_time() def _write_backlog(self): - self._producer.flush(self._config.flush_timeout) + self._producer.flush(self._config.producer_flush_timeout) + if self._config.fire_and_forget: + return + + last_commitable_offsets = self._get_last_committable_offsets( + self._sent_offset_backlog, self._delivered_offset_backlog + ) + for partition, offset in last_commitable_offsets.items(): + committable_offset = {"last_partition": partition, "last_offset": offset} + self.input_connector.batch_finished_callback(metadata=committable_offset) + self._delivered_offset_backlog.clear() + self._sent_offset_backlog.clear() def setup(self): super().setup() + flush_timeout = self._config.flush_timeout + self._schedule_task(task=self._write_backlog, seconds=flush_timeout) try: _ = self._producer except (KafkaException, ValueError) as error: @@ -395,4 +487,4 @@ def setup(self): def shut_down(self) -> None: """ensures that all messages are flushed""" if self._producer is not None: - self._producer.flush(self._config.flush_timeout) + self._write_backlog() diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index 4868d07a4..10b9c869e 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -103,8 +103,11 @@ def test_shut_down_calls_consumer_close(self, _): ], ) @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, settings, handlers): + def test_batch_finished_callback_calls_offsets_handler_for_setting_without_metadata( + self, _, settings, handlers + ): input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = False kafka_input = Factory.create({"test": input_config}, logger=self.logger) kafka_input._config.kafka_config.update(settings) kafka_consumer = kafka_input._consumer @@ -409,10 +412,63 @@ def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback kafka_input.output_connector = mock.MagicMock() kafka_input.batch_finished_callback = mock.MagicMock() mock_partitions = [mock.MagicMock()] + kafka_input.output_connector._sent_offset_backlog = {} + kafka_input.output_connector._delivered_offset_backlog = {} kafka_input._revoke_callback(mock_consumer, mock_partitions) kafka_input.output_connector._write_backlog.assert_called() kafka_input.batch_finished_callback.assert_called() + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_revoke_callback_writes_output_backlog_and_does_not_call_batch_finished_callback_metadata( + self, mock_consumer + ): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input.batch_finished_callback = mock.MagicMock() + mock_partitions = [mock.MagicMock()] + kafka_input.output_connector._sent_offset_backlog = {0: [0]} + kafka_input.output_connector._delivered_offset_backlog = 
{0: [0]} + kafka_input._revoke_callback(mock_consumer, mock_partitions) + kafka_input.output_connector._write_backlog.assert_called() + kafka_input.batch_finished_callback.assert_not_called() + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_handle_offsets_uses_delivered_offsets_if_use_metadata(self, _): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + metadata = {"last_partition": 0, "last_offset": 0} + kafka_input._handle_offsets(kafka_input._consumer.store_offsets, metadata) + offsets = [TopicPartition(kafka_input._config.topic, partition=0, offset=0)] + kafka_input._consumer.store_offsets.assert_called_with(offsets=offsets) + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_handle_offsets_raises_Exception_if_use_metadata_but_no_metadata(self, _): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + metadata = {} + with pytest.raises(FatalInputError, match="'last_partition' and 'last_offset' required"): + kafka_input._handle_offsets(kafka_input._consumer.store_offsets, metadata) + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_handle_offsets_uses__last_valid_records_if_not_use_metadata(self, _): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = False + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input._last_valid_records = {0: "MESSAGE_OBJECT"} + kafka_input._handle_offsets(kafka_input._consumer.store_offsets, {}) + kafka_input._consumer.store_offsets.assert_called_with(message="MESSAGE_OBJECT") + @pytest.mark.parametrize( "metadata", [{}, {"last_offset": 0}, {"last_partition": 0}], diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index a08a7939f..52afe6d84 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -27,7 +27,7 @@ class TestConfluentKafkaOutput(BaseOutputTestCase, CommonConfluentKafkaTestCase) "type": "confluentkafka_output", "topic": "test_input_raw", "error_topic": "test_error_topic", - "flush_timeout": 0.1, + "producer_flush_timeout": 30, "kafka_config": { "bootstrap.servers": "testserver:9092", }, @@ -146,15 +146,113 @@ def test_store_counts_processed_events(self, _): # pylint: disable=arguments-di assert self.object.metrics.number_of_processed_events == 1 @mock.patch("logprep.connector.confluent_kafka.output.Producer") - def test_delivered_callback_with_metadata_calls_callback( - self, _ - ): # pylint: disable=arguments-differ + def test_delivered_callback_adds_offset(self, _): # pylint: disable=arguments-differ self.object.input_connector = mock.MagicMock() self.object.input_connector._last_delivered_record = mock.MagicMock() metadata = {"_metadata": {"last_partition": 0, "last_offset": 0}} callback = self.object.delivered_callback(metadata) callback(None, "msg") - self.object.input_connector.batch_finished_callback.assert_called() + assert self.object._delivered_offset_backlog == {0: [0]} + + @pytest.mark.parametrize( + "sent, delivered, callback", + [ + ({"_metadata": {"last_partition": 0}}, 
{"_metadata": {"last_partition": 0}}, True), + ({"_metadata": {"last_partition": 0}}, None, False), + (None, {"_metadata": {"last_partition": 0}}, False), + (None, None, False), + ], + ) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_send_to_kafka_calls_callback_if_no_fire_and_forget( + self, _, sent, delivered, callback + ): # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = False + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + + kafka_output.input_connector = mock.MagicMock() + doc = {"_metadata": {"last_partition": 0, "last_offset": 0}} + kafka_output._sent_offset_backlog = {0: [sent]} if sent else {} + kafka_output._delivered_offset_backlog = {0: [delivered]} if delivered else {} + kafka_output._send_to_kafka(doc, "foo") + if callback: + kafka_output.input_connector.batch_finished_callback.assert_called() + else: + kafka_output.input_connector.batch_finished_callback.assert_not_called() + + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_send_to_kafka_does_not_call_callback_if_fire_and_forget(self, _): + # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = True + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + + kafka_output.input_connector = mock.MagicMock() + kafka_output._send_to_kafka({"foo": "bar"}, "baz_topic") + kafka_output.input_connector.batch_finished_callback.assert_not_called() + + @pytest.mark.parametrize("fire_and_forget", [True, False]) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_write_backlog_does_not_call_callback_if_fire_and_forget(self, _, fire_and_forget): + # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = fire_and_forget + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + kafka_output._sent_offset_backlog = {0: [0]} + kafka_output._delivered_offset_backlog = kafka_output._sent_offset_backlog + kafka_output.input_connector = mock.MagicMock() + kafka_output._write_backlog() + if fire_and_forget: + kafka_output.input_connector.batch_finished_callback.assert_not_called() + else: + kafka_output.input_connector.batch_finished_callback.assert_called() + + @pytest.mark.parametrize( + "sent_backlog, delivered_backlog, expected_offsets", + [ + ({}, {}, {}), + ({0: [0]}, {}, {}), + ({}, {0: [0]}, {0: 0}), + ({}, {0: []}, {}), + ({0: [0]}, {0: [0]}, {0: 0}), + ({0: [0, 1, 2]}, {0: [0, 1, 2]}, {0: 2}), + ({0: [0, 2]}, {0: [0, 1, 2]}, {0: 2}), + ({0: [0, 1, 2]}, {0: [0, 2]}, {0: 0}), + ({0: [0, 2]}, {0: [0, 2]}, {0: 2}), + ({0: [0], 1: [0]}, {0: [0], 1: [0]}, {0: 0, 1: 0}), + ({0: [0, 1, 2, 3], 1: [0, 1, 2, 3]}, {0: [0, 2, 3], 1: [0, 1, 3]}, {0: 0, 1: 1}), + ], + ) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_get_last_committable_offsets( + self, _, sent_backlog, delivered_backlog, expected_offsets + ): + last_committable_offsets = self.object._get_last_committable_offsets( + sent_backlog, delivered_backlog + ) + assert last_committable_offsets == expected_offsets + + @pytest.mark.parametrize( + "doc, sent_backlog, fire_and_forget", + [ + ({"_metadata": {"last_partition": 0, "last_offset": 0}}, {0: [0]}, False), + ({"foo": "bar"}, {}, False), + ({"_metadata": {"last_partition": 0, "last_offset": 0}}, {}, True), + ({"foo": "bar"}, {}, True), + ], + ) + 
@mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_do_not_add_offset_to_sent_backlog_if_not_fire_and_forget( + self, _, doc, sent_backlog, fire_and_forget + ): + # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = fire_and_forget + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + + kafka_output._add_offset_to_sent_backlog(doc) + assert kafka_output._sent_offset_backlog == sent_backlog @pytest.mark.parametrize( "metadata",