From 5d357114d07a41002d706078cac7e61f379c9f97 Mon Sep 17 00:00:00 2001 From: Piotr Pauksztelo Date: Thu, 21 Nov 2024 12:07:11 +0100 Subject: [PATCH] Add missing SSL config to kafka admin clients --- logprep/connector/confluent_kafka/input.py | 3 +++ logprep/connector/confluent_kafka/output.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 0ce2340f7..ce3020485 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -288,6 +288,9 @@ def _admin(self) -> AdminClient: confluent_kafka admin client object """ admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]} + for key, value in self._config.kafka_config.items(): + if key.startswith(("security.", "ssl.")): + admin_config[key] = value return AdminClient(admin_config) @cached_property diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 2c02a323a..5d9df8f72 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -212,6 +212,9 @@ def _admin(self) -> AdminClient: confluent_kafka admin client object """ admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]} + for key, value in self._config.kafka_config.items(): + if key.startswith(("security.", "ssl.")): + admin_config[key] = value return AdminClient(admin_config) @cached_property