Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing SSL config to kafka admin clients #710

Merged
merged 6 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ def _kafka_config(self) -> dict:
DEFAULTS.update({"client.id": getfqdn()})
DEFAULTS.update(
{
"group.instance.id": f"{getfqdn().strip('.')}-Pipeline{self.pipeline_index}-pid{os.getpid()}"
"group.instance.id": f"{getfqdn().strip('.')}-"
f"Pipeline{self.pipeline_index}-pid{os.getpid()}"
}
)
return DEFAULTS | self._config.kafka_config | injected_config
Expand All @@ -288,6 +289,9 @@ def _admin(self) -> AdminClient:
confluent_kafka admin client object
"""
admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]}
for key, value in self._config.kafka_config.items():
if key.startswith(("security.", "ssl.")):
admin_config[key] = value
return AdminClient(admin_config)

@cached_property
Expand Down Expand Up @@ -375,7 +379,8 @@ def _commit_callback(
if offset in SPECIAL_OFFSETS:
offset = 0
labels = {
"description": f"topic: {self._config.topic} - partition: {topic_partition.partition}"
"description": f"topic: {self._config.topic} - "
f"partition: {topic_partition.partition}"
}
self.metrics.committed_offsets.add_with_labels(offset, labels)

Expand Down Expand Up @@ -473,7 +478,8 @@ def batch_finished_callback(self) -> None:
"""
if self._enable_auto_offset_store:
return
# in case the ConfluentKafkaInput._revoke_callback is triggered before the first message was polled
# in case the ConfluentKafkaInput._revoke_callback is triggered before the first message
# was polled
if not self._last_valid_record:
return
try:
Expand Down
8 changes: 7 additions & 1 deletion logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def _admin(self) -> AdminClient:
confluent_kafka admin client object
"""
admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]}
for key, value in self._config.kafka_config.items():
if key.startswith(("security.", "ssl.")):
admin_config[key] = value
return AdminClient(admin_config)

@cached_property
Expand Down Expand Up @@ -266,7 +269,10 @@ def describe(self) -> str:

"""
base_description = super().describe()
return f"{base_description} - Kafka Output: {self._config.kafka_config.get('bootstrap.servers')}"
return (
f"{base_description} - Kafka Output: "
f"{self._config.kafka_config.get('bootstrap.servers')}"
)

def store(self, document: dict) -> Optional[bool]:
"""Store a document in the producer topic.
Expand Down
36 changes: 33 additions & 3 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from unittest import mock

import pytest
from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException, Message
from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException

from logprep.abc.input import (
CriticalInputError,
Expand Down Expand Up @@ -71,7 +71,8 @@ def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_re
mock_record.error = mock.MagicMock(
return_value=KafkaError(
error=3,
reason="Subscribed topic not available: (Test Instance Name) : Broker: Unknown topic or partition",
reason="Subscribed topic not available: (Test Instance Name) : "
"Broker: Unknown topic or partition",
fatal=False,
retriable=False,
txn_requires_abort=False,
Expand Down Expand Up @@ -109,7 +110,7 @@ def test_batch_finished_callback_calls_store_offsets(self, _):
kafka_consumer.store_offsets.assert_called_with(message=message)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_batch_finished_callback_calls_store_offsets(self, _):
def test_batch_finished_callback_does_not_call_store_offsets(self, _):
input_config = deepcopy(self.CONFIG)
kafka_input = Factory.create({"test": input_config})
kafka_consumer = kafka_input._consumer
Expand Down Expand Up @@ -424,3 +425,32 @@ def test_health_counts_metrics_on_kafka_exception(self):
self.object._consumer.list_topics.side_effect = KafkaException("test error")
assert not self.object.health()
assert self.object.metrics.number_of_errors == 1

@pytest.mark.parametrize(
    ["kafka_config_update", "expected_admin_client_config"],
    [
        ({}, {"bootstrap.servers": "testserver:9092"}),
        ({"statistics.foo": "bar"}, {"bootstrap.servers": "testserver:9092"}),
        (
            {"security.foo": "bar"},
            {"bootstrap.servers": "testserver:9092", "security.foo": "bar"},
        ),
        (
            {"ssl.foo": "bar"},
            {"bootstrap.servers": "testserver:9092", "ssl.foo": "bar"},
        ),
        (
            {"security.foo": "bar", "ssl.foo": "bar"},
            {"bootstrap.servers": "testserver:9092", "security.foo": "bar", "ssl.foo": "bar"},
        ),
    ],
)
@mock.patch("logprep.connector.confluent_kafka.input.AdminClient")
def test_set_security_related_config_in_admin_client(
    self, admin_client, kafka_config_update, expected_admin_client_config
):
    """Only ``security.*``/``ssl.*`` options (plus ``bootstrap.servers``)
    must be forwarded from the kafka config to the AdminClient."""
    connector_config = deepcopy(self.CONFIG)
    connector_config["kafka_config"].update(kafka_config_update)
    connector = Factory.create({"input_connector": connector_config})
    # access the cached property so the (mocked) AdminClient is constructed
    _ = connector._admin
    admin_client.assert_called_with(expected_admin_client_config)
30 changes: 29 additions & 1 deletion tests/unit/connector/test_confluent_kafka_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# pylint: disable=wrong-import-position
# pylint: disable=wrong-import-order
# pylint: disable=attribute-defined-outside-init
# pylint: disable=no-self-use

import json
from copy import deepcopy
Expand Down Expand Up @@ -168,3 +167,32 @@ def test_shutdown_logs_and_counts_error_if_queue_not_fully_flushed(self):
def test_health_returns_bool(self):
with mock.patch.object(self.object, "_admin"):
super().test_health_returns_bool()

@pytest.mark.parametrize(
    ["kafka_config_update", "expected_admin_client_config"],
    [
        ({}, {"bootstrap.servers": "localhost:9092"}),
        ({"statistics.foo": "bar"}, {"bootstrap.servers": "localhost:9092"}),
        (
            {"security.foo": "bar"},
            {"bootstrap.servers": "localhost:9092", "security.foo": "bar"},
        ),
        (
            {"ssl.foo": "bar"},
            {"bootstrap.servers": "localhost:9092", "ssl.foo": "bar"},
        ),
        (
            {"security.foo": "bar", "ssl.foo": "bar"},
            {"bootstrap.servers": "localhost:9092", "security.foo": "bar", "ssl.foo": "bar"},
        ),
    ],
)
@mock.patch("logprep.connector.confluent_kafka.output.AdminClient")
def test_set_security_related_config_in_admin_client(
    self, admin_client, kafka_config_update, expected_admin_client_config
):
    """Only ``security.*``/``ssl.*`` options (plus ``bootstrap.servers``)
    must be forwarded from the kafka config to the AdminClient."""
    connector_config = deepcopy(self.CONFIG)
    connector_config["kafka_config"].update(kafka_config_update)
    connector = Factory.create({"output_connector": connector_config})
    # access the cached property so the (mocked) AdminClient is constructed
    _ = connector._admin
    admin_client.assert_called_with(expected_admin_client_config)
Loading