Merge pull request #384 from aiven/hacka-reduced-logging
Reduced logging
tvainika authored Apr 20, 2022
2 parents 67b848d + dd15a56 commit 27f10b9
Showing 17 changed files with 143 additions and 166 deletions.
5 changes: 3 additions & 2 deletions karapace/client.py
@@ -12,11 +12,12 @@
import logging
import ssl

log = logging.getLogger(__name__)
Path = str
Headers = dict
JsonData = object # Type of the result after parsing JSON

LOG = logging.getLogger(__name__)


async def _get_aiohttp_client() -> ClientSession:
return ClientSession()
@@ -74,7 +75,7 @@ async def close(self) -> None:
if self._client is not None:
await self._client.close()
except: # pylint: disable=bare-except
log.info("Could not close client")
LOG.error("Could not close client")

async def get_client(self) -> ClientSession:
if self._client is None:
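This file's change illustrates the module-level logger convention applied throughout the PR: one `LOG = logging.getLogger(__name__)` per module, with the failed-close message promoted from INFO to ERROR. A minimal sketch of the same pattern, using an illustrative resource rather than Karapace's aiohttp client:

```python
import logging

# One logger per module, named after the module's dotted path, so records
# from e.g. "karapace.client" can be filtered or silenced per package.
LOG = logging.getLogger(__name__)


def close_quietly(resource) -> None:
    try:
        resource.close()
    except Exception:
        # Failing to release a resource is worth surfacing, hence ERROR rather than INFO.
        LOG.error("Could not close resource", exc_info=True)
```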
4 changes: 0 additions & 4 deletions karapace/compatibility/jsonschema/checks.py
@@ -30,11 +30,8 @@
)
from typing import Any, List, Optional

import logging
import networkx as nx

LOG = logging.getLogger(__name__)

INTRODUCED_INCOMPATIBILITY_MSG_FMT = "Introduced incompatible assertion {assert_name} with value {introduced_value}"
RESTRICTED_INCOMPATIBILITY_MSG_FMT = "More restrictive assertion {assert_name} from {writer_value} to {reader_value}"
MODIFIED_INCOMPATIBILITY_MSG_FMT = "Assertion of {assert_name} changed from {writer_value} to {reader_value}"
@@ -248,7 +245,6 @@ def compatibility_rec(
# reader has type `array` to represent a list, and the writer is either a
# different type or it is also an `array` but now it represents a tuple.
if reader_schema is None and writer_schema is not None:
LOG.debug("Schema removed reader_schema.type='%r'", get_type_of(reader_schema))
return incompatible_schema(
incompat_type=Incompatibility.schema_removed,
message="schema removed",
7 changes: 0 additions & 7 deletions karapace/compatibility/protobuf/checks.py
@@ -2,17 +2,10 @@
from karapace.protobuf.compare_result import CompareResult
from karapace.protobuf.schema import ProtobufSchema

import logging

log = logging.getLogger(__name__)


def check_protobuf_schema_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult:
result = CompareResult()
log.debug("READER: %s", reader.to_schema())
log.debug("WRITER: %s", writer.to_schema())
writer.compare(reader, result)
log.debug("IS_COMPATIBLE %s", result.is_compatible())
if result.is_compatible():
return SchemaCompatibilityResult(SchemaCompatibilityType.compatible)

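The deleted debug lines passed `reader.to_schema()` and `writer.to_schema()` as arguments, so both schemas were rendered even when DEBUG logging was disabled. If that logging were ever wanted back, one way to keep it cheap is to guard the expensive rendering on the log level; this is only a sketch, not part of the commit:

```python
import logging

LOG = logging.getLogger(__name__)


def log_schemas(reader, writer) -> None:
    # %s-style formatting is lazy, but the to_schema() calls are ordinary
    # arguments and run eagerly; checking the level first avoids that cost
    # entirely when DEBUG is switched off.
    if LOG.isEnabledFor(logging.DEBUG):
        LOG.debug("READER: %s", reader.to_schema())
        LOG.debug("WRITER: %s", writer.to_schema())
```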
25 changes: 17 additions & 8 deletions karapace/config.py
@@ -19,7 +19,7 @@
Config = Dict[str, Union[None, str, int, bool, List[str], AccessLogger]]
LOG = logging.getLogger(__name__)
HOSTNAME = socket.gethostname()

SASL_PLAIN_PASSWORD = "sasl_plain_password"
DEFAULTS = {
"access_logs_debug": False,
"access_log_class": None,
@@ -53,7 +53,7 @@
"ssl_password": None,
"sasl_mechanism": None,
"sasl_plain_username": None,
"sasl_plain_password": None,
SASL_PLAIN_PASSWORD: None,
"topic_name": DEFAULT_SCHEMA_TOPIC,
"metadata_max_age_ms": 60000,
"admin_metadata_max_age": 5,
@@ -67,6 +67,7 @@
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]


class InvalidConfiguration(Exception):
@@ -109,12 +110,20 @@ def set_settings_from_environment(config: Config) -> None:
env_name = config_name_with_prefix.upper()
env_val = os.environ.get(env_name)
if env_val is not None:
LOG.debug(
"Populating config value %r from env var %r with %r instead of config file",
config_name,
env_name,
env_val,
)
if config_name not in SECRET_CONFIG_OPTIONS:
LOG.info(
"Populating config value %r from env var %r with %r instead of config file",
config_name,
env_name,
env_val,
)
else:
LOG.info(
"Populating config value %r from env var %r instead of config file",
config_name,
env_name,
)

config[config_name] = parse_env_value(env_val)


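The new `SECRET_CONFIG_OPTIONS` list ensures that values such as `sasl_plain_password` are never echoed when an environment override is logged. A self-contained sketch of the same redaction idea; the `KARAPACE_` prefix and the option set below are illustrative assumptions, not lifted verbatim from the code:

```python
import logging
import os

LOG = logging.getLogger(__name__)

# Options whose values must never appear in the logs (illustrative set).
SECRET_CONFIG_OPTIONS = {"sasl_plain_password"}


def log_env_override(config_name: str, env_prefix: str = "KARAPACE_") -> None:
    env_name = (env_prefix + config_name).upper()
    env_val = os.environ.get(env_name)
    if env_val is None:
        return
    if config_name in SECRET_CONFIG_OPTIONS:
        # Record that the override happened, but keep the secret value out of the log.
        LOG.info("Populating config value %r from env var %r", config_name, env_name)
    else:
        LOG.info("Populating config value %r from env var %r with %r", config_name, env_name, env_val)
```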
2 changes: 0 additions & 2 deletions karapace/kafka_rest_apis/__init__.py
@@ -17,7 +17,6 @@

import asyncio
import base64
import logging
import time
import ujson

@@ -39,7 +38,6 @@ def __init__(self, config: Config) -> None:
super().__init__(config=config)
self._add_kafka_rest_routes()
self.serializer = SchemaRegistrySerializer(config=config)
self.log = logging.getLogger("KarapaceRest")
self._cluster_metadata = None
self._metadata_birth = None
self.metadata_max_age = self.config["admin_metadata_max_age"]
8 changes: 3 additions & 5 deletions karapace/kafka_rest_apis/admin.py
@@ -9,12 +9,10 @@

import logging

LOG = logging.getLogger(__name__)

class KafkaRestAdminClient(KafkaAdminClient):
def __init__(self, **configs):
super().__init__(**configs)
self.log = logging.getLogger("AdminClient")

class KafkaRestAdminClient(KafkaAdminClient):
def get_topic_config(self, topic: str) -> dict:
config_version = self._matching_api_version(DescribeConfigsRequest)
req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic)]
@@ -50,7 +48,7 @@ def cluster_metadata(self, topics: List[str] = None, retries: int = 0) -> dict:
except Cancelled:
if retries > 3:
raise
self.log.debug("Retrying metadata with %d retires", retries)
LOG.debug("Retrying metadata with %d retires", retries)
return self.cluster_metadata(topics, retries + 1)
return self._make_metadata_response(future.value)

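`cluster_metadata` retries a cancelled metadata request up to three times, reporting each attempt at DEBUG through the module logger. A generic sketch of that bounded, logged retry; `Cancelled` and `operation` are placeholders standing in for the Kafka future error and the metadata call:

```python
import logging

LOG = logging.getLogger(__name__)


class Cancelled(Exception):
    """Placeholder for the broker-side cancellation error."""


def call_with_retries(operation, retries: int = 0):
    try:
        return operation()
    except Cancelled:
        if retries > 3:
            raise  # give up after a few attempts instead of looping forever
        LOG.debug("Retrying after cancellation, attempt %d", retries)
        return call_with_retries(operation, retries + 1)
```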
69 changes: 37 additions & 32 deletions karapace/kafka_rest_apis/consumer_manager.py
@@ -24,22 +24,21 @@
OFFSET_RESET_STRATEGIES = {"latest", "earliest"}

TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"])
LOG = logging.getLogger(__name__)


def new_name() -> str:
return str(uuid.uuid4())


class ConsumerManager:
def __init__(self, config: dict) -> None:
self.config = config
self.hostname = f"http://{self.config['advertised_hostname']}:{self.config['port']}"
self.log = logging.getLogger("RestConsumerManager")
self.deserializer = SchemaRegistryDeserializer(config=config)
self.consumers = {}
self.consumer_locks = defaultdict(Lock)

def new_name(self) -> str:
name = str(uuid.uuid4())
self.log.debug("Generated new consumer name: %s", name)
return name

@staticmethod
def _assert(cond: bool, code: HTTPStatus, sub_code: int, message: str, content_type: str) -> None:
if not cond:
@@ -155,12 +154,15 @@ def _update_partition_assignments(consumer: KafkaConsumer):
# CONSUMER
async def create_consumer(self, group_name: str, request_data: dict, content_type: str):
group_name = group_name.strip("/")
self.log.info("Create consumer request for group %s", group_name)
consumer_name = request_data.get("name") or self.new_name()
consumer_name = request_data.get("name") or new_name()
internal_name = self.create_internal_name(group_name, consumer_name)
async with self.consumer_locks[internal_name]:
if internal_name in self.consumers:
self.log.error("Error creating duplicate consumer in group %s with id %s", group_name, consumer_name)
LOG.warning(
"Error creating duplicate consumer in group %s with id %s",
group_name,
consumer_name,
)
KarapaceBase.r(
status=HTTPStatus.CONFLICT,
content_type=content_type,
Expand All @@ -170,11 +172,14 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ
},
)
self._validate_create_consumer(request_data, content_type)
self.log.info(
"Creating new consumer in group %s with id %s and request_info %r", group_name, consumer_name, request_data
)
for k in ["consumer.request.timeout.ms", "fetch_min_bytes"]:
convert_to_int(request_data, k, content_type)
LOG.info(
"Creating new consumer in group. group name: %s consumer name: %s request_data %r",
group_name,
consumer_name,
request_data,
)
try:
enable_commit = request_data.get("auto.commit.enable", self.config["consumer_enable_auto_commit"])
if isinstance(enable_commit, str):
@@ -223,27 +228,27 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
)
return c
except: # pylint: disable=bare-except
self.log.exception("Unable to create consumer, retrying")
LOG.exception("Unable to create consumer, retrying")
await asyncio.sleep(1)

async def delete_consumer(self, internal_name: Tuple[str, str], content_type: str):
self.log.info("Deleting consumer for %s", internal_name)
LOG.info("Deleting consumer for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
async with self.consumer_locks[internal_name]:
try:
c = self.consumers.pop(internal_name)
c.consumer.close()
self.consumer_locks.pop(internal_name)
except: # pylint: disable=bare-except
self.log.exception("Unable to properly dispose of consumer")
LOG.exception("Unable to properly dispose of consumer")
finally:
empty_response()

# OFFSETS
async def commit_offsets(
self, internal_name: Tuple[str, str], content_type: str, request_data: dict, cluster_metadata: dict
):
self.log.info("Committing offsets for %s", internal_name)
LOG.info("Committing offsets for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
if request_data:
self._assert_has_key(request_data, "offsets", content_type)
@@ -266,7 +271,7 @@ async def commit_offsets(
empty_response()

async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, request_data: dict):
self.log.info("Retrieving offsets for %s", internal_name)
LOG.info("Retrieving offsets for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
self._assert_has_key(request_data, "partitions", content_type)
response = {"offsets": []}
@@ -290,7 +295,7 @@ async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, r

# SUBSCRIPTION
async def set_subscription(self, internal_name: Tuple[str, str], content_type: str, request_data: dict):
self.log.info("Updating subscription for %s", internal_name)
LOG.info("Updating subscription for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
topics = request_data.get("topics", [])
topics_pattern = request_data.get("topic_pattern")
@@ -307,10 +312,10 @@ async def set_subscription(self, internal_name: Tuple[str, str], content_type: s
except IllegalStateError as e:
self._illegal_state_fail(str(e), content_type=content_type)
finally:
self.log.info("Done updating subscription")
LOG.info("Done updating subscription")

async def get_subscription(self, internal_name: Tuple[str, str], content_type: str):
self.log.info("Retrieving subscription for %s", internal_name)
LOG.info("Retrieving subscription for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
async with self.consumer_locks[internal_name]:
consumer = self.consumers[internal_name].consumer
@@ -321,15 +326,15 @@ async def get_subscription(self, internal_name: Tuple[str, str], content_type: s
KarapaceBase.r(content_type=content_type, body={"topics": topics})

async def delete_subscription(self, internal_name: Tuple[str, str], content_type: str):
self.log.info("Deleting subscription for %s", internal_name)
LOG.info("Deleting subscription for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
async with self.consumer_locks[internal_name]:
self.consumers[internal_name].consumer.unsubscribe()
empty_response()

# ASSIGNMENTS
async def set_assignments(self, internal_name: Tuple[str, str], content_type: str, request_data: dict):
self.log.info("Updating assignments for %s to %r", internal_name, request_data)
LOG.info("Updating assignments for %s to %r", internal_name, request_data)
self._assert_consumer_exists(internal_name, content_type)
self._assert_has_key(request_data, "partitions", content_type)
partitions = []
@@ -346,10 +351,10 @@ async def set_assignments(self, internal_name: Tuple[str, str], content_type: st
except IllegalStateError as e:
self._illegal_state_fail(message=str(e), content_type=content_type)
finally:
self.log.info("Done updating assignment")
LOG.info("Done updating assignment")

async def get_assignments(self, internal_name: Tuple[str, str], content_type: str):
self.log.info("Retrieving assignment for %s", internal_name)
LOG.info("Retrieving assignment for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
async with self.consumer_locks[internal_name]:
consumer = self.consumers[internal_name].consumer
@@ -360,7 +365,7 @@ async def get_assignments(self, internal_name: Tuple[str, str], content_type: st

# POSITIONS
async def seek_to(self, internal_name: Tuple[str, str], content_type: str, request_data: dict):
self.log.info("Resetting offsets for %s to %r", internal_name, request_data)
LOG.info("Resetting offsets for %s to %r", internal_name, request_data)
self._assert_consumer_exists(internal_name, content_type)
self._assert_has_key(request_data, "offsets", content_type)
seeks = []
@@ -384,7 +389,7 @@ async def seek_limit(
self, internal_name: Tuple[str, str], content_type: str, request_data: dict, beginning: bool = True
):
direction = "beginning" if beginning else "end"
self.log.info("Seeking %s offsets", direction)
LOG.info("Seeking %s offsets", direction)
self._assert_consumer_exists(internal_name, content_type)
self._assert_has_key(request_data, "partitions", content_type)
resets = []
@@ -406,7 +411,7 @@ async def seek_limit(
self._illegal_state_fail(f"Trying to reset unassigned partitions to {direction}", content_type)

async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats: dict, query_params: dict):
self.log.info("Running fetch for name %s with parameters %r and formats %r", internal_name, query_params, formats)
LOG.info("Running fetch for name %s with parameters %r and formats %r", internal_name, query_params, formats)
self._assert_consumer_exists(internal_name, content_type)
async with self.consumer_locks[internal_name]:
consumer = self.consumers[internal_name].consumer
@@ -420,7 +425,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats
content_type=content_type,
message=f"Consumer format {serialization_format} does not match the embedded format {request_format}",
)
self.log.info("Fetch request for %s with params %r", internal_name, query_params)
LOG.info("Fetch request for %s with params %r", internal_name, query_params)
try:
timeout = (
int(query_params["timeout"]) if "timeout" in query_params else config["consumer.request.timeout.ms"]
@@ -438,7 +443,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats
if val <= 0:
KarapaceBase.internal_error(message=f"Invalid request parameter {val}", content_type=content_type)
response = []
self.log.info(
LOG.info(
"Will poll multiple times for a single message with a total timeout of %dms, "
"until at least %d bytes have been fetched",
timeout,
@@ -451,14 +456,14 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats
while read_bytes < max_bytes and start_time + timeout / 1000 > time.monotonic():
time_left = start_time + timeout / 1000 - time.monotonic()
bytes_left = max_bytes - read_bytes
self.log.info(
LOG.info(
"Polling with %r time left and %d bytes left, gathered %d messages so far",
time_left,
bytes_left,
message_count,
)
data = consumer.poll(timeout_ms=timeout, max_records=1)
self.log.debug("Successfully polled for messages")
LOG.debug("Successfully polled for messages")
for topic, records in data.items():
for rec in records:
message_count += 1
Expand All @@ -468,7 +473,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats
+ max(0, rec.serialized_header_size)
)
poll_data[topic].append(rec)
self.log.info("Gathered %d total messages", message_count)
LOG.info("Gathered %d total messages", message_count)
for tp in poll_data:
for msg in poll_data[tp]:
try:
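The fetch path polls the consumer one record at a time until either the byte budget (`max_bytes`) is spent or the timeout elapses, logging its progress as it goes. A simplified, self-contained sketch of that loop; the consumer object and record size fields mirror kafka-python's `poll()` and `ConsumerRecord`, but the function itself is illustrative:

```python
import logging
import time

LOG = logging.getLogger(__name__)


def fetch_until_budget(consumer, timeout_ms: int, max_bytes: int) -> list:
    """Poll repeatedly until max_bytes have been read or timeout_ms has elapsed."""
    records = []
    read_bytes = 0
    start_time = time.monotonic()
    while read_bytes < max_bytes and start_time + timeout_ms / 1000 > time.monotonic():
        data = consumer.poll(timeout_ms=timeout_ms, max_records=1)
        for _topic, batch in data.items():
            for rec in batch:
                # Count key, value and header sizes against the byte budget.
                read_bytes += (
                    max(0, rec.serialized_key_size)
                    + max(0, rec.serialized_value_size)
                    + max(0, rec.serialized_header_size)
                )
                records.append(rec)
        LOG.info("Gathered %d messages, %d bytes so far", len(records), read_bytes)
    return records
```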