Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1031 from Ellen-Yi-Dong/update_rbac…
Browse files Browse the repository at this point in the history
…_kafka_brokers

Update rbac.settings in order to support multiple Kafka Brokers
  • Loading branch information
Ellen-Yi-Dong authored Feb 22, 2024
2 parents cc73705 + b3fd67b commit c42dd91
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
4 changes: 3 additions & 1 deletion rbac/core/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ def get_producer(self):
else:
if settings.KAFKA_AUTH:
self.producer = KafkaProducer(**settings.KAFKA_AUTH)
elif not settings.KAFKA_SERVERS:
raise AttributeError("Empty servers list")
else:
self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_SERVER)
self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_SERVERS)
return self.producer

def send_kafka_message(self, topic, message, headers=None):
Expand Down
41 changes: 28 additions & 13 deletions rbac/rbac/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,31 +394,46 @@
NOTIFICATIONS_TOPIC = None

# Kafka settings
KAFKA_SERVERS = []

if KAFKA_ENABLED:
KAFKA_AUTH = {}
if ENVIRONMENT.bool("CLOWDER_ENABLED", default=False):
kafka_broker = LoadedConfig.kafka.brokers[0]
KAFKA_HOST = kafka_broker.hostname
KAFKA_PORT = kafka_broker.port
kafka_brokers = LoadedConfig.kafka.brokers
broker_index = 0
if not kafka_brokers:
raise ValueError("No kafka brokers available")
for broker in kafka_brokers:
if broker and broker.hostname != None and broker.port != None:
kafka_host = broker.hostname
kafka_port = broker.port
kafka_info = f"{kafka_host}:{kafka_port}"
KAFKA_SERVERS.append(kafka_info)

if broker.authtype != None and broker.authtype.value == "sasl":
broker_index = kafka_brokers.index(broker)
else:
raise ValueError("Broker value is none. It does not contain hostname, port, or authtype")
try:
if kafka_broker.authtype.value == "sasl":
if kafka_brokers[broker_index].authtype.value == "sasl":
KAFKA_AUTH.update(
{
"bootstrap_servers": f"{KAFKA_HOST}:{KAFKA_PORT}",
"sasl_plain_username": kafka_broker.sasl.username,
"sasl_plain_password": kafka_broker.sasl.password,
"sasl_mechanism": kafka_broker.sasl.saslMechanism.upper(),
"security_protocol": kafka_broker.sasl.securityProtocol.upper(),
"bootstrap_servers": KAFKA_SERVERS,
"sasl_plain_username": kafka_brokers[broker_index].sasl.username,
"sasl_plain_password": kafka_brokers[broker_index].sasl.password,
"sasl_mechanism": kafka_brokers[broker_index].sasl.saslMechanism.upper(),
"security_protocol": kafka_brokers[broker_index].sasl.securityProtocol.upper(),
}
)
if kafka_broker.cacert:
if kafka_brokers[broker_index].cacert:
KAFKA_AUTH["ssl_cafile"] = LoadedConfig.kafka_ca()
except AttributeError:
KAFKA_AUTH = {}
else:
KAFKA_HOST = "localhost"
KAFKA_PORT = "9092"
KAFKA_SERVER = f"{KAFKA_HOST}:{KAFKA_PORT}"
kafka_host = "localhost"
kafka_port = "9092"
kafka_info = f"{kafka_host}:{kafka_port}"
KAFKA_SERVERS.append(kafka_info)

clowder_notifications_topic = KafkaTopics.get(NOTIFICATIONS_TOPIC)
if clowder_notifications_topic:
Expand Down

0 comments on commit c42dd91

Please sign in to comment.