diff --git a/rbac/core/kafka.py b/rbac/core/kafka.py index 3f3c2165..5ff03032 100644 --- a/rbac/core/kafka.py +++ b/rbac/core/kafka.py @@ -40,8 +40,10 @@ def get_producer(self): else: if settings.KAFKA_AUTH: self.producer = KafkaProducer(**settings.KAFKA_AUTH) + elif not settings.KAFKA_SERVERS: + raise AttributeError("Empty servers list") else: - self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_SERVER) + self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_SERVERS) return self.producer def send_kafka_message(self, topic, message, headers=None): diff --git a/rbac/rbac/settings.py b/rbac/rbac/settings.py index e869734c..93ca973c 100644 --- a/rbac/rbac/settings.py +++ b/rbac/rbac/settings.py @@ -394,31 +394,46 @@ NOTIFICATIONS_TOPIC = None # Kafka settings +KAFKA_SERVERS = [] + if KAFKA_ENABLED: KAFKA_AUTH = {} if ENVIRONMENT.bool("CLOWDER_ENABLED", default=False): - kafka_broker = LoadedConfig.kafka.brokers[0] - KAFKA_HOST = kafka_broker.hostname - KAFKA_PORT = kafka_broker.port + kafka_brokers = LoadedConfig.kafka.brokers + broker_index = 0 + if not kafka_brokers: + raise ValueError("No kafka brokers available") + for broker in kafka_brokers: + if broker and broker.hostname != None and broker.port != None: + kafka_host = broker.hostname + kafka_port = broker.port + kafka_info = f"{kafka_host}:{kafka_port}" + KAFKA_SERVERS.append(kafka_info) + + if broker.authtype != None and broker.authtype.value == "sasl": + broker_index = kafka_brokers.index(broker) + else: + raise ValueError("Broker value is none. It does not contain hostname, port, or authtype") try: - if kafka_broker.authtype.value == "sasl": + if kafka_brokers[broker_index].authtype.value == "sasl": KAFKA_AUTH.update( { - "bootstrap_servers": f"{KAFKA_HOST}:{KAFKA_PORT}", - "sasl_plain_username": kafka_broker.sasl.username, - "sasl_plain_password": kafka_broker.sasl.password, - "sasl_mechanism": kafka_broker.sasl.saslMechanism.upper(), - "security_protocol": kafka_broker.sasl.securityProtocol.upper(), + "bootstrap_servers": KAFKA_SERVERS, + "sasl_plain_username": kafka_brokers[broker_index].sasl.username, + "sasl_plain_password": kafka_brokers[broker_index].sasl.password, + "sasl_mechanism": kafka_brokers[broker_index].sasl.saslMechanism.upper(), + "security_protocol": kafka_brokers[broker_index].sasl.securityProtocol.upper(), } ) - if kafka_broker.cacert: + if kafka_brokers[broker_index].cacert: KAFKA_AUTH["ssl_cafile"] = LoadedConfig.kafka_ca() except AttributeError: KAFKA_AUTH = {} else: - KAFKA_HOST = "localhost" - KAFKA_PORT = "9092" - KAFKA_SERVER = f"{KAFKA_HOST}:{KAFKA_PORT}" + kafka_host = "localhost" + kafka_port = "9092" + kafka_info = f"{kafka_host}:{kafka_port}" + KAFKA_SERVERS.append(kafka_info) clowder_notifications_topic = KafkaTopics.get(NOTIFICATIONS_TOPIC) if clowder_notifications_topic: