Skip to content

Commit

Permalink
[COST-4463] MSK: multi-broker support (#4795)
Browse files Browse the repository at this point in the history
  • Loading branch information
maskarb authored Nov 20, 2023
1 parent a990d16 commit ee9bf8f
Show file tree
Hide file tree
Showing 21 changed files with 107 additions and 195 deletions.
1 change: 0 additions & 1 deletion deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4206,7 +4206,6 @@ objects:
kafkaTopics:
- topicName: platform.sources.event-stream
- topicName: platform.upload.announce
- topicName: platform.upload.hccm
- topicName: platform.upload.validation
- topicName: platform.notifications.ingress
- topicName: hccm.ros.events
Expand Down
1 change: 0 additions & 1 deletion deploy/kustomize/base/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ objects:
kafkaTopics:
- topicName: platform.sources.event-stream
- topicName: platform.upload.announce
- topicName: platform.upload.hccm
- topicName: platform.upload.validation
- topicName: platform.notifications.ingress
- topicName: hccm.ros.events
Expand Down
2 changes: 1 addition & 1 deletion docs/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ If necessary, you can bring up a consumer to see the contents of
messages that are uploaded to the `platform.upload.announce` topic using the following
command within the ingress environment:

docker compose exec kafka kafka-console-consumer --topic=platform.upload.hccm --bootstrap-server=localhost:29092
docker compose exec kafka kafka-console-consumer --topic=platform.upload.announce --bootstrap-server=localhost:29092

Finally, you can bring up Koku project via `docker compose` and check the
koku-listener logs to ensure the listener has successfully connected and
Expand Down
6 changes: 1 addition & 5 deletions koku/api/status/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ class ConfigSerializer(serializers.Serializer):
initial_ingest_num_months = serializers.IntegerField(source="INITIAL_INGEST_NUM_MONTHS", read_only=True)
ingest_override = serializers.BooleanField(source="INGEST_OVERRIDE", read_only=True)
trino_enabled = serializers.BooleanField(default=True)
insights_kafka_host = serializers.CharField(source="INSIGHTS_KAFKA_HOST", read_only=True)
insights_kafka_port = serializers.IntegerField(source="INSIGHTS_KAFKA_PORT", read_only=True)
insights_kafka_address = serializers.CharField(source="INSIGHTS_KAFKA_ADDRESS", read_only=True)
hccm_topic = serializers.CharField(source="HCCM_TOPIC", read_only=True)
validation_topic = serializers.CharField(source="VALIDATION_TOPIC", read_only=True)

kafka_connect = serializers.BooleanField(source="KAFKA_CONNECT", read_only=True)
retry_seconds = serializers.IntegerField(source="RETRY_SECONDS", read_only=True)
del_record_limit = serializers.IntegerField(source="DEL_RECORD_LIMIT", read_only=True)
Expand Down
9 changes: 3 additions & 6 deletions koku/kafka_utils/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@
from kafka_utils import utils
from masu.prometheus_stats import WORKER_REGISTRY

TEST_HOST = "fake-host"
TEST_PORT = "0000"


class KafkaUtilsTest(TestCase):
"""Test Cases for the Kafka utilities."""

def test_check_kafka_connection(self):
"""Test check kafka connections."""
with patch("kafka.BrokerConnection.connect_blocking", return_value=False):
result = utils.check_kafka_connection(TEST_HOST, TEST_PORT)
result = utils.check_kafka_connection()
self.assertFalse(result)
with patch("kafka.BrokerConnection.connect_blocking", return_value=True):
with patch("kafka.BrokerConnection.close") as mock_close:
result = utils.check_kafka_connection(TEST_HOST, TEST_PORT)
result = utils.check_kafka_connection()
mock_close.assert_called()
self.assertTrue(result)

Expand All @@ -32,7 +29,7 @@ def test_check_kafka_connection(self):
def test_kafka_connection_metrics_listen_for_messages(self, mock_start, mock_sleep):
"""Test check_kafka_connection increments kafka connection errors on KafkaError."""
connection_errors_before = WORKER_REGISTRY.get_sample_value("kafka_connection_errors_total")
utils.is_kafka_connected(TEST_HOST, TEST_PORT)
utils.is_kafka_connected()
connection_errors_after = WORKER_REGISTRY.get_sample_value("kafka_connection_errors_total")
self.assertEqual(connection_errors_after - connection_errors_before, 1)

Expand Down
62 changes: 36 additions & 26 deletions koku/kafka_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@
from confluent_kafka import Producer
from kafka import BrokerConnection

from masu.config import Config
from koku.configurator import CONFIGURATOR
from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER
from masu.util.common import SingletonMeta


LOG = logging.getLogger(__name__)
UPLOAD_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.announce")
VALIDATION_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.validation")
NOTIFICATION_TOPIC = CONFIGURATOR.get_kafka_topic("platform.notifications.ingress")
ROS_TOPIC = CONFIGURATOR.get_kafka_topic("hccm.ros.events")
SUBS_TOPIC = CONFIGURATOR.get_kafka_topic("platform.rhsm-subscriptions.service-instance-ingress")
SOURCES_TOPIC = CONFIGURATOR.get_kafka_topic("platform.sources.event-stream")


class ProducerSingleton(Producer, metaclass=SingletonMeta):
Expand All @@ -28,22 +35,23 @@ def _get_managed_kafka_config(conf=None): # pragma: no cover
if not isinstance(conf, dict):
conf = {}

if Config.INSIGHTS_KAFKA_SASL:
conf["security.protocol"] = Config.INSIGHTS_KAFKA_SASL.securityProtocol
conf["sasl.mechanisms"] = Config.INSIGHTS_KAFKA_SASL.saslMechanism
conf["sasl.username"] = Config.INSIGHTS_KAFKA_SASL.username
conf["sasl.password"] = Config.INSIGHTS_KAFKA_SASL.password
conf["bootstrap.servers"] = ",".join(CONFIGURATOR.get_kafka_broker_list())

if sasl := CONFIGURATOR.get_kafka_sasl():
conf["security.protocol"] = sasl.securityProtocol
conf["sasl.mechanisms"] = sasl.saslMechanism
conf["sasl.username"] = sasl.username
conf["sasl.password"] = sasl.password

if Config.INSIGHTS_KAFKA_CACERT:
conf["ssl.ca.location"] = Config.INSIGHTS_KAFKA_CACERT
if cacert := CONFIGURATOR.get_kafka_cacert():
conf["ssl.ca.location"] = cacert

return conf


def _get_consumer_config(address, conf_settings): # pragma: no cover
def _get_consumer_config(conf_settings): # pragma: no cover
"""Get the default consumer config"""
conf = {
"bootstrap.servers": address,
"api.version.request": False,
"broker.version.fallback": "0.10.2",
}
Expand All @@ -53,27 +61,27 @@ def _get_consumer_config(address, conf_settings): # pragma: no cover
return conf


def get_consumer(conf_settings, address=Config.INSIGHTS_KAFKA_ADDRESS): # pragma: no cover
def get_consumer(conf_settings): # pragma: no cover
"""Create a Kafka consumer."""
conf = _get_consumer_config(address, conf_settings)
conf = _get_consumer_config(conf_settings)
LOG.info(f"Consumer config {conf}")
return Consumer(conf, logger=LOG)


def _get_producer_config(address, conf_settings): # pragma: no cover
def _get_producer_config(conf_settings): # pragma: no cover
"""Return Kafka Producer config"""
producer_conf = {"bootstrap.servers": address}
producer_conf = {}
producer_conf = _get_managed_kafka_config(producer_conf)
producer_conf.update(conf_settings)

return producer_conf


def get_producer(conf_settings=None, address=Config.INSIGHTS_KAFKA_ADDRESS): # pragma: no cover
def get_producer(conf_settings=None): # pragma: no cover
"""Create a Kafka producer."""
if conf_settings is None:
conf_settings = {}
conf = _get_producer_config(address, conf_settings)
conf = _get_producer_config(conf_settings)
return ProducerSingleton(conf)


Expand All @@ -92,25 +100,28 @@ def backoff(interval, maximum=120):
time.sleep(wait)


def check_kafka_connection(host, port):
def check_kafka_connection():
"""Check connectability of Kafka Broker."""
conn = BrokerConnection(host, int(port), socket.AF_UNSPEC)
connected = conn.connect_blocking(timeout=1)
if connected:
conn.close()
for broker in CONFIGURATOR.get_kafka_broker_list():
host, port = broker.split(":")
conn = BrokerConnection(host, int(port), socket.AF_UNSPEC)
connected = conn.connect_blocking(timeout=1)
if connected:
conn.close()
break
return connected


def is_kafka_connected(host, port):
def is_kafka_connected():
"""Wait for Kafka to become available."""
count = 0
result = False
while not result:
result = check_kafka_connection(host, port)
result = check_kafka_connection()
if result:
LOG.info("Test connection to Kafka was successful.")
LOG.info("test connection to Kafka was successful")
else:
LOG.error(f"Unable to connect to Kafka server: {host}:{port}")
LOG.error("unable to connect to Kafka server")
KAFKA_CONNECTION_ERRORS_COUNTER.inc()
backoff(count)
count += 1
Expand All @@ -129,4 +140,3 @@ def extract_from_header(headers, header_type):
continue
else:
return item.decode("ascii")
return
87 changes: 37 additions & 50 deletions koku/koku/configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

CLOWDER_ENABLED = ENVIRONMENT.bool("CLOWDER_ENABLED", default=False)
if CLOWDER_ENABLED:
from app_common_python import ObjectBuckets, LoadedConfig, KafkaTopics, DependencyEndpoints
from app_common_python import ObjectBuckets, LoadedConfig, KafkaTopics, KafkaServers, DependencyEndpoints


class Configurator:
Expand Down Expand Up @@ -47,17 +47,12 @@ def get_in_memory_db_port():
pass

@staticmethod
def get_kafka_broker_host():
def get_kafka_broker_list():
"""Obtain kafka broker host address."""
pass

@staticmethod
def get_kafka_broker_port():
"""Obtain kafka broker port."""
pass

@staticmethod
def get_kafka_topic(requestedName: str):
def get_kafka_topic(requested_name: str):
"""Obtain kafka topic."""
pass

Expand Down Expand Up @@ -102,22 +97,22 @@ def get_object_store_tls():
pass

@staticmethod
def get_object_store_access_key(requestedName: str = ""):
def get_object_store_access_key(requested_name: str = ""):
"""Obtain object store access key."""
pass

@staticmethod
def get_object_store_secret_key(requestedName: str = ""):
def get_object_store_secret_key(requested_name: str = ""):
"""Obtain object store secret key."""
pass

@staticmethod
def get_object_store_bucket(requestedName: str = ""):
def get_object_store_bucket(requested_name: str = ""):
"""Obtain object store bucket."""
pass

@staticmethod
def get_object_store_region(requestedName: str = ""):
def get_object_store_region(requested_name: str = ""):
"""Obtain object store bucket."""
pass

Expand Down Expand Up @@ -206,19 +201,17 @@ def get_in_memory_db_port():
return ENVIRONMENT.get_value("REDIS_PORT", default="6379")

@staticmethod
def get_kafka_broker_host():
def get_kafka_broker_list():
"""Obtain kafka broker host address."""
return ENVIRONMENT.get_value("INSIGHTS_KAFKA_HOST", default="localhost")
return [
f'{ENVIRONMENT.get_value("INSIGHTS_KAFKA_HOST", default="localhost")}:'
f'{ENVIRONMENT.get_value("INSIGHTS_KAFKA_PORT", default="29092")}'
]

@staticmethod
def get_kafka_broker_port():
"""Obtain kafka broker port."""
return ENVIRONMENT.get_value("INSIGHTS_KAFKA_PORT", default="29092")

@staticmethod
def get_kafka_topic(requestedName: str):
def get_kafka_topic(requested_name: str):
"""Obtain kafka topic."""
return requestedName
return requested_name

@staticmethod
def get_kafka_sasl():
Expand Down Expand Up @@ -282,24 +275,24 @@ def get_object_store_tls():
pass

@staticmethod
def get_object_store_access_key(requestedName: str = ""):
def get_object_store_access_key(requested_name: str = ""):
"""Obtain object store access key."""
return ENVIRONMENT.get_value("S3_ACCESS_KEY", default=None)

@staticmethod
def get_object_store_secret_key(requestedName: str = ""):
def get_object_store_secret_key(requested_name: str = ""):
"""Obtain object store secret key."""
return ENVIRONMENT.get_value("S3_SECRET", default=None)

@staticmethod
def get_object_store_bucket(requestedName: str = ""):
def get_object_store_bucket(requested_name: str = ""):
"""Obtain object store bucket."""
return ENVIRONMENT.get_value("S3_BUCKET_NAME", default=requestedName)
return ENVIRONMENT.get_value("S3_BUCKET_NAME", default=requested_name)

@staticmethod
def get_object_store_region(requestedName: str = ""):
def get_object_store_region(requested_name: str = ""):
"""Obtain object store bucket."""
return ENVIRONMENT.get_value("S3_REGION", default=requestedName)
return ENVIRONMENT.get_value("S3_REGION", default=requested_name)

@staticmethod
def get_database_name():
Expand Down Expand Up @@ -388,22 +381,16 @@ def get_in_memory_db_host():
def get_in_memory_db_port():
"""Obtain in memory (redis) db port."""
return LoadedConfig.inMemoryDb.port
# return ENVIRONMENT.get_value("REDIS_PORT", default="6379")

@staticmethod
def get_kafka_broker_host():
def get_kafka_broker_list():
"""Obtain kafka broker host address."""
return LoadedConfig.kafka.brokers[0].hostname

@staticmethod
def get_kafka_broker_port():
"""Obtain kafka broker port."""
return LoadedConfig.kafka.brokers[0].port
return KafkaServers

@staticmethod
def get_kafka_topic(requestedName: str):
def get_kafka_topic(requested_name: str):
"""Obtain kafka topic."""
return KafkaTopics.get(requestedName).name
return KafkaTopics.get(requested_name).name

@staticmethod
def get_kafka_sasl():
Expand Down Expand Up @@ -477,37 +464,37 @@ def get_object_store_tls():
return False

@staticmethod
def get_object_store_access_key(requestedName: str = ""):
def get_object_store_access_key(requested_name: str = ""):
"""Obtain object store access key."""
if requestedName != "" and ObjectBuckets.get(requestedName):
return ObjectBuckets.get(requestedName).accessKey
if requested_name != "" and ObjectBuckets.get(requested_name):
return ObjectBuckets.get(requested_name).accessKey
if len(LoadedConfig.objectStore.buckets) > 0:
return LoadedConfig.objectStore.buckets[0].accessKey
if LoadedConfig.objectStore.accessKey:
return LoadedConfig.objectStore.accessKey

@staticmethod
def get_object_store_secret_key(requestedName: str = ""):
def get_object_store_secret_key(requested_name: str = ""):
"""Obtain object store secret key."""
if requestedName != "" and ObjectBuckets.get(requestedName):
return ObjectBuckets.get(requestedName).secretKey
if requested_name != "" and ObjectBuckets.get(requested_name):
return ObjectBuckets.get(requested_name).secretKey
if len(LoadedConfig.objectStore.buckets) > 0:
return LoadedConfig.objectStore.buckets[0].secretKey
if LoadedConfig.objectStore.secretKey:
return LoadedConfig.objectStore.secretKey

@staticmethod
def get_object_store_bucket(requestedName: str = ""):
def get_object_store_bucket(requested_name: str = ""):
"""Obtain object store bucket."""
if ObjectBuckets.get(requestedName):
return ObjectBuckets.get(requestedName).name
return requestedName
if ObjectBuckets.get(requested_name):
return ObjectBuckets.get(requested_name).name
return requested_name

@staticmethod
def get_object_store_region(requestedName: str = ""):
def get_object_store_region(requested_name: str = ""):
"""Obtain object store region."""
if ObjectBuckets.get(requestedName):
return ObjectBuckets.get(requestedName).region
if ObjectBuckets.get(requested_name):
return ObjectBuckets.get(requested_name).region
return None

@staticmethod
Expand Down
Loading

0 comments on commit ee9bf8f

Please sign in to comment.