diff --git a/karapace/client.py b/karapace/client.py index 04e166a13..ded1fd685 100644 --- a/karapace/client.py +++ b/karapace/client.py @@ -12,11 +12,12 @@ import logging import ssl -log = logging.getLogger(__name__) Path = str Headers = dict JsonData = object # Type of the result after parsing JSON +LOG = logging.getLogger(__name__) + async def _get_aiohttp_client() -> ClientSession: return ClientSession() @@ -74,7 +75,7 @@ async def close(self) -> None: if self._client is not None: await self._client.close() except: # pylint: disable=bare-except - log.info("Could not close client") + LOG.error("Could not close client") async def get_client(self) -> ClientSession: if self._client is None: diff --git a/karapace/compatibility/jsonschema/checks.py b/karapace/compatibility/jsonschema/checks.py index 870d1b251..afc8dcaaf 100644 --- a/karapace/compatibility/jsonschema/checks.py +++ b/karapace/compatibility/jsonschema/checks.py @@ -30,11 +30,8 @@ ) from typing import Any, List, Optional -import logging import networkx as nx -LOG = logging.getLogger(__name__) - INTRODUCED_INCOMPATIBILITY_MSG_FMT = "Introduced incompatible assertion {assert_name} with value {introduced_value}" RESTRICTED_INCOMPATIBILITY_MSG_FMT = "More restrictive assertion {assert_name} from {writer_value} to {reader_value}" MODIFIED_INCOMPATIBILITY_MSG_FMT = "Assertion of {assert_name} changed from {writer_value} to {reader_value}" @@ -248,7 +245,6 @@ def compatibility_rec( # reader has type `array` to represent a list, and the writer is either a # different type or it is also an `array` but now it representes a tuple. if reader_schema is None and writer_schema is not None: - LOG.debug("Schema removed reader_schema.type='%r'", get_type_of(reader_schema)) return incompatible_schema( incompat_type=Incompatibility.schema_removed, message="schema removed", diff --git a/karapace/compatibility/protobuf/checks.py b/karapace/compatibility/protobuf/checks.py index 9b21f02c4..c995d1f03 100644 --- a/karapace/compatibility/protobuf/checks.py +++ b/karapace/compatibility/protobuf/checks.py @@ -2,17 +2,10 @@ from karapace.protobuf.compare_result import CompareResult from karapace.protobuf.schema import ProtobufSchema -import logging - -log = logging.getLogger(__name__) - def check_protobuf_schema_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult: result = CompareResult() - log.debug("READER: %s", reader.to_schema()) - log.debug("WRITER: %s", writer.to_schema()) writer.compare(reader, result) - log.debug("IS_COMPATIBLE %s", result.is_compatible()) if result.is_compatible(): return SchemaCompatibilityResult(SchemaCompatibilityType.compatible) diff --git a/karapace/config.py b/karapace/config.py index ab45b66fd..99cf698ce 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -19,7 +19,7 @@ Config = Dict[str, Union[None, str, int, bool, List[str], AccessLogger]] LOG = logging.getLogger(__name__) HOSTNAME = socket.gethostname() - +SASL_PLAIN_PASSWORD = "sasl_plain_password" DEFAULTS = { "access_logs_debug": False, "access_log_class": None, @@ -53,7 +53,7 @@ "ssl_password": None, "sasl_mechanism": None, "sasl_plain_username": None, - "sasl_plain_password": None, + SASL_PLAIN_PASSWORD: None, "topic_name": DEFAULT_SCHEMA_TOPIC, "metadata_max_age_ms": 60000, "admin_metadata_max_age": 5, @@ -67,6 +67,7 @@ "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", } +SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] class InvalidConfiguration(Exception): @@ -109,12 +110,20 @@ def 
set_settings_from_environment(config: Config) -> None: env_name = config_name_with_prefix.upper() env_val = os.environ.get(env_name) if env_val is not None: - LOG.debug( - "Populating config value %r from env var %r with %r instead of config file", - config_name, - env_name, - env_val, - ) + if config_name not in SECRET_CONFIG_OPTIONS: + LOG.info( + "Populating config value %r from env var %r with %r instead of config file", + config_name, + env_name, + env_val, + ) + else: + LOG.info( + "Populating config value %r from env var %r instead of config file", + config_name, + env_name, + ) + config[config_name] = parse_env_value(env_val) diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 2d8bb57cd..838b6a42c 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -17,7 +17,6 @@ import asyncio import base64 -import logging import time import ujson @@ -39,7 +38,6 @@ def __init__(self, config: Config) -> None: super().__init__(config=config) self._add_kafka_rest_routes() self.serializer = SchemaRegistrySerializer(config=config) - self.log = logging.getLogger("KarapaceRest") self._cluster_metadata = None self._metadata_birth = None self.metadata_max_age = self.config["admin_metadata_max_age"] diff --git a/karapace/kafka_rest_apis/admin.py b/karapace/kafka_rest_apis/admin.py index a0e2d6a16..bf4085bdf 100644 --- a/karapace/kafka_rest_apis/admin.py +++ b/karapace/kafka_rest_apis/admin.py @@ -9,12 +9,10 @@ import logging +LOG = logging.getLogger(__name__) -class KafkaRestAdminClient(KafkaAdminClient): - def __init__(self, **configs): - super().__init__(**configs) - self.log = logging.getLogger("AdminClient") +class KafkaRestAdminClient(KafkaAdminClient): def get_topic_config(self, topic: str) -> dict: config_version = self._matching_api_version(DescribeConfigsRequest) req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic)] @@ -50,7 +48,7 @@ def cluster_metadata(self, topics: List[str] = None, retries: int = 0) -> dict: except Cancelled: if retries > 3: raise - self.log.debug("Retrying metadata with %d retires", retries) + LOG.debug("Retrying metadata with %d retries", retries) return self.cluster_metadata(topics, retries + 1) return self._make_metadata_response(future.value) diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index f7dde4926..e52aad785 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -24,22 +24,21 @@ OFFSET_RESET_STRATEGIES = {"latest", "earliest"} TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"]) +LOG = logging.getLogger(__name__) + + +def new_name() -> str: + return str(uuid.uuid4()) class ConsumerManager: def __init__(self, config: dict) -> None: self.config = config self.hostname = f"http://{self.config['advertised_hostname']}:{self.config['port']}" - self.log = logging.getLogger("RestConsumerManager") self.deserializer = SchemaRegistryDeserializer(config=config) self.consumers = {} self.consumer_locks = defaultdict(Lock) - def new_name(self) -> str: - name = str(uuid.uuid4()) - self.log.debug("Generated new consumer name: %s", name) - return name - @staticmethod def _assert(cond: bool, code: HTTPStatus, sub_code: int, message: str, content_type: str) -> None: if not cond: @@ -155,12 +154,15 @@ def _update_partition_assignments(consumer: KafkaConsumer): # CONSUMER async def create_consumer(self, group_name: str, request_data: dict,
content_type: str): group_name = group_name.strip("/") - self.log.info("Create consumer request for group %s", group_name) - consumer_name = request_data.get("name") or self.new_name() + consumer_name = request_data.get("name") or new_name() internal_name = self.create_internal_name(group_name, consumer_name) async with self.consumer_locks[internal_name]: if internal_name in self.consumers: - self.log.error("Error creating duplicate consumer in group %s with id %s", group_name, consumer_name) + LOG.warning( + "Error creating duplicate consumer in group %s with id %s", + group_name, + consumer_name, + ) KarapaceBase.r( status=HTTPStatus.CONFLICT, content_type=content_type, @@ -170,11 +172,14 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ }, ) self._validate_create_consumer(request_data, content_type) - self.log.info( - "Creating new consumer in group %s with id %s and request_info %r", group_name, consumer_name, request_data - ) for k in ["consumer.request.timeout.ms", "fetch_min_bytes"]: convert_to_int(request_data, k, content_type) + LOG.info( + "Creating new consumer in group. group name: %s consumer name: %s request_data %r", + group_name, + consumer_name, + request_data, + ) try: enable_commit = request_data.get("auto.commit.enable", self.config["consumer_enable_auto_commit"]) if isinstance(enable_commit, str): @@ -223,11 +228,11 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name ) return c except: # pylint: disable=bare-except - self.log.exception("Unable to create consumer, retrying") + LOG.exception("Unable to create consumer, retrying") await asyncio.sleep(1) async def delete_consumer(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Deleting consumer for %s", internal_name) + LOG.info("Deleting consumer for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: try: @@ -235,7 +240,7 @@ async def delete_consumer(self, internal_name: Tuple[str, str], content_type: st c.consumer.close() self.consumer_locks.pop(internal_name) except: # pylint: disable=bare-except - self.log.exception("Unable to properly dispose of consumer") + LOG.exception("Unable to properly dispose of consumer") finally: empty_response() @@ -243,7 +248,7 @@ async def delete_consumer(self, internal_name: Tuple[str, str], content_type: st async def commit_offsets( self, internal_name: Tuple[str, str], content_type: str, request_data: dict, cluster_metadata: dict ): - self.log.info("Committing offsets for %s", internal_name) + LOG.info("Committing offsets for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) if request_data: self._assert_has_key(request_data, "offsets", content_type) @@ -266,7 +271,7 @@ async def commit_offsets( empty_response() async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Retrieving offsets for %s", internal_name) + LOG.info("Retrieving offsets for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "partitions", content_type) response = {"offsets": []} @@ -290,7 +295,7 @@ async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, r # SUBSCRIPTION async def set_subscription(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Updating subscription for %s", internal_name) + LOG.info("Updating subscription for %s", 
internal_name) self._assert_consumer_exists(internal_name, content_type) topics = request_data.get("topics", []) topics_pattern = request_data.get("topic_pattern") @@ -307,10 +312,10 @@ async def set_subscription(self, internal_name: Tuple[str, str], content_type: s except IllegalStateError as e: self._illegal_state_fail(str(e), content_type=content_type) finally: - self.log.info("Done updating subscription") + LOG.info("Done updating subscription") async def get_subscription(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Retrieving subscription for %s", internal_name) + LOG.info("Retrieving subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -321,7 +326,7 @@ async def get_subscription(self, internal_name: Tuple[str, str], content_type: s KarapaceBase.r(content_type=content_type, body={"topics": topics}) async def delete_subscription(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Deleting subscription for %s", internal_name) + LOG.info("Deleting subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: self.consumers[internal_name].consumer.unsubscribe() @@ -329,7 +334,7 @@ async def delete_subscription(self, internal_name: Tuple[str, str], content_type # ASSIGNMENTS async def set_assignments(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Updating assignments for %s to %r", internal_name, request_data) + LOG.info("Updating assignments for %s to %r", internal_name, request_data) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "partitions", content_type) partitions = [] @@ -346,10 +351,10 @@ async def set_assignments(self, internal_name: Tuple[str, str], content_type: st except IllegalStateError as e: self._illegal_state_fail(message=str(e), content_type=content_type) finally: - self.log.info("Done updating assignment") + LOG.info("Done updating assignment") async def get_assignments(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Retrieving assignment for %s", internal_name) + LOG.info("Retrieving assignment for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -360,7 +365,7 @@ async def get_assignments(self, internal_name: Tuple[str, str], content_type: st # POSITIONS async def seek_to(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Resetting offsets for %s to %r", internal_name, request_data) + LOG.info("Resetting offsets for %s to %r", internal_name, request_data) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "offsets", content_type) seeks = [] @@ -384,7 +389,7 @@ async def seek_limit( self, internal_name: Tuple[str, str], content_type: str, request_data: dict, beginning: bool = True ): direction = "beginning" if beginning else "end" - self.log.info("Seeking %s offsets", direction) + LOG.info("Seeking %s offsets", direction) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "partitions", content_type) resets = [] @@ -406,7 +411,7 @@ async def seek_limit( self._illegal_state_fail(f"Trying to reset unassigned partitions to 
{direction}", content_type) async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats: dict, query_params: dict): - self.log.info("Running fetch for name %s with parameters %r and formats %r", internal_name, query_params, formats) + LOG.info("Running fetch for name %s with parameters %r and formats %r", internal_name, query_params, formats) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -420,7 +425,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats content_type=content_type, message=f"Consumer format {serialization_format} does not match the embedded format {request_format}", ) - self.log.info("Fetch request for %s with params %r", internal_name, query_params) + LOG.info("Fetch request for %s with params %r", internal_name, query_params) try: timeout = ( int(query_params["timeout"]) if "timeout" in query_params else config["consumer.request.timeout.ms"] @@ -438,7 +443,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats if val <= 0: KarapaceBase.internal_error(message=f"Invalid request parameter {val}", content_type=content_type) response = [] - self.log.info( + LOG.info( "Will poll multiple times for a single message with a total timeout of %dms, " "until at least %d bytes have been fetched", timeout, @@ -451,14 +456,14 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats while read_bytes < max_bytes and start_time + timeout / 1000 > time.monotonic(): time_left = start_time + timeout / 1000 - time.monotonic() bytes_left = max_bytes - read_bytes - self.log.info( + LOG.info( "Polling with %r time left and %d bytes left, gathered %d messages so far", time_left, bytes_left, message_count, ) data = consumer.poll(timeout_ms=timeout, max_records=1) - self.log.debug("Successfully polled for messages") + LOG.debug("Successfully polled for messages") for topic, records in data.items(): for rec in records: message_count += 1 @@ -468,7 +473,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats + max(0, rec.serialized_header_size) ) poll_data[topic].append(rec) - self.log.info("Gathered %d total messages", message_count) + LOG.info("Gathered %d total messages", message_count) for tp in poll_data: for msg in poll_data[tp]: try: diff --git a/karapace/karapace.py b/karapace/karapace.py index c2200c55e..2a2560350 100644 --- a/karapace/karapace.py +++ b/karapace/karapace.py @@ -11,8 +11,6 @@ from karapace.rapu import HTTPResponse, RestApp from typing import NoReturn, Union -import logging - class KarapaceBase(RestApp): def __init__(self, config: Config) -> None: @@ -20,9 +18,8 @@ def __init__(self, config: Config) -> None: self.kafka_timeout = 10 self.route("/", callback=self.root_get, method="GET") - self.log = logging.getLogger("Karapace") - self.log.info("Karapace initialized") self.app.on_shutdown.append(self.close_by_app) + self.log.info("Karapace initialized") async def close_by_app(self, app): # pylint: disable=unused-argument diff --git a/karapace/karapace_all.py b/karapace/karapace_all.py index 49c2c1060..7b111e03c 100644 --- a/karapace/karapace_all.py +++ b/karapace/karapace_all.py @@ -1,7 +1,7 @@ from aiohttp.web_log import AccessLogger from contextlib import closing from karapace import version as karapace_version -from karapace.config import Config, read_config +from karapace.config import read_config from karapace.kafka_rest_apis 
import KafkaRest from karapace.rapu import RestApp from karapace.schema_registry_apis import KarapaceSchemaRegistry @@ -13,9 +13,7 @@ class KarapaceAll(KafkaRest, KarapaceSchemaRegistry): - def __init__(self, config: Config) -> None: - super().__init__(config=config) - self.log = logging.getLogger("KarapaceAll") + pass def main() -> int: diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index c667a69e0..77984201a 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -20,6 +20,7 @@ # SR group errors NO_ERROR = 0 DUPLICATE_URLS = 1 +LOG = logging.getLogger(__name__) def get_identity_url(scheme, host, port): @@ -35,10 +36,6 @@ class SchemaCoordinator(BaseCoordinator): master_url = None master_eligibility = True - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self.log = logging.getLogger("SchemaCoordinator") - def protocol_type(self): return "sr" @@ -52,7 +49,7 @@ def group_protocols(self): return [("v0", self.get_identity(host=self.hostname, port=self.port, scheme=self.scheme))] def _perform_assignment(self, leader_id, protocol, members): - self.log.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members) + LOG.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members) self.are_we_master = None error = NO_ERROR urls = {} @@ -82,7 +79,7 @@ def _perform_assignment(self, leader_id, protocol, members): scheme=member_identity["scheme"], json_encode=False, ) - self.log.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url) + LOG.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url) assignments = {} for member_id, member_data in members: @@ -94,7 +91,7 @@ def _on_join_prepare(self, generation, member_id): # needs to be implemented in our class for pylint to be satisfied def _on_join_complete(self, generation, member_id, protocol, member_assignment_bytes): - self.log.info( + LOG.info( "Join complete, generation %r, member_id: %r, protocol: %r, member_assignment_bytes: %r", generation, member_id, @@ -119,13 +116,11 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b else: self.master_url = master_url self.are_we_master = False - # pylint: disable=super-with-arguments - return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes) + return super()._on_join_complete(generation, member_id, protocol, member_assignment_bytes) - def _on_join_follower(self): - self.log.info("We are a follower, not a master") - # pylint: disable=super-with-arguments - return super(SchemaCoordinator, self)._on_join_follower() + def _on_join_follower(self) -> None: + LOG.info("We are a follower, not a master") + return super()._on_join_follower() class MasterCoordinator(Thread): @@ -142,7 +137,6 @@ def __init__(self, config): metric_config = MetricConfig(samples=2, time_window_ms=30000, tags=metrics_tags) self._metrics = Metrics(metric_config, reporters=[]) self.schema_coordinator_ready = Event() - self.log = logging.getLogger("MasterCoordinator") def init_kafka_client(self): try: @@ -161,7 +155,7 @@ def init_kafka_client(self): ) return True except (NodeNotReadyError, NoBrokersAvailable): - self.log.warning("No Brokers available yet, retrying init_kafka_client()") + LOG.warning("No Brokers available yet, retrying init_kafka_client()") time.sleep(2.0) return False @@ -187,7 +181,7 @@ def get_master_info(self) -> Tuple[bool, Optional[str]]: return 
self.sc.are_we_master, self.sc.master_url def close(self): - self.log.info("Closing master_coordinator") + LOG.info("Closing master_coordinator") self.running = False def run(self): @@ -203,10 +197,10 @@ def run(self): self.sc.ensure_active_group() self.sc.poll_heartbeat() - self.log.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url) + LOG.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url) time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat())) except: # pylint: disable=bare-except - self.log.exception("Exception in master_coordinator") + LOG.exception("Exception in master_coordinator") time.sleep(1.0) if self.sc: diff --git a/karapace/protobuf/io.py b/karapace/protobuf/io.py index cf3678a10..ba8d8a223 100644 --- a/karapace/protobuf/io.py +++ b/karapace/protobuf/io.py @@ -10,12 +10,9 @@ import hashlib import importlib import importlib.util -import logging import os import subprocess -logger = logging.getLogger(__name__) - def calculate_class_name(name: str) -> str: return "c_" + hashlib.md5(name.encode("utf-8")).hexdigest() diff --git a/karapace/schema_backup.py b/karapace/schema_backup.py index c88f79297..dfb5b265e 100644 --- a/karapace/schema_backup.py +++ b/karapace/schema_backup.py @@ -21,6 +21,8 @@ import time import ujson +LOG = logging.getLogger(__name__) + class BackupError(Exception): """Backup Error""" @@ -31,7 +33,6 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str] self.config = config self.backup_location = backup_path self.topic_name = topic_option or self.config["topic_name"] - self.log = logging.getLogger("SchemaBackup") self.consumer = None self.producer = None self.admin_client = None @@ -88,15 +89,15 @@ def init_admin_client(self): ) break except (NodeNotReadyError, NoBrokersAvailable, AssertionError): - self.log.warning("No Brokers available yet, retrying init_admin_client()") + LOG.warning("No Brokers available yet, retrying init_admin_client()") except: # pylint: disable=bare-except - self.log.exception("Failed to initialize admin client, retrying init_admin_client()") + LOG.exception("Failed to initialize admin client, retrying init_admin_client()") time.sleep(2.0) def _create_schema_topic_if_needed(self): if self.topic_name != self.config["topic_name"]: - self.log.info("Topic name overridden, not creating a topic with schema configuration") + LOG.info("Topic name overridden, not creating a topic with schema configuration") return self.init_admin_client() @@ -109,21 +110,21 @@ def _create_schema_topic_if_needed(self): schema_topic = KafkaSchemaReader.get_new_schema_topic(self.config) try: - self.log.info("Creating schema topic: %r", schema_topic) + LOG.info("Creating schema topic: %r", schema_topic) self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) - self.log.info("Topic: %r created successfully", self.config["topic_name"]) + LOG.info("Topic: %r created successfully", self.config["topic_name"]) break except TopicAlreadyExistsError: - self.log.info("Topic: %r already exists", self.config["topic_name"]) + LOG.info("Topic: %r already exists", self.config["topic_name"]) break except: # pylint: disable=bare-except - self.log.exception( + LOG.exception( "Failed to create topic: %r, retrying _create_schema_topic_if_needed()", self.config["topic_name"] ) time.sleep(5) def close(self): - self.log.info("Closing schema backup reader") + LOG.info("Closing schema backup reader") if self.consumer: self.consumer.close() 
self.consumer = None @@ -141,10 +142,10 @@ def request_backup(self): if self.backup_location: with open(self.backup_location, mode="w", encoding="utf8") as fp: fp.write(ser) - self.log.info("Schema backup written to %r", self.backup_location) + LOG.info("Schema backup written to %r", self.backup_location) else: print(ser) - self.log.info("Schema backup written to stdout") + LOG.info("Schema backup written to stdout") self.close() def restore_backup(self): @@ -155,7 +156,7 @@ def restore_backup(self): if not self.producer: self.init_producer() - self.log.info("Starting backup restore for topic: %r", self.topic_name) + LOG.info("Starting backup restore for topic: %r", self.topic_name) values = None with open(self.backup_location, mode="r", encoding="utf8") as fp: @@ -171,7 +172,7 @@ def restore_backup(self): future = self.producer.send(self.topic_name, key=key, value=value) self.producer.flush(timeout=self.timeout_ms) msg = future.get(self.timeout_ms) - self.log.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, msg.offset) + LOG.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, msg.offset) self.close() def export_anonymized_avro_schemas(self): @@ -196,16 +197,16 @@ def export_anonymized_avro_schemas(self): if self.backup_location: with open(self.backup_location, mode="w", encoding="utf8") as fp: fp.write(ser) - self.log.info("Anonymized Avro schema export written to %r", self.backup_location) + LOG.info("Anonymized Avro schema export written to %r", self.backup_location) else: print(ser) - self.log.info("Anonymized Avro schema export written to stdout") + LOG.info("Anonymized Avro schema export written to stdout") self.close() def _export(self) -> List[Tuple[str, Dict[str, str]]]: if not self.consumer: self.init_consumer() - self.log.info("Starting schema backup read for topic: %r", self.topic_name) + LOG.info("Starting schema backup read for topic: %r", self.topic_name) values = [] topic_fully_consumed = False @@ -221,14 +222,14 @@ def _export(self) -> List[Tuple[str, Dict[str, str]]]: try: key = ujson.loads(key) except ValueError: - self.log.debug("Invalid JSON in message.key: %r, value: %r", message.key, message.value) + LOG.debug("Invalid JSON in message.key: %r, value: %r", message.key, message.value) value = None if message.value: value = message.value.decode("utf8") try: value = ujson.loads(value) except ValueError: - self.log.debug("Invalid JSON in message.value: %r, key: %r", message.value, message.key) + LOG.debug("Invalid JSON in message.value: %r, key: %r", message.value, message.key) values.append((key, value)) return values diff --git a/karapace/schema_models.py b/karapace/schema_models.py index e8e111440..d36274015 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -16,11 +16,8 @@ from typing import Any, Dict, Union import json -import logging import ujson -log = logging.getLogger(__name__) - def parse_avro_schema_definition(s: str) -> AvroSchema: """Compatibility function with Avro which ignores trailing data in JSON @@ -146,7 +143,6 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": ProtobufException, ProtobufSchemaParseException, ) as e: - log.exception("Unexpected error: %s \n schema:[%s]", e, schema_str) raise InvalidSchema from e else: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 335166560..dab8df057 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -20,7 +20,6 @@ 
import time import ujson -log = logging.getLogger(__name__) Offset = int Subject = str Version = int @@ -31,6 +30,7 @@ # The value `0` is a valid offset and it represents the first message produced # to a topic, therefore it can not be used. OFFSET_EMPTY = -1 +LOG = logging.getLogger(__name__) class OffsetsWatcher: @@ -83,7 +83,6 @@ def __init__( ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator - self.log = logging.getLogger("KafkaSchemaReader") self.timeout_ms = 200 self.config = config self.subjects: Dict[Subject, SubjectData] = {} @@ -154,10 +153,10 @@ def init_admin_client(self) -> bool: ) return True except (NodeNotReadyError, NoBrokersAvailable, AssertionError): - self.log.warning("No Brokers available yet, retrying init_admin_client()") + LOG.warning("No Brokers available yet, retrying init_admin_client()") time.sleep(2.0) except: # pylint: disable=bare-except - self.log.exception("Failed to initialize admin client, retrying init_admin_client()") + LOG.exception("Failed to initialize admin client, retrying init_admin_client()") time.sleep(2.0) return False @@ -175,17 +174,17 @@ def create_schema_topic(self) -> bool: schema_topic = self.get_new_schema_topic(self.config) try: - self.log.info("Creating topic: %r", schema_topic) + LOG.info("Creating topic: %r", schema_topic) self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) - self.log.info("Topic: %r created successfully", self.config["topic_name"]) + LOG.info("Topic: %r created successfully", self.config["topic_name"]) self.schema_topic = schema_topic return True except TopicAlreadyExistsError: - self.log.warning("Topic: %r already exists", self.config["topic_name"]) + LOG.warning("Topic: %r already exists", self.config["topic_name"]) self.schema_topic = schema_topic return True except: # pylint: disable=bare-except - self.log.exception("Failed to create topic: %r, retrying create_schema_topic()", self.config["topic_name"]) + LOG.exception("Failed to create topic: %r, retrying create_schema_topic()", self.config["topic_name"]) time.sleep(5) return False @@ -198,7 +197,7 @@ def get_schema_id(self, new_schema: TypedSchema) -> int: return self.global_schema_id def close(self) -> None: - self.log.info("Closing schema_reader") + LOG.info("Closing schema_reader") self.running = False def run(self) -> None: @@ -213,10 +212,11 @@ def run(self) -> None: if not self.consumer: self.init_consumer() self.handle_messages() + LOG.info("Status: offset: %r, ready: %r", self.offset, self.ready) except Exception as e: # pylint: disable=broad-except if self.stats: self.stats.unexpected_exception(ex=e, where="schema_reader_loop") - self.log.exception("Unexpected exception in schema reader loop") + LOG.exception("Unexpected exception in schema reader loop") try: if self.admin_client: self.admin_client.close() @@ -225,7 +225,7 @@ def run(self) -> None: except Exception as e: # pylint: disable=broad-except if self.stats: self.stats.unexpected_exception(ex=e, where="schema_reader_exit") - self.log.exception("Unexpected exception closing schema reader") + LOG.exception("Unexpected exception closing schema reader") def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" @@ -248,7 +248,7 @@ def handle_messages(self) -> None: try: key = ujson.loads(msg.key.decode("utf8")) except ValueError: - self.log.exception("Invalid JSON in msg.key: %r, value: %r", msg.key, msg.value) + LOG.exception("Invalid JSON in msg.key") continue value = 
None @@ -256,18 +256,11 @@ def handle_messages(self) -> None: try: value = ujson.loads(msg.value.decode("utf8")) except ValueError: - self.log.exception("Invalid JSON in msg.value: %r, key: %r", msg.value, msg.key) + LOG.exception("Invalid JSON in msg.value") continue - self.log.info("Read new record: key: %r, value: %r, offset: %r", key, value, msg.offset) self.handle_msg(key, value) self.offset = msg.offset - self.log.info( - "Handled message, current offset: %r, ready: %r, add_offsets: %r", - self.offset, - self.ready, - add_offsets, - ) if self.ready and add_offsets: self.offset_watcher.offset_seen(self.offset) @@ -275,29 +268,29 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: subject = key.get("subject") if subject is not None: if subject not in self.subjects: - self.log.info("Adding first version of subject: %r with no schemas", subject) + LOG.info("Adding first version of subject: %r with no schemas", subject) self.subjects[subject] = {"schemas": {}} if not value: - self.log.info("Deleting compatibility config completely for subject: %r", subject) + LOG.info("Deleting compatibility config completely for subject: %r", subject) self.subjects[subject].pop("compatibility", None) else: - self.log.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) + LOG.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) self.subjects[subject]["compatibility"] = value["compatibilityLevel"] elif value is not None: - self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) + LOG.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) self.config["compatibility"] = value["compatibilityLevel"] def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: # pylint: disable=unused-argument if value is None: - self.log.error("DELETE_SUBJECT record doesnt have a value, should have") + LOG.error("DELETE_SUBJECT record doesnt have a value, should have") return subject = value["subject"] if subject not in self.subjects: - self.log.error("Subject: %r did not exist, should have", subject) + LOG.error("Subject: %r did not exist, should have", subject) else: - self.log.info("Deleting subject: %r, value: %r", subject, value) + LOG.info("Deleting subject: %r, value: %r", subject, value) updated_schemas = { key: self._delete_schema_below_version(schema, value["version"]) for key, schema in self.subjects[subject]["schemas"].items() @@ -308,11 +301,11 @@ def _handle_msg_schema_hard_delete(self, key: dict) -> None: subject, version = key["subject"], key["version"] if subject not in self.subjects: - self.log.error("Hard delete: Subject %s did not exist, should have", subject) + LOG.error("Hard delete: Subject %s did not exist, should have", subject) elif version not in self.subjects[subject]["schemas"]: - self.log.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject) + LOG.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject) else: - self.log.info("Hard delete: subject: %r version: %r", subject, version) + LOG.info("Hard delete: subject: %r version: %r", subject, version) self.subjects[subject]["schemas"].pop(version, None) def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: @@ -330,7 +323,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: try: schema_type_parsed = SchemaType(schema_type) except ValueError: 
- self.log.error("Invalid schema type: %s", schema_type) + LOG.error("Invalid schema type: %s", schema_type) return # Protobuf doesn't use JSON @@ -338,29 +331,33 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: try: ujson.loads(schema_str) except ValueError: - self.log.error("Invalid json: %s", value["schema"]) + LOG.error("Schema is not valid JSON") return - typed_schema = TypedSchema(schema_type=schema_type_parsed, schema_str=schema_str) - self.log.debug("Got typed schema %r", typed_schema) - if schema_subject not in self.subjects: - self.log.info("Adding first version of subject: %r with no schemas", schema_subject) + LOG.info("Adding first version of subject: %r with no schemas", schema_subject) self.subjects[schema_subject] = {"schemas": {}} subjects_schemas = self.subjects[schema_subject]["schemas"] - if schema_version in subjects_schemas: - self.log.info("Updating entry for subject: %r, value: %r", schema_subject, value) - else: - self.log.info("Adding new version of subject: %r, value: %r", schema_subject, value) - - subjects_schemas[schema_version] = { + typed_schema = TypedSchema( + schema_type=schema_type_parsed, + schema_str=schema_str, + ) + schema = { "schema": typed_schema, "version": schema_version, "id": schema_id, "deleted": schema_deleted, } + + if schema_version in subjects_schemas: + LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) + else: + LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id) + + subjects_schemas[schema_version] = schema + with self.id_lock: self.schemas[schema_id] = typed_schema self.global_schema_id = max(self.global_schema_id, schema_id) diff --git a/karapace/serialization.py b/karapace/serialization.py index 4e95f5b5c..e983d35c1 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -13,12 +13,9 @@ import avro import avro.schema import io -import logging import struct import ujson -log = logging.getLogger(__name__) - START_BYTE = 0x0 HEADER_FORMAT = ">bI" HEADER_SIZE = 5 diff --git a/karapace/statsd.py b/karapace/statsd.py index 65b85b17d..8255593ee 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -19,13 +19,13 @@ STATSD_HOST = "127.0.0.1" STATSD_PORT = 8125 +LOG = logging.getLogger(__name__) class StatsClient: def __init__(self, host: str = STATSD_HOST, port: int = STATSD_PORT, sentry_config: Dict = None) -> None: self.sentry_config: Dict - self.log = logging.getLogger("StatsClient") if sentry_config is None: self.sentry_config = { "dsn": os.environ.get("SENTRY_DSN"), @@ -73,7 +73,7 @@ def update_sentry_config(self, config: Dict) -> None: self.raven_client = raven.Client(**self.sentry_config) except ImportError: self.raven_client = None - self.log.warning("Cannot enable Sentry.io sending: importing 'raven' failed") + LOG.warning("Cannot enable Sentry.io sending: importing 'raven' failed") else: self.raven_client = None @@ -125,8 +125,8 @@ def _send(self, metric: str, metric_type: bytes, value: Any, tags: Optional[Dict parts.insert(1, ",{}={}".format(tag, tag_value).encode("utf-8")) self._socket.sendto(b"".join(parts), self._dest_addr) - except Exception as ex: # pylint: disable=broad-except - self.log.error("Unexpected exception in statsd send: %s: %s", ex.__class__.__name__, ex) + except Exception: # pylint: disable=broad-except + LOG.exception("Unexpected exception in statsd send") def close(self) -> None: self._socket.close() diff --git a/karapace/utils.py b/karapace/utils.py index
1ed24d50f..c0cd17449 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -20,8 +20,8 @@ import time import ujson -log = logging.getLogger("KarapaceUtils") NS_BLACKOUT_DURATION_SECONDS = 120 +LOG = logging.getLogger(__name__) def _isoformat(datetime_obj: datetime) -> str: @@ -151,7 +151,7 @@ def close_invalid_connections(self): conns = self._conns.copy().values() for conn in conns: if conn and conn.ns_blackout(): - log.info( + LOG.info( "Node id %s no longer in cluster metadata, closing connection and requesting update", conn.node_id ) self.close(conn.node_id) @@ -164,7 +164,7 @@ def _poll(self, timeout): try: self.close_invalid_connections() except Exception as e: # pylint: disable=broad-except - log.error("Error closing invalid connections: %r", e) + LOG.error("Error closing invalid connections: %r", e) def _maybe_refresh_metadata(self, wakeup=False): """ @@ -185,7 +185,7 @@ def _maybe_refresh_metadata(self, wakeup=False): else: node_id = bootstrap_nodes[0].nodeId if node_id is None: - log.debug("Give up sending metadata request since no node is available") + LOG.debug("Give up sending metadata request since no node is available") return self.config["reconnect_backoff_ms"] if self._can_send_request(node_id): @@ -197,7 +197,7 @@ def _maybe_refresh_metadata(self, wakeup=False): topics = [] if self.config["api_version"] < (0, 10) else None api_version = 0 if self.config["api_version"] < (0, 10) else 1 request = MetadataRequest[api_version](topics) - log.debug("Sending metadata request %s to node %s", request, node_id) + LOG.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request, wakeup=wakeup) future.add_callback(self.cluster.update_metadata) future.add_errback(self.cluster.failed_update) @@ -217,7 +217,7 @@ def refresh_done(val_or_error): return self.config["reconnect_backoff_ms"] if self.maybe_connect(node_id, wakeup=wakeup): - log.debug("Initializing connection to node %s for metadata request", node_id) + LOG.debug("Initializing connection to node %s for metadata request", node_id) return self.config["reconnect_backoff_ms"] return float("inf")
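
For reference, the logging convention this patch converges on: each module defines one module-level logger via logging.getLogger(__name__) instead of creating per-instance self.log attributes with hand-picked names. A minimal sketch of the pattern under that assumption; the load_schema helper below is illustrative only, not Karapace code:

    import logging

    # Module-level logger; __name__ resolves to the dotted module path
    # (for example "karapace.schema_reader"), so hand-written logger names
    # such as "KafkaSchemaReader" are no longer needed.
    LOG = logging.getLogger(__name__)


    def load_schema(path: str) -> str:
        """Hypothetical helper, present only to show the logging call sites."""
        LOG.info("Loading schema from %r", path)
        with open(path, encoding="utf8") as fp:
            return fp.read()

One logger per module also keeps logger names stable when classes are renamed or merged, which is what lets KarapaceAll and KafkaRestAdminClient drop their custom __init__ methods above.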
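
The config.py hunk introduces SECRET_CONFIG_OPTIONS so that the value of a secret setting (currently only sasl_plain_password) is never written to the log when configuration is populated from environment variables. A simplified, self-contained sketch of that idea; the KARAPACE_ prefix and the raw string assignment are assumptions standing in for the real prefix handling and parse_env_value helper:

    import logging
    import os

    LOG = logging.getLogger(__name__)

    SASL_PLAIN_PASSWORD = "sasl_plain_password"
    SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]


    def set_settings_from_environment(config: dict, prefix: str = "KARAPACE_") -> None:
        """Override config values from <PREFIX><OPTION> environment variables."""
        for config_name in config:
            env_name = (prefix + config_name).upper()
            env_val = os.environ.get(env_name)
            if env_val is None:
                continue
            if config_name in SECRET_CONFIG_OPTIONS:
                # Log that the option was overridden, but never its value.
                LOG.info("Populating config value %r from env var %r", config_name, env_name)
            else:
                LOG.info("Populating config value %r from env var %r with %r", config_name, env_name, env_val)
            # The real code parses the string into the proper type before assigning it.
            config[config_name] = env_val

With this split an operator can still see that sasl_plain_password was taken from the environment without the credential itself ending up in the logs.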
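
The statsd.py change swaps a manually formatted LOG.error call (exception class name plus message) for LOG.exception. logging's exception() logs at ERROR level and appends the active traceback automatically, so the handler records strictly more information with less code. A tiny illustration with a hypothetical sender; the OSError is a stand-in for a real socket failure:

    import logging

    LOG = logging.getLogger(__name__)


    def send_metric(payload: bytes) -> None:
        """Hypothetical sender, present only to illustrate LOG.exception()."""
        try:
            raise OSError("socket closed")  # stand-in for a failing sendto()
        except Exception:  # pylint: disable=broad-except
            # exception() is meant to be called from an exception handler; it
            # logs at ERROR level and includes the traceback of the active exception.
            LOG.exception("Unexpected exception in statsd send")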