From ee9bf8f67a13dac9a0322c51402a194c18bf5f08 Mon Sep 17 00:00:00 2001 From: Michael Skarbek Date: Mon, 20 Nov 2023 16:53:24 -0500 Subject: [PATCH] [COST-4463] MSK: multi-broker support (#4795) --- deploy/clowdapp.yaml | 1 - deploy/kustomize/base/base.yaml | 1 - docs/testing.md | 2 +- koku/api/status/serializers.py | 6 +- koku/kafka_utils/test/test_utils.py | 9 +- koku/kafka_utils/utils.py | 62 +++++++------ koku/koku/configurator.py | 87 ++++++++----------- koku/koku/notifications.py | 4 +- koku/masu/config.py | 15 ---- koku/masu/external/kafka_msg_handler.py | 12 +-- koku/masu/external/ros_report_shipper.py | 3 +- koku/masu/management/commands/listener.py | 3 +- .../test/external/test_kafka_msg_handler.py | 13 +-- koku/sources/api/status.py | 14 +-- koku/sources/config.py | 10 --- koku/sources/kafka_listener.py | 28 +----- koku/sources/kafka_message_processor.py | 4 +- .../management/commands/sources_listener.py | 2 +- koku/sources/test/api/test_status.py | 12 --- koku/sources/test/test_kafka_listener.py | 10 --- koku/subs/subs_data_messenger.py | 4 +- 21 files changed, 107 insertions(+), 195 deletions(-) diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index c360041461..8ceb1c5fa9 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -4206,7 +4206,6 @@ objects: kafkaTopics: - topicName: platform.sources.event-stream - topicName: platform.upload.announce - - topicName: platform.upload.hccm - topicName: platform.upload.validation - topicName: platform.notifications.ingress - topicName: hccm.ros.events diff --git a/deploy/kustomize/base/base.yaml b/deploy/kustomize/base/base.yaml index ee76e16d6d..4433b524d6 100644 --- a/deploy/kustomize/base/base.yaml +++ b/deploy/kustomize/base/base.yaml @@ -31,7 +31,6 @@ objects: kafkaTopics: - topicName: platform.sources.event-stream - topicName: platform.upload.announce - - topicName: platform.upload.hccm - topicName: platform.upload.validation - topicName: platform.notifications.ingress - topicName: hccm.ros.events diff --git a/docs/testing.md b/docs/testing.md index f68a76dff6..c2e52e6a2a 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -238,7 +238,7 @@ If necessary, you can bring up a consumer to see the contents of messages that are uploaded to the `hccm` topic using the following command within the ingress environment: - docker compose exec kafka kafka-console-consumer --topic=platform.upload.hccm --bootstrap-server=localhost:29092 + docker compose exec kafka kafka-console-consumer --topic=platform.upload.announce --bootstrap-server=localhost:29092 Finally, you can bring up Koku project via `docker compose` and check the koku-listener logs to ensure the listener has successfully connected and diff --git a/koku/api/status/serializers.py b/koku/api/status/serializers.py index 480de85937..1623ed0be1 100644 --- a/koku/api/status/serializers.py +++ b/koku/api/status/serializers.py @@ -28,11 +28,7 @@ class ConfigSerializer(serializers.Serializer): initial_ingest_num_months = serializers.IntegerField(source="INITIAL_INGEST_NUM_MONTHS", read_only=True) ingest_override = serializers.BooleanField(source="INGEST_OVERRIDE", read_only=True) trino_enabled = serializers.BooleanField(default=True) - insights_kafka_host = serializers.CharField(source="INSIGHTS_KAFKA_HOST", read_only=True) - insights_kafka_port = serializers.IntegerField(source="INSIGHTS_KAFKA_PORT", read_only=True) - insights_kafka_address = serializers.CharField(source="INSIGHTS_KAFKA_ADDRESS", read_only=True) - hccm_topic = serializers.CharField(source="HCCM_TOPIC", 
read_only=True) - validation_topic = serializers.CharField(source="VALIDATION_TOPIC", read_only=True) + kafka_connect = serializers.BooleanField(source="KAFKA_CONNECT", read_only=True) retry_seconds = serializers.IntegerField(source="RETRY_SECONDS", read_only=True) del_record_limit = serializers.IntegerField(source="DEL_RECORD_LIMIT", read_only=True) diff --git a/koku/kafka_utils/test/test_utils.py b/koku/kafka_utils/test/test_utils.py index f01548d722..f4918371bd 100644 --- a/koku/kafka_utils/test/test_utils.py +++ b/koku/kafka_utils/test/test_utils.py @@ -9,9 +9,6 @@ from kafka_utils import utils from masu.prometheus_stats import WORKER_REGISTRY -TEST_HOST = "fake-host" -TEST_PORT = "0000" - class KafkaUtilsTest(TestCase): """Test Cases for the Kafka utilities.""" @@ -19,11 +16,11 @@ class KafkaUtilsTest(TestCase): def test_check_kafka_connection(self): """Test check kafka connections.""" with patch("kafka.BrokerConnection.connect_blocking", return_value=False): - result = utils.check_kafka_connection(TEST_HOST, TEST_PORT) + result = utils.check_kafka_connection() self.assertFalse(result) with patch("kafka.BrokerConnection.connect_blocking", return_value=True): with patch("kafka.BrokerConnection.close") as mock_close: - result = utils.check_kafka_connection(TEST_HOST, TEST_PORT) + result = utils.check_kafka_connection() mock_close.assert_called() self.assertTrue(result) @@ -32,7 +29,7 @@ def test_check_kafka_connection(self): def test_kafka_connection_metrics_listen_for_messages(self, mock_start, mock_sleep): """Test check_kafka_connection increments kafka connection errors on KafkaError.""" connection_errors_before = WORKER_REGISTRY.get_sample_value("kafka_connection_errors_total") - utils.is_kafka_connected(TEST_HOST, TEST_PORT) + utils.is_kafka_connected() connection_errors_after = WORKER_REGISTRY.get_sample_value("kafka_connection_errors_total") self.assertEqual(connection_errors_after - connection_errors_before, 1) diff --git a/koku/kafka_utils/utils.py b/koku/kafka_utils/utils.py index 7378ae0feb..cde726519f 100644 --- a/koku/kafka_utils/utils.py +++ b/koku/kafka_utils/utils.py @@ -12,11 +12,18 @@ from confluent_kafka import Producer from kafka import BrokerConnection -from masu.config import Config +from koku.configurator import CONFIGURATOR from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER from masu.util.common import SingletonMeta + LOG = logging.getLogger(__name__) +UPLOAD_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.announce") +VALIDATION_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.validation") +NOTIFICATION_TOPIC = CONFIGURATOR.get_kafka_topic("platform.notifications.ingress") +ROS_TOPIC = CONFIGURATOR.get_kafka_topic("hccm.ros.events") +SUBS_TOPIC = CONFIGURATOR.get_kafka_topic("platform.rhsm-subscriptions.service-instance-ingress") +SOURCES_TOPIC = CONFIGURATOR.get_kafka_topic("platform.sources.event-stream") class ProducerSingleton(Producer, metaclass=SingletonMeta): @@ -28,22 +35,23 @@ def _get_managed_kafka_config(conf=None): # pragma: no cover if not isinstance(conf, dict): conf = {} - if Config.INSIGHTS_KAFKA_SASL: - conf["security.protocol"] = Config.INSIGHTS_KAFKA_SASL.securityProtocol - conf["sasl.mechanisms"] = Config.INSIGHTS_KAFKA_SASL.saslMechanism - conf["sasl.username"] = Config.INSIGHTS_KAFKA_SASL.username - conf["sasl.password"] = Config.INSIGHTS_KAFKA_SASL.password + conf["bootstrap.servers"] = ",".join(CONFIGURATOR.get_kafka_broker_list()) + + if sasl := CONFIGURATOR.get_kafka_sasl(): + conf["security.protocol"] = 
sasl.securityProtocol + conf["sasl.mechanisms"] = sasl.saslMechanism + conf["sasl.username"] = sasl.username + conf["sasl.password"] = sasl.password - if Config.INSIGHTS_KAFKA_CACERT: - conf["ssl.ca.location"] = Config.INSIGHTS_KAFKA_CACERT + if cacert := CONFIGURATOR.get_kafka_cacert(): + conf["ssl.ca.location"] = cacert return conf -def _get_consumer_config(address, conf_settings): # pragma: no cover +def _get_consumer_config(conf_settings): # pragma: no cover """Get the default consumer config""" conf = { - "bootstrap.servers": address, "api.version.request": False, "broker.version.fallback": "0.10.2", } @@ -53,27 +61,27 @@ def _get_consumer_config(address, conf_settings): # pragma: no cover return conf -def get_consumer(conf_settings, address=Config.INSIGHTS_KAFKA_ADDRESS): # pragma: no cover +def get_consumer(conf_settings): # pragma: no cover """Create a Kafka consumer.""" - conf = _get_consumer_config(address, conf_settings) + conf = _get_consumer_config(conf_settings) LOG.info(f"Consumer config {conf}") return Consumer(conf, logger=LOG) -def _get_producer_config(address, conf_settings): # pragma: no cover +def _get_producer_config(conf_settings): # pragma: no cover """Return Kafka Producer config""" - producer_conf = {"bootstrap.servers": address} + producer_conf = {} producer_conf = _get_managed_kafka_config(producer_conf) producer_conf.update(conf_settings) return producer_conf -def get_producer(conf_settings=None, address=Config.INSIGHTS_KAFKA_ADDRESS): # pragma: no cover +def get_producer(conf_settings=None): # pragma: no cover """Create a Kafka producer.""" if conf_settings is None: conf_settings = {} - conf = _get_producer_config(address, conf_settings) + conf = _get_producer_config(conf_settings) return ProducerSingleton(conf) @@ -92,25 +100,28 @@ def backoff(interval, maximum=120): time.sleep(wait) -def check_kafka_connection(host, port): +def check_kafka_connection(): """Check connectability of Kafka Broker.""" - conn = BrokerConnection(host, int(port), socket.AF_UNSPEC) - connected = conn.connect_blocking(timeout=1) - if connected: - conn.close() + for broker in CONFIGURATOR.get_kafka_broker_list(): + host, port = broker.split(":") + conn = BrokerConnection(host, int(port), socket.AF_UNSPEC) + connected = conn.connect_blocking(timeout=1) + if connected: + conn.close() + break return connected -def is_kafka_connected(host, port): +def is_kafka_connected(): """Wait for Kafka to become available.""" count = 0 result = False while not result: - result = check_kafka_connection(host, port) + result = check_kafka_connection() if result: - LOG.info("Test connection to Kafka was successful.") + LOG.info("test connection to Kafka was successful") else: - LOG.error(f"Unable to connect to Kafka server: {host}:{port}") + LOG.error("unable to connect to Kafka server") KAFKA_CONNECTION_ERRORS_COUNTER.inc() backoff(count) count += 1 @@ -129,4 +140,3 @@ def extract_from_header(headers, header_type): continue else: return item.decode("ascii") - return diff --git a/koku/koku/configurator.py b/koku/koku/configurator.py index 713fa77ee7..f7cd3c770d 100644 --- a/koku/koku/configurator.py +++ b/koku/koku/configurator.py @@ -10,7 +10,7 @@ CLOWDER_ENABLED = ENVIRONMENT.bool("CLOWDER_ENABLED", default=False) if CLOWDER_ENABLED: - from app_common_python import ObjectBuckets, LoadedConfig, KafkaTopics, DependencyEndpoints + from app_common_python import ObjectBuckets, LoadedConfig, KafkaTopics, KafkaServers, DependencyEndpoints class Configurator: @@ -47,17 +47,12 @@ def get_in_memory_db_port(): 
pass @staticmethod - def get_kafka_broker_host(): + def get_kafka_broker_list(): """Obtain kafka broker host address.""" pass @staticmethod - def get_kafka_broker_port(): - """Obtain kafka broker port.""" - pass - - @staticmethod - def get_kafka_topic(requestedName: str): + def get_kafka_topic(requested_name: str): """Obtain kafka topic.""" pass @@ -102,22 +97,22 @@ def get_object_store_tls(): pass @staticmethod - def get_object_store_access_key(requestedName: str = ""): + def get_object_store_access_key(requested_name: str = ""): """Obtain object store access key.""" pass @staticmethod - def get_object_store_secret_key(requestedName: str = ""): + def get_object_store_secret_key(requested_name: str = ""): """Obtain object store secret key.""" pass @staticmethod - def get_object_store_bucket(requestedName: str = ""): + def get_object_store_bucket(requested_name: str = ""): """Obtain object store bucket.""" pass @staticmethod - def get_object_store_region(requestedName: str = ""): + def get_object_store_region(requested_name: str = ""): """Obtain object store bucket.""" pass @@ -206,19 +201,17 @@ def get_in_memory_db_port(): return ENVIRONMENT.get_value("REDIS_PORT", default="6379") @staticmethod - def get_kafka_broker_host(): + def get_kafka_broker_list(): """Obtain kafka broker host address.""" - return ENVIRONMENT.get_value("INSIGHTS_KAFKA_HOST", default="localhost") + return [ + f'{ENVIRONMENT.get_value("INSIGHTS_KAFKA_HOST", default="localhost")}:' + f'{ENVIRONMENT.get_value("INSIGHTS_KAFKA_PORT", default="29092")}' + ] @staticmethod - def get_kafka_broker_port(): - """Obtain kafka broker port.""" - return ENVIRONMENT.get_value("INSIGHTS_KAFKA_PORT", default="29092") - - @staticmethod - def get_kafka_topic(requestedName: str): + def get_kafka_topic(requested_name: str): """Obtain kafka topic.""" - return requestedName + return requested_name @staticmethod def get_kafka_sasl(): @@ -282,24 +275,24 @@ def get_object_store_tls(): pass @staticmethod - def get_object_store_access_key(requestedName: str = ""): + def get_object_store_access_key(requested_name: str = ""): """Obtain object store access key.""" return ENVIRONMENT.get_value("S3_ACCESS_KEY", default=None) @staticmethod - def get_object_store_secret_key(requestedName: str = ""): + def get_object_store_secret_key(requested_name: str = ""): """Obtain object store secret key.""" return ENVIRONMENT.get_value("S3_SECRET", default=None) @staticmethod - def get_object_store_bucket(requestedName: str = ""): + def get_object_store_bucket(requested_name: str = ""): """Obtain object store bucket.""" - return ENVIRONMENT.get_value("S3_BUCKET_NAME", default=requestedName) + return ENVIRONMENT.get_value("S3_BUCKET_NAME", default=requested_name) @staticmethod - def get_object_store_region(requestedName: str = ""): + def get_object_store_region(requested_name: str = ""): """Obtain object store bucket.""" - return ENVIRONMENT.get_value("S3_REGION", default=requestedName) + return ENVIRONMENT.get_value("S3_REGION", default=requested_name) @staticmethod def get_database_name(): @@ -388,22 +381,16 @@ def get_in_memory_db_host(): def get_in_memory_db_port(): """Obtain in memory (redis) db port.""" return LoadedConfig.inMemoryDb.port - # return ENVIRONMENT.get_value("REDIS_PORT", default="6379") @staticmethod - def get_kafka_broker_host(): + def get_kafka_broker_list(): """Obtain kafka broker host address.""" - return LoadedConfig.kafka.brokers[0].hostname - - @staticmethod - def get_kafka_broker_port(): - """Obtain kafka broker port.""" - return 
LoadedConfig.kafka.brokers[0].port + return KafkaServers @staticmethod - def get_kafka_topic(requestedName: str): + def get_kafka_topic(requested_name: str): """Obtain kafka topic.""" - return KafkaTopics.get(requestedName).name + return KafkaTopics.get(requested_name).name @staticmethod def get_kafka_sasl(): @@ -477,37 +464,37 @@ def get_object_store_tls(): return False @staticmethod - def get_object_store_access_key(requestedName: str = ""): + def get_object_store_access_key(requested_name: str = ""): """Obtain object store access key.""" - if requestedName != "" and ObjectBuckets.get(requestedName): - return ObjectBuckets.get(requestedName).accessKey + if requested_name != "" and ObjectBuckets.get(requested_name): + return ObjectBuckets.get(requested_name).accessKey if len(LoadedConfig.objectStore.buckets) > 0: return LoadedConfig.objectStore.buckets[0].accessKey if LoadedConfig.objectStore.accessKey: return LoadedConfig.objectStore.accessKey @staticmethod - def get_object_store_secret_key(requestedName: str = ""): + def get_object_store_secret_key(requested_name: str = ""): """Obtain object store secret key.""" - if requestedName != "" and ObjectBuckets.get(requestedName): - return ObjectBuckets.get(requestedName).secretKey + if requested_name != "" and ObjectBuckets.get(requested_name): + return ObjectBuckets.get(requested_name).secretKey if len(LoadedConfig.objectStore.buckets) > 0: return LoadedConfig.objectStore.buckets[0].secretKey if LoadedConfig.objectStore.secretKey: return LoadedConfig.objectStore.secretKey @staticmethod - def get_object_store_bucket(requestedName: str = ""): + def get_object_store_bucket(requested_name: str = ""): """Obtain object store bucket.""" - if ObjectBuckets.get(requestedName): - return ObjectBuckets.get(requestedName).name - return requestedName + if ObjectBuckets.get(requested_name): + return ObjectBuckets.get(requested_name).name + return requested_name @staticmethod - def get_object_store_region(requestedName: str = ""): + def get_object_store_region(requested_name: str = ""): """Obtain object store region.""" - if ObjectBuckets.get(requestedName): - return ObjectBuckets.get(requestedName).region + if ObjectBuckets.get(requested_name): + return ObjectBuckets.get(requested_name).region return None @staticmethod diff --git a/koku/koku/notifications.py b/koku/koku/notifications.py index e6cc6d42fe..0f7695fd7b 100644 --- a/koku/koku/notifications.py +++ b/koku/koku/notifications.py @@ -11,7 +11,7 @@ from api.provider.models import Provider from kafka_utils.utils import delivery_callback from kafka_utils.utils import get_producer -from masu.config import Config +from kafka_utils.utils import NOTIFICATION_TOPIC from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER LOG = logging.getLogger(__name__) @@ -93,7 +93,7 @@ def send_notification(self, msg): None """ producer = get_producer() - producer.produce(Config.NOTIFICATION_TOPIC, value=msg, callback=delivery_callback) + producer.produce(NOTIFICATION_TOPIC, value=msg, callback=delivery_callback) producer.poll(0) def cost_model_notification(self, provider: Provider): diff --git a/koku/masu/config.py b/koku/masu/config.py index e05cddd1d9..7edd3325a8 100644 --- a/koku/masu/config.py +++ b/koku/masu/config.py @@ -7,7 +7,6 @@ from django.conf import settings -from koku.configurator import CONFIGURATOR from koku.env import ENVIRONMENT @@ -82,20 +81,6 @@ class Config: # Set ROS presigned URL expiration: ROS_URL_EXPIRATION = ENVIRONMENT.int("ROS_URL_EXPIRATION", default=DEFAULT_ROS_URL_EXPIRATION) - # 
Insights Kafka - INSIGHTS_KAFKA_HOST = CONFIGURATOR.get_kafka_broker_host() - INSIGHTS_KAFKA_PORT = CONFIGURATOR.get_kafka_broker_port() - INSIGHTS_KAFKA_ADDRESS = f"{INSIGHTS_KAFKA_HOST}:{INSIGHTS_KAFKA_PORT}" - INSIGHTS_KAFKA_SASL = CONFIGURATOR.get_kafka_sasl() - INSIGHTS_KAFKA_CACERT = CONFIGURATOR.get_kafka_cacert() - INSIGHTS_KAFKA_AUTHTYPE = CONFIGURATOR.get_kafka_authtype() - HCCM_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.hccm") - UPLOAD_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.announce") - VALIDATION_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.validation") - NOTIFICATION_TOPIC = CONFIGURATOR.get_kafka_topic("platform.notifications.ingress") - ROS_TOPIC = CONFIGURATOR.get_kafka_topic("hccm.ros.events") - SUBS_TOPIC = CONFIGURATOR.get_kafka_topic("platform.rhsm-subscriptions.service-instance-ingress") - # Flag to signal whether or not to connect to upload service KAFKA_CONNECT = ENVIRONMENT.bool("KAFKA_CONNECT", default=DEFAULT_KAFKA_CONNECT) diff --git a/koku/masu/external/kafka_msg_handler.py b/koku/masu/external/kafka_msg_handler.py index 88bcc89401..d411ed0e0d 100644 --- a/koku/masu/external/kafka_msg_handler.py +++ b/koku/masu/external/kafka_msg_handler.py @@ -31,6 +31,8 @@ from kafka_utils.utils import get_consumer from kafka_utils.utils import get_producer from kafka_utils.utils import is_kafka_connected +from kafka_utils.utils import UPLOAD_TOPIC +from kafka_utils.utils import VALIDATION_TOPIC from masu.config import Config from masu.database.report_manifest_db_accessor import ReportManifestDBAccessor from masu.external import UNCOMPRESSED @@ -362,7 +364,7 @@ def send_confirmation(request_id, status): # pragma: no cover producer = get_producer() validation = {"request_id": request_id, "validation": status} msg = bytes(json.dumps(validation), "utf-8") - producer.produce(Config.VALIDATION_TOPIC, value=msg, callback=delivery_callback) + producer.produce(VALIDATION_TOPIC, value=msg, callback=delivery_callback) producer.poll(0) @@ -370,7 +372,7 @@ def handle_message(kmsg): """ Handle messages from message pending queue. - Handle's messages with topics: 'platform.upload.hccm', + Handle's messages with topics: 'platform.upload.announce', and 'platform.upload.available'. The OCP cost usage payload will land on topic hccm. 
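Note on the refactor above: broker discovery is now centralized. Clients no longer read a single INSIGHTS_KAFKA_HOST/INSIGHTS_KAFKA_PORT pair; instead, every entry from CONFIGURATOR.get_kafka_broker_list() is comma-joined into bootstrap.servers, which is what makes multi-broker MSK work. A minimal, self-contained sketch of the resulting config assembly; FakeConfigurator and its two broker hostnames are stand-ins for illustration, not part of this patch:

    class FakeConfigurator:
        """Stand-in for koku.configurator.CONFIGURATOR (illustration only)."""

        @staticmethod
        def get_kafka_broker_list():
            # Under Clowder this is KafkaServers: a list of "host:port" strings.
            return ["broker-0.kafka.example:9092", "broker-1.kafka.example:9092"]

        @staticmethod
        def get_kafka_sasl():
            return None  # no SASL credentials in this sketch

        @staticmethod
        def get_kafka_cacert():
            return None  # no CA cert in this sketch


    def managed_kafka_config(configurator, conf=None):
        """Mirrors _get_managed_kafka_config: all brokers join bootstrap.servers."""
        conf = dict(conf or {})
        conf["bootstrap.servers"] = ",".join(configurator.get_kafka_broker_list())
        if sasl := configurator.get_kafka_sasl():
            conf["security.protocol"] = sasl.securityProtocol
            conf["sasl.mechanisms"] = sasl.saslMechanism
            conf["sasl.username"] = sasl.username
            conf["sasl.password"] = sasl.password
        if cacert := configurator.get_kafka_cacert():
            conf["ssl.ca.location"] = cacert
        return conf


    print(managed_kafka_config(FakeConfigurator()))
    # {'bootstrap.servers': 'broker-0.kafka.example:9092,broker-1.kafka.example:9092'}

Because the Clowder implementation of get_kafka_broker_list() simply returns KafkaServers, no per-broker host/port plumbing remains anywhere in the callers.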
@@ -644,7 +646,7 @@ def listen_for_messages_loop(): "max.poll.interval.ms": 1080000, # 18 minutes } consumer = get_consumer(kafka_conf) - consumer.subscribe([Config.UPLOAD_TOPIC]) + consumer.subscribe([UPLOAD_TOPIC]) LOG.info("Consumer is listening for messages...") for _ in itertools.count(): # equivalent to while True, but mockable msg = consumer.poll(timeout=1.0) @@ -696,7 +698,7 @@ def listen_for_messages(msg, consumer): """ offset = msg.offset() partition = msg.partition() - topic_partition = TopicPartition(topic=Config.UPLOAD_TOPIC, partition=partition, offset=offset) + topic_partition = TopicPartition(topic=UPLOAD_TOPIC, partition=partition, offset=offset) try: LOG.info(f"Processing message offset: {offset} partition: {partition}") service = extract_from_header(msg.headers(), "service") @@ -728,7 +730,7 @@ def koku_listener_thread(): # pragma: no cover None """ - if is_kafka_connected(Config.INSIGHTS_KAFKA_HOST, Config.INSIGHTS_KAFKA_PORT): # Check that Kafka is running + if is_kafka_connected(): # Check that Kafka is running LOG.info("Kafka is running.") try: diff --git a/koku/masu/external/ros_report_shipper.py b/koku/masu/external/ros_report_shipper.py index acbc46135e..9cad33c2d9 100644 --- a/koku/masu/external/ros_report_shipper.py +++ b/koku/masu/external/ros_report_shipper.py @@ -16,6 +16,7 @@ from api.utils import DateHelper from kafka_utils.utils import delivery_callback from kafka_utils.utils import get_producer +from kafka_utils.utils import ROS_TOPIC from koku.feature_flags import UNLEASH_CLIENT from masu.config import Config as masu_config from masu.external.downloader.ocp.ocp_report_downloader import OPERATOR_VERSIONS @@ -138,7 +139,7 @@ def copy_data_to_ros_s3_bucket(self, filename, data): def send_kafka_message(self, msg): """Sends a kafka message to the ROS topic with the S3 keys for the uploaded reports.""" producer = get_producer() - producer.produce(masu_config.ROS_TOPIC, value=msg, callback=delivery_callback) + producer.produce(ROS_TOPIC, value=msg, callback=delivery_callback) producer.poll(0) def build_ros_msg(self, presigned_urls, upload_keys): diff --git a/koku/masu/management/commands/listener.py b/koku/masu/management/commands/listener.py index d8ffe6aafa..bf69685d0e 100644 --- a/koku/masu/management/commands/listener.py +++ b/koku/masu/management/commands/listener.py @@ -14,7 +14,6 @@ from koku.probe_server import ProbeResponse from koku.probe_server import ProbeServer from koku.probe_server import start_probe_server -from masu.config import Config from masu.external.kafka_msg_handler import initialize_kafka_handler @@ -29,7 +28,7 @@ def readiness_check(self): status = 424 msg = "not ready" if self.ready: - if not check_kafka_connection(Config.INSIGHTS_KAFKA_HOST, Config.INSIGHTS_KAFKA_PORT): + if not check_kafka_connection(): response = ProbeResponse(status, "kafka connection error") self._write_response(response) self.logger.info(response.json) diff --git a/koku/masu/test/external/test_kafka_msg_handler.py b/koku/masu/test/external/test_kafka_msg_handler.py index f7a1614dd4..742fd2b30d 100644 --- a/koku/masu/test/external/test_kafka_msg_handler.py +++ b/koku/masu/test/external/test_kafka_msg_handler.py @@ -21,6 +21,7 @@ from requests.exceptions import HTTPError import masu.external.kafka_msg_handler as msg_handler +from kafka_utils.utils import UPLOAD_TOPIC from masu.config import Config from masu.external.downloader.ocp.ocp_report_downloader import OCPReportDownloader from masu.external.kafka_msg_handler import KafkaMsgHandlerError @@ -62,7 +63,7 @@ 
class MockMessage: def __init__( self, - topic=Config.UPLOAD_TOPIC, + topic=UPLOAD_TOPIC, url="http://unreal", value_dict={}, offset=50, @@ -199,7 +200,7 @@ def test_listen_for_messages(self, mock_process_message): ] for test in test_matrix: msg = MockMessage( - topic="platform.upload.hccm", + topic="platform.upload.announce", offset=5, url="https://insights-quarantine.s3.amazonaws.com/myfile", value_dict=test.get("test_value"), @@ -240,7 +241,7 @@ def test_listen_for_messages_db_error(self, mock_process_message): ] for test in test_matrix: msg = MockMessage( - topic="platform.upload.hccm", + topic="platform.upload.announce", offset=5, url="https://insights-quarantine.s3.amazonaws.com/myfile", value_dict=test.get("test_value"), @@ -279,7 +280,7 @@ def test_listen_for_messages_error(self, mock_process_message): ] for test in test_matrix: msg = MockMessage( - topic="platform.upload.hccm", + topic="platform.upload.announce", offset=5, url="https://insights-quarantine.s3.amazonaws.com/myfile", value_dict=test.get("test_value"), @@ -397,7 +398,7 @@ def _expect_report_mock_not_called(msg, test, process_report_mock): for test in test_matrix: with self.subTest(test=test): msg = MockMessage( - topic="platform.upload.hccm", + topic="platform.upload.announce", offset=5, url="https://insights-quarantine.s3.amazonaws.com/myfile", value_dict=test.get("test_value"), @@ -418,7 +419,7 @@ def _expect_report_mock_not_called(msg, test, process_report_mock): @patch("masu.external.kafka_msg_handler.close_and_set_db_connection") def test_handle_messages(self, _): """Test to ensure that kafka messages are handled.""" - hccm_msg = MockMessage(Config.UPLOAD_TOPIC, "http://insights-upload.com/quarnantine/file_to_validate") + hccm_msg = MockMessage(UPLOAD_TOPIC, "http://insights-upload.com/quarnantine/file_to_validate") # Verify that when extract_payload is successful with 'hccm' message that SUCCESS_CONFIRM_STATUS is returned with patch("masu.external.kafka_msg_handler.extract_payload", return_value=(None, None)): diff --git a/koku/sources/api/status.py b/koku/sources/api/status.py index 973909cfe2..2d7cd2868b 100644 --- a/koku/sources/api/status.py +++ b/koku/sources/api/status.py @@ -5,7 +5,6 @@ """View for server status.""" import logging import platform -import socket import sys from http import HTTPStatus @@ -15,7 +14,6 @@ from django.db import NotSupportedError from django.db import OperationalError from django.db import ProgrammingError -from kafka import BrokerConnection from rest_framework.decorators import api_view from rest_framework.decorators import permission_classes from rest_framework.decorators import renderer_classes @@ -23,6 +21,7 @@ from rest_framework.response import Response from rest_framework.settings import api_settings +from kafka_utils.utils import check_kafka_connection from masu.config import Config from masu.external.date_accessor import DateAccessor from sources.config import Config as SourcesConfig @@ -33,21 +32,10 @@ LOG = logging.getLogger(__name__) -BROKER_CONNECTION = BrokerConnection( - SourcesConfig.SOURCES_KAFKA_HOST, int(SourcesConfig.SOURCES_KAFKA_PORT), socket.AF_UNSPEC -) BROKER_CONNECTION_ERROR = "Unable to establish connection with broker." CELERY_WORKER_NOT_FOUND = "No running Celery workers were found." 
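The local check_kafka_connection below is deleted in favor of the shared helper in kafka_utils.utils, which probes each broker in turn and succeeds as soon as any one accepts a connection. A standalone sketch of that multi-broker probe; the broker_list parameter and the connected = False default are additions here for illustration (the patched helper reads the list from CONFIGURATOR and leaves connected unbound when the list is empty):

    import socket

    from kafka import BrokerConnection


    def check_kafka_connection(broker_list):
        """Probe brokers in order; succeed as soon as one accepts a connection."""
        connected = False  # defensive default, not in the patch itself
        for broker in broker_list:
            host, port = broker.split(":")
            conn = BrokerConnection(host, int(port), socket.AF_UNSPEC)
            connected = conn.connect_blocking(timeout=1)
            if connected:
                conn.close()
                break
        return connected


    check_kafka_connection(["localhost:29092"])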
-def check_kafka_connection(): - """Check connectability of Kafka Broker.""" - connected = BROKER_CONNECTION.connect_blocking(timeout=1) - if connected: - BROKER_CONNECTION.close() - return connected - - def check_sources_connection(): """Check sources-backend connection.""" try: diff --git a/koku/sources/config.py b/koku/sources/config.py index 57619d0399..10d86a907e 100644 --- a/koku/sources/config.py +++ b/koku/sources/config.py @@ -10,16 +10,6 @@ class Config: """Configuration for service.""" - # SOURCES_TOPIC = ENVIRONMENT.get_value("SOURCES_KAFKA_TOPIC", default="platform.sources.event-stream") - SOURCES_TOPIC = CONFIGURATOR.get_kafka_topic("platform.sources.event-stream") - - SOURCES_KAFKA_HOST = CONFIGURATOR.get_kafka_broker_host() - SOURCES_KAFKA_PORT = CONFIGURATOR.get_kafka_broker_port() - SOURCES_KAFKA_ADDRESS = f"{SOURCES_KAFKA_HOST}:{SOURCES_KAFKA_PORT}" - SOURCES_KAFKA_SASL = CONFIGURATOR.get_kafka_sasl() - SOURCES_KAFKA_CACERT = CONFIGURATOR.get_kafka_cacert() - SOURCES_KAFKA_AUTHTYPE = CONFIGURATOR.get_kafka_authtype() - SOURCES_API_HOST = CONFIGURATOR.get_endpoint_host("sources-api", "svc", "localhost") SOURCES_API_PORT = CONFIGURATOR.get_endpoint_port("sources-api", "svc", "3000") SOURCES_API_URL = f"http://{SOURCES_API_HOST}:{SOURCES_API_PORT}" diff --git a/koku/sources/kafka_listener.py b/koku/sources/kafka_listener.py index a0d198c30f..0891f05dc2 100644 --- a/koku/sources/kafka_listener.py +++ b/koku/sources/kafka_listener.py @@ -25,13 +25,14 @@ from api.provider.models import Sources from kafka_utils.utils import get_consumer +from kafka_utils.utils import is_kafka_connected +from kafka_utils.utils import SOURCES_TOPIC from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER from masu.prometheus_stats import SOURCES_HTTP_CLIENT_ERROR_COUNTER from masu.prometheus_stats import SOURCES_KAFKA_LOOP_RETRY from masu.prometheus_stats import SOURCES_PROVIDER_OP_RETRY_LOOP_COUNTER from providers.provider_errors import SkipStatusPush from sources import storage -from sources.api.status import check_kafka_connection from sources.config import Config from sources.kafka_message_processor import create_msg_processor from sources.kafka_message_processor import SourcesMessageError @@ -247,7 +248,7 @@ def listen_for_messages_loop(application_source_id): # pragma: no cover "enable.auto.commit": False, } consumer = get_consumer(kafka_conf) - consumer.subscribe([Config.SOURCES_TOPIC]) + consumer.subscribe([SOURCES_TOPIC]) LOG.info("Listener started. Waiting for messages...") while True: msg_list = consumer.consume() @@ -278,7 +279,7 @@ def listen_for_messages(kaf_msg, consumer, application_source_id): # noqa: C901 try: msg_processor = create_msg_processor(kaf_msg, application_source_id) if msg_processor and msg_processor.source_id and msg_processor.auth_header: - tp = TopicPartition(Config.SOURCES_TOPIC, msg_processor.partition, msg_processor.offset) + tp = TopicPartition(SOURCES_TOPIC, msg_processor.partition, msg_processor.offset) if not msg_processor.msg_for_cost_mgmt(): LOG.info("Event not associated with cost-management.") consumer.commit() @@ -331,27 +332,6 @@ def backoff(interval, maximum=120): time.sleep(wait) -def is_kafka_connected(): # pragma: no cover - """ - Check connectability to Kafka messenger. - - This method will block sources integration initialization until - Kafka is connected. 
- """ - count = 0 - result = False - while not result: - result = check_kafka_connection() - if result: - LOG.info("Test connection to Kafka was successful.") - else: - LOG.error(f"Unable to connect to Kafka server [{Config.SOURCES_KAFKA_HOST}:{Config.SOURCES_KAFKA_PORT}].") - KAFKA_CONNECTION_ERRORS_COUNTER.inc() - backoff(count) - count += 1 - return result - - @KAFKA_CONNECTION_ERRORS_COUNTER.count_exceptions() def sources_integration_thread(): # pragma: no cover """ diff --git a/koku/sources/kafka_message_processor.py b/koku/sources/kafka_message_processor.py index 12d9e90f60..173b9557b7 100644 --- a/koku/sources/kafka_message_processor.py +++ b/koku/sources/kafka_message_processor.py @@ -9,8 +9,8 @@ from api.provider.models import Provider from kafka_utils.utils import extract_from_header +from kafka_utils.utils import SOURCES_TOPIC from sources import storage -from sources.config import Config from sources.sources_http_client import AUTH_TYPES from sources.sources_http_client import convert_header_to_dict from sources.sources_http_client import SourceNotFoundError @@ -344,7 +344,7 @@ def process(self): def create_msg_processor(msg, cost_mgmt_id): """Create the message processor based on the event_type.""" - if msg.topic() == Config.SOURCES_TOPIC: + if msg.topic() == SOURCES_TOPIC: event_type = extract_from_header(msg.headers(), KAFKA_HDR_EVENT_TYPE) LOG.debug(f"event_type: {event_type}") if event_type in ( diff --git a/koku/sources/management/commands/sources_listener.py b/koku/sources/management/commands/sources_listener.py index be210d308b..1f64329d7a 100644 --- a/koku/sources/management/commands/sources_listener.py +++ b/koku/sources/management/commands/sources_listener.py @@ -8,12 +8,12 @@ from django.core.management.base import BaseCommand +from kafka_utils.utils import check_kafka_connection from koku.database import check_migrations from koku.feature_flags import UNLEASH_CLIENT from koku.probe_server import ProbeResponse from koku.probe_server import ProbeServer from koku.probe_server import start_probe_server -from sources.api.status import check_kafka_connection from sources.api.status import check_sources_connection from sources.kafka_listener import initialize_sources_integration diff --git a/koku/sources/test/api/test_status.py b/koku/sources/test/api/test_status.py index 7f2ba6b219..52dcaa8669 100644 --- a/koku/sources/test/api/test_status.py +++ b/koku/sources/test/api/test_status.py @@ -19,7 +19,6 @@ from django.urls import reverse from sources.api.status import ApplicationStatus -from sources.api.status import check_kafka_connection from sources.api.status import check_sources_connection from sources.sources_http_client import SourceNotFoundError from sources.sources_http_client import SourcesHTTPClientError @@ -71,17 +70,6 @@ def test_status_readiness_dependency_error(self): response = self.client.get(reverse("server-status")) self.assertEqual(response.status_code, HTTPStatus.FAILED_DEPENDENCY) - def test_check_kafka_connection(self): - """Test check kafka connections.""" - with patch("kafka.BrokerConnection.connect_blocking", return_value=False): - result = check_kafka_connection() - self.assertFalse(result) - with patch("kafka.BrokerConnection.connect_blocking", return_value=True): - with patch("kafka.BrokerConnection.close") as mock_close: - result = check_kafka_connection() - mock_close.assert_called() - self.assertTrue(result) - def test_check_sources_connection(self): """Test check sources connections.""" app_id = 2 diff --git 
a/koku/sources/test/test_kafka_listener.py b/koku/sources/test/test_kafka_listener.py index 2f45ca8cb2..cda61786d4 100644 --- a/koku/sources/test/test_kafka_listener.py +++ b/koku/sources/test/test_kafka_listener.py @@ -26,7 +26,6 @@ from api.provider.provider_builder import ProviderBuilder from api.provider.provider_builder import ProviderBuilderError from koku.middleware import IdentityHeaderMiddleware -from masu.prometheus_stats import WORKER_REGISTRY from providers.provider_access import ProviderAccessor from providers.provider_errors import SkipStatusPush from sources import storage @@ -855,15 +854,6 @@ def test_collect_pending_items(self): response = source_integration._collect_pending_items() self.assertEqual(len(response), 3) - @patch("time.sleep", side_effect=None) - @patch("sources.kafka_listener.check_kafka_connection", side_effect=[bool(0), bool(1)]) - def test_kafka_connection_metrics_listen_for_messages(self, mock_start, mock_sleep): - """Test check_kafka_connection increments kafka connection errors on KafkaError.""" - connection_errors_before = WORKER_REGISTRY.get_sample_value("kafka_connection_errors_total") - source_integration.is_kafka_connected() - connection_errors_after = WORKER_REGISTRY.get_sample_value("kafka_connection_errors_total") - self.assertEqual(connection_errors_after - connection_errors_before, 1) - # @patch.object(Config, "SOURCES_API_URL", "http://www.sources.com") # def test_process_message_application_unsupported_source_type(self): # """Test the process_message function with an unsupported source type.""" diff --git a/koku/subs/subs_data_messenger.py b/koku/subs/subs_data_messenger.py index eff0fee1c9..d7213d78f0 100644 --- a/koku/subs/subs_data_messenger.py +++ b/koku/subs/subs_data_messenger.py @@ -15,7 +15,7 @@ from api.iam.models import Customer from kafka_utils.utils import delivery_callback from kafka_utils.utils import get_producer -from masu.config import Config as masu_config +from kafka_utils.utils import SUBS_TOPIC from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER from masu.util.aws.common import get_s3_resource @@ -80,7 +80,7 @@ def process_and_send_subs_message(self, upload_keys): def send_kafka_message(self, msg): """Sends a kafka message to the SUBS topic with the S3 keys for the uploaded reports.""" producer = get_producer() - producer.produce(masu_config.SUBS_TOPIC, value=msg, callback=delivery_callback) + producer.produce(SUBS_TOPIC, value=msg, callback=delivery_callback) producer.poll(0) def build_subs_msg(
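End-to-end, producers after this patch resolve both the broker list and the topic through CONFIGURATOR and publish via the shared ProducerSingleton. A minimal confluent-kafka sketch of that produce path; the localhost broker, the inline topic string, and the toy delivery_callback body are local stand-ins, since koku resolves these through kafka_utils.utils and CONFIGURATOR.get_kafka_topic():

    import json

    from confluent_kafka import Producer


    def delivery_callback(err, msg):
        # Toy stand-in for kafka_utils.utils.delivery_callback.
        if err:
            print(f"delivery failed: {err}")
        else:
            print(f"delivered to {msg.topic()} [{msg.partition()}]")


    # bootstrap.servers is a comma-joined broker list, as in _get_managed_kafka_config.
    producer = Producer({"bootstrap.servers": "localhost:29092"})

    subs_topic = "platform.rhsm-subscriptions.service-instance-ingress"
    msg = bytes(json.dumps({"example": "payload"}), "utf-8")
    producer.produce(subs_topic, value=msg, callback=delivery_callback)
    producer.poll(0)  # give the client a chance to fire delivery callbacks
    producer.flush()  # drain the queue before exiting

The produce/poll(0) pairing matches the pattern used throughout the patch: poll(0) fires any pending delivery callbacks without blocking the caller.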