From d475805d8cfffa0277bdb4a43ee2556b88c5c014 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Mon, 11 Apr 2022 09:06:20 +0200 Subject: [PATCH 1/4] tests: reduced test timeout --- .github/workflows/tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6e2e9c6f1..369cfaa87 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -38,6 +38,7 @@ jobs: run: python3 -m pytest -s -vvv tests/unit/ - name: Execute integration-tests + timeout-minutes: 10 run: python3 -m pytest -s -vvv tests/integration/ --log-dir=/tmp/ci-logs --log-file=/tmp/ci-logs/pytest.log - name: Archive logs From 0a151e81455bb4b812e0305b209b43c16a0b86c1 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Fri, 8 Apr 2022 20:55:45 +0200 Subject: [PATCH 2/4] tests: explained the slice of the UUID --- tests/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/utils.py b/tests/utils.py index 9baaec02c..1d887e07d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -176,6 +176,8 @@ async def new_consumer(c, group, fmt="avro", trail=""): def new_random_name(prefix: str) -> str: + # A hyphen is not a valid character for Avro identifiers. Use only the + # first 8 characters of the UUID. suffix = str(uuid.uuid4())[:8] return f"{prefix}{suffix}" @@ -205,9 +207,7 @@ def create_id_factory(prefix: str) -> Callable[[], str]: def create_name() -> str: nonlocal index - random_name = str(uuid.uuid4())[:8] - name = f"{quote(prefix).replace('/', '_')}_{index}_{random_name}" - return name + return new_random_name(f"{quote(prefix).replace('/', '_')}_{index}_") return create_name From ecf521b97f292d8b57270d099433ea8a3343ca57 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Fri, 8 Apr 2022 21:11:09 +0200 Subject: [PATCH 3/4] tests: fix integration tests flakiness This contains a variety of changes to remove flakiness from the integration tests. There were two main culprits for flakiness: 1: Using the same topic for the schemas: - Each test has its own registry; when running tests in parallel, different group ids are used to prevent these servers from joining the same group. - However, the registries were all using the _schemas topic to store the schemas, which meant there were multiple nodes, each believing it was the leader, writing to the same topic, causing flakiness in the tests. - This corrupted the schema IDs, meaning a different test could write a totally different schema with the same id via another leader. 2: The same port range was used for all runners, which meant port reuse was possible. Besides fixing the above, this also changes how registries are executed in the test suite. With this change the servers are started using subprocesses. This has the disadvantage that cleanup is harder, but it does test the initialization and server code path, which previously was skipped because of the aiohttp client.
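For illustration only (this note and sketch are not part of the diff below): the per-worker port split implemented by the new port_range fixture boils down to the Python sketch that follows. The helper name worker_port_range is made up for this example; the 48700-49000 range and the PYTEST_XDIST_WORKER / PYTEST_XDIST_WORKER_COUNT environment variables are the ones used by the fixture, which wraps the resulting slice in a PortRangeInclusive rather than returning a plain range.

    import os
    import string

    def worker_port_range(start: int = 48700, end: int = 49000) -> range:
        # pytest-xdist names workers "gw0", "gw1", ...; default to "0" when xdist is not used
        worker_name = os.environ.get("PYTEST_XDIST_WORKER", "0")
        worker_id = int(worker_name.lstrip(string.ascii_letters))
        worker_count = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
        # each worker gets a disjoint slice of the shared range, preventing port reuse
        ports_per_worker = (end - start) // worker_count
        first_port = start + ports_per_worker * worker_id
        # half-open range covering only this worker's slice
        return range(first_port, first_port + ports_per_worker)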
--- karapace/schema_backup.py | 6 +- requirements-dev.txt | 1 + tests/integration/conftest.py | 277 ++++++++---------- tests/integration/test_master_coordinator.py | 150 +++++----- tests/integration/test_schema.py | 23 +- tests/integration/test_schema_backup.py | 10 +- .../test_schema_backup_avro_export.py | 13 +- tests/integration/utils/cluster.py | 97 ++++++ tests/integration/utils/network.py | 101 +++---- 9 files changed, 376 insertions(+), 302 deletions(-) create mode 100644 tests/integration/utils/cluster.py diff --git a/karapace/schema_backup.py b/karapace/schema_backup.py index 4d372cf15..c88f79297 100644 --- a/karapace/schema_backup.py +++ b/karapace/schema_backup.py @@ -11,7 +11,7 @@ from karapace.anonymize_schemas import anonymize_avro from karapace.config import Config, read_config from karapace.schema_reader import KafkaSchemaReader -from karapace.utils import json_encode, KarapaceKafkaClient +from karapace.utils import json_encode, KarapaceKafkaClient, Timeout from typing import Dict, List, Optional, Tuple import argparse @@ -26,10 +26,6 @@ class BackupError(Exception): """Backup Error""" -class Timeout(Exception): - """Timeout Error""" - - class SchemaBackup: def __init__(self, config: Config, backup_path: str, topic_option: Optional[str] = None) -> None: self.config = config diff --git a/requirements-dev.txt b/requirements-dev.txt index 534e62500..6fa8a347f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,6 +10,7 @@ pytest==6.2.5 pytest-xdist[psutil]==2.2.1 pytest-timeout==1.4.2 pdbpp==0.10.2 +psutil==5.9.0 # workflow pre-commit>=2.2.0 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 46abb58fc..b3b310ffc 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -12,11 +12,10 @@ from filelock import FileLock from kafka import KafkaProducer from karapace.client import Client -from karapace.config import set_config_defaults, write_config +from karapace.config import Config, set_config_defaults, write_config from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient -from karapace.schema_registry_apis import KarapaceSchemaRegistry from pathlib import Path -from subprocess import Popen +from tests.integration.utils.cluster import RegistryDescription, start_schema_registry_cluster from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig from tests.integration.utils.kafka_server import ( configure_and_start_kafka, @@ -24,17 +23,19 @@ maybe_download_kafka, wait_for_kafka, ) -from tests.integration.utils.network import get_random_port, KAFKA_PORT_RANGE, REGISTRY_PORT_RANGE, ZK_PORT_RANGE +from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process, wait_for_port_subprocess from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk -from tests.utils import new_random_name, repeat_until_successful_request -from typing import AsyncIterator, Iterator, Optional, Tuple +from tests.utils import repeat_until_successful_request +from typing import AsyncIterator, Iterator, List, Optional import asyncio import os import pathlib import pytest +import re +import string import time import ujson @@ -48,6 +49,36 @@ WORKER_COUNTER_KEY = "worker_counter" +def _clear_test_name(name: str) -> str: + # Based on: + # https://github.com/pytest-dev/pytest/blob/238b25ffa9d4acbc7072ac3dd6d8240765643aed/src/_pytest/tmpdir.py#L189-L194 + # The purpose is to return a 
similar name to make finding matching logs easier + return re.sub(r"[\W]", "_", name)[:30] + + +@pytest.fixture(scope="session", name="port_range") +def fixture_port_range() -> PortRangeInclusive: + """Container used by other fixtures to register used ports""" + # To find a good port range use the following: + # + # curl --silent 'https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt' | \ + # egrep -i -e '^\s*[0-9]+-[0-9]+\s*unassigned' | \ + # awk '{print $1}' + # + start = 48700 + end = 49000 + + # Split the ports among the workers to prevent port reuse + worker_name = os.environ.get("PYTEST_XDIST_WORKER", "0") + worker_id = int(worker_name.lstrip(string.ascii_letters)) + worker_count = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1")) + total_ports = end - start + ports_per_worker = total_ports // worker_count + start_worker = (ports_per_worker * worker_id) + start + end_worker = start_worker + ports_per_worker - 1 + return PortRangeInclusive(start_worker, end_worker) + + @pytest.fixture(scope="session", name="kafka_description") def fixture_kafka_description(request: SubRequest) -> KafkaDescription: kafka_version = request.config.getoption("kafka_version") @@ -70,6 +101,7 @@ def fixture_kafka_server( session_datadir: Path, session_logdir: Path, kafka_description: KafkaDescription, + port_range: PortRangeInclusive, ) -> Iterator[KafkaServers]: bootstrap_servers = request.config.getoption("kafka_bootstrap_servers") @@ -88,20 +120,19 @@ def fixture_kafka_server( lock_file = lock_path_for(transfer_file) with ExitStack() as stack: + zk_client_port = stack.enter_context(port_range.allocate_port()) + zk_admin_port = stack.enter_context(port_range.allocate_port()) + kafka_plaintext_port = stack.enter_context(port_range.allocate_port()) + with FileLock(str(lock_file)): if transfer_file.exists(): config_data = ujson.loads(transfer_file.read_text()) zk_config = ZKConfig.from_dict(config_data["zookeeper"]) kafka_config = KafkaConfig.from_dict(config_data["kafka"]) - - # Count the new worker - config_data[WORKER_COUNTER_KEY] += 1 - transfer_file.write_text(ujson.dumps(config_data)) + config_data[WORKER_COUNTER_KEY] += 1 # Count the new worker else: maybe_download_kafka(kafka_description) - zk_client_port = get_random_port(port_range=ZK_PORT_RANGE, blacklist=[]) - zk_admin_port = get_random_port(port_range=ZK_PORT_RANGE, blacklist=[zk_client_port]) zk_config = ZKConfig( client_port=zk_client_port, admin_port=zk_admin_port, @@ -114,7 +145,6 @@ def fixture_kafka_server( # Make sure zookeeper is running before trying to start Kafka wait_for_port_subprocess(zk_config.client_port, zk_proc, wait_time=20) - kafka_plaintext_port = get_random_port(port_range=KAFKA_PORT_RANGE, blacklist=[]) data_dir = session_datadir / "kafka" log_dir = session_logdir / "kafka" data_dir.mkdir(parents=True) @@ -137,30 +167,32 @@ def fixture_kafka_server( "kafka": asdict(kafka_config), WORKER_COUNTER_KEY: 1, } - transfer_file.write_text(ujson.dumps(config_data)) - - # Make sure every test worker can communicate with kafka - kafka_servers = KafkaServers(bootstrap_servers=[f"127.0.0.1:{kafka_config.plaintext_port}"]) - wait_for_kafka(kafka_servers, KAFKA_WAIT_TIMEOUT) - yield kafka_servers - - # Signal the worker finished - with FileLock(str(lock_file)): - assert transfer_file.exists(), "transfer_file disappeared" - config_data = ujson.loads(transfer_file.read_text()) - config_data[WORKER_COUNTER_KEY] -= 1 transfer_file.write_text(ujson.dumps(config_data)) - # Wait until every worker 
finished before stopping the servers - worker_counter = float("inf") - while worker_counter > 0: + try: + # Make sure every test worker can communicate with kafka + kafka_servers = KafkaServers(bootstrap_servers=[f"127.0.0.1:{kafka_config.plaintext_port}"]) + wait_for_kafka(kafka_servers, KAFKA_WAIT_TIMEOUT) + + yield kafka_servers + finally: + # This must be called on errors, otherwise the master node will wait forever with FileLock(str(lock_file)): assert transfer_file.exists(), "transfer_file disappeared" config_data = ujson.loads(transfer_file.read_text()) - worker_counter = config_data[WORKER_COUNTER_KEY] + config_data[WORKER_COUNTER_KEY] -= 1 + transfer_file.write_text(ujson.dumps(config_data)) - time.sleep(2) + # Wait until every worker finished before stopping the servers + worker_counter = float("inf") + while worker_counter > 0: + with FileLock(str(lock_file)): + assert transfer_file.exists(), "transfer_file disappeared" + config_data = ujson.loads(transfer_file.read_text()) + worker_counter = config_data[WORKER_COUNTER_KEY] + + time.sleep(2) return @@ -247,112 +279,63 @@ async def get_client() -> ClientSession: @pytest.fixture(scope="function", name="registry_async_pair") -def fixture_registry_async_pair( - tmp_path: Path, +async def fixture_registry_async_pair( + request: SubRequest, + loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument + session_logdir: Path, kafka_servers: KafkaServers, -) -> Iterator[Tuple[str, str]]: - master_config_path = tmp_path / "karapace_config_master.json" - slave_config_path = tmp_path / "karapace_config_slave.json" - master_port = get_random_port(port_range=REGISTRY_PORT_RANGE, blacklist=[]) - slave_port = get_random_port(port_range=REGISTRY_PORT_RANGE, blacklist=[master_port]) - topic_name = new_random_name("schema_pairs") - group_id = new_random_name("schema_pairs") - write_config( - master_config_path, - { - "bootstrap_uri": kafka_servers.bootstrap_servers, - "topic_name": topic_name, - "group_id": group_id, - "advertised_hostname": "127.0.0.1", - "karapace_registry": True, - "port": master_port, - }, - ) - write_config( - slave_config_path, - { - "bootstrap_uri": kafka_servers.bootstrap_servers, - "topic_name": topic_name, - "group_id": group_id, - "advertised_hostname": "127.0.0.1", - "karapace_registry": True, - "port": slave_port, - }, - ) + port_range: PortRangeInclusive, +) -> Iterator[List[str]]: + """Starts a cluster of two Schema Registry servers and returns their URL endpoints.""" - master_process = None - slave_process = None - with ExitStack() as stack: - try: - master_process = stack.enter_context(Popen(["python", "-m", "karapace.karapace_all", str(master_config_path)])) - slave_process = stack.enter_context(Popen(["python", "-m", "karapace.karapace_all", str(slave_config_path)])) - wait_for_port_subprocess(master_port, master_process) - wait_for_port_subprocess(slave_port, slave_process) - yield f"http://127.0.0.1:{master_port}", f"http://127.0.0.1:{slave_port}" - finally: - if master_process: - master_process.kill() - if slave_process: - slave_process.kill() + config1: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers} + config2: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers} + + async with start_schema_registry_cluster( + config_templates=[config1, config2], + data_dir=session_logdir / _clear_test_name(request.node.name), + port_range=port_range, + ) as endpoints: + yield [server.endpoint.to_url() for server in endpoints] -@pytest.fixture(scope="function", name="registry_async") -async def 
fixture_registry_async( +@pytest.fixture(scope="function", name="registry_cluster") +async def fixture_registry_cluster( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument - tmp_path: Path, + session_logdir: Path, kafka_servers: KafkaServers, -) -> AsyncIterator[Optional[KarapaceSchemaRegistry]]: + port_range: PortRangeInclusive, +) -> AsyncIterator[RegistryDescription]: # Do not start a registry when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that # won't work and will cause test failures. registry_url = request.config.getoption("registry_url") if registry_url: - yield None + yield registry_url return - config_path = tmp_path / "karapace_config.json" - - config = set_config_defaults( - { - "bootstrap_uri": kafka_servers.bootstrap_servers, - # Using the default settings instead of random values, otherwise it - # would not be possible to run the tests with external services. - # Because of this every test must be written in such a way that it can - # be executed twice with the same servers. - # "topic_name": new_random_name("topic"), - "group_id": new_random_name("registry_async"), - } - ) - write_config(config_path, config) - registry = KarapaceSchemaRegistry(config=config) - await registry.get_master() - try: - yield registry - finally: - await registry.close() + config = {"bootstrap_uri": kafka_servers.bootstrap_servers} + async with start_schema_registry_cluster( + config_templates=[config], + data_dir=session_logdir / _clear_test_name(request.node.name), + port_range=port_range, + ) as servers: + yield servers[0] @pytest.fixture(scope="function", name="registry_async_client") async def fixture_registry_async_client( request: SubRequest, + registry_cluster: RegistryDescription, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument - registry_async: KarapaceSchemaRegistry, - aiohttp_client: AiohttpClient, ) -> AsyncIterator[Client]: - registry_url = request.config.getoption("registry_url") - - # client and server_uri are incompatible settings. 
- if registry_url: - client = Client(server_uri=registry_url, server_ca=request.config.getoption("server_ca")) - else: - - async def get_client() -> ClientSession: - return await aiohttp_client(registry_async.app) - - client = Client(client_factory=get_client) + client = Client( + server_uri=registry_cluster.endpoint.to_url(), + server_ca=request.config.getoption("server_ca"), + ) try: # wait until the server is listening, otherwise the tests may fail @@ -361,7 +344,7 @@ async def get_client() -> ClientSession: "subjects", json_data=None, headers=None, - error_msg="REST API is unreachable", + error_msg=f"Registry API {client.server_uri} is unreachable", timeout=10, sleep=0.3, ) @@ -392,68 +375,50 @@ def fixture_server_key(credentials_folder: str) -> str: return os.path.join(credentials_folder, "serverkey.pem") -@pytest.fixture(scope="function", name="registry_async_tls") -async def fixture_registry_async_tls( +@pytest.fixture(scope="function", name="registry_https_endpoint") +async def fixture_registry_https_endpoint( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument - tmp_path: Path, + session_logdir: Path, kafka_servers: KafkaServers, server_cert: str, server_key: str, -) -> AsyncIterator[Optional[KarapaceSchemaRegistry]]: + port_range: PortRangeInclusive, +) -> AsyncIterator[str]: # Do not start a registry when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that # won't work and will cause test failures. registry_url = request.config.getoption("registry_url") if registry_url: - yield None + yield registry_url return - config_path = tmp_path / "karapace_config.json" - - config = set_config_defaults( - { - "bootstrap_uri": kafka_servers.bootstrap_servers, - "server_tls_certfile": server_cert, - "server_tls_keyfile": server_key, - "port": 8444, - # Using the default settings instead of random values, otherwise it - # would not be possible to run the tests with external services. - # Because of this every test must be written in such a way that it can - # be executed twice with the same servers. 
- # "topic_name": new_random_name("topic"), - "group_id": new_random_name("registry_async_tls"), - } - ) - write_config(config_path, config) - registry = KarapaceSchemaRegistry(config=config) - await registry.get_master() - try: - yield registry - finally: - await registry.close() + config = { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "server_tls_certfile": server_cert, + "server_tls_keyfile": server_key, + } + async with start_schema_registry_cluster( + config_templates=[config], + data_dir=session_logdir / _clear_test_name(request.node.name), + port_range=port_range, + ) as servers: + yield servers[0].endpoint.to_url() @pytest.fixture(scope="function", name="registry_async_client_tls") async def fixture_registry_async_client_tls( - request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument - registry_async_tls: KarapaceSchemaRegistry, - aiohttp_client: AiohttpClient, + registry_https_endpoint: str, server_ca: str, ) -> AsyncIterator[Client]: + pytest.skip("Test certification is not properly set") - registry_url = request.config.getoption("registry_url") - - if registry_url: - client = Client(server_uri=registry_url, server_ca=request.config.getoption("server_ca")) - else: - - async def get_client() -> ClientSession: - return await aiohttp_client(registry_async_tls.app) - - client = Client(client_factory=get_client, server_ca=server_ca) + client = Client( + server_uri=registry_https_endpoint, + server_ca=server_ca, + ) try: # wait until the server is listening, otherwise the tests may fail @@ -462,7 +427,7 @@ async def get_client() -> ClientSession: "subjects", json_data=None, headers=None, - error_msg="REST API is unreachable", + error_msg=f"Registry API {registry_https_endpoint} is unreachable", timeout=10, sleep=0.3, ) diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 764e95706..eb8741cc2 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -8,7 +8,7 @@ from karapace.config import set_config_defaults from karapace.master_coordinator import MasterCoordinator from tests.integration.utils.kafka_server import KafkaServers -from tests.integration.utils.network import get_random_port, TESTS_PORT_RANGE +from tests.integration.utils.network import PortRangeInclusive from tests.utils import new_random_name import asyncio @@ -18,10 +18,6 @@ import ujson -class Timeout(Exception): - pass - - def init_admin(config): mc = MasterCoordinator(config=config) mc.start() @@ -44,82 +40,82 @@ def has_master(mc: MasterCoordinator) -> bool: @pytest.mark.timeout(60) # Github workflows need a bit of extra time @pytest.mark.parametrize("strategy", ["lowest", "highest"]) -def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None: +def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaServers, strategy: str) -> None: # Use random port to allow for parallel runs. 
- port1 = get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[]) - port2 = get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[port1]) - port_aa, port_bb = sorted((port1, port2)) - client_id_aa = new_random_name("master_selection_aa_") - client_id_bb = new_random_name("master_selection_bb_") - group_id = new_random_name("group_id") - - config_aa = set_config_defaults( - { - "advertised_hostname": "127.0.0.1", - "bootstrap_uri": kafka_servers.bootstrap_servers, - "client_id": client_id_aa, - "group_id": group_id, - "port": port_aa, - "master_election_strategy": strategy, - } - ) - config_bb = set_config_defaults( - { - "advertised_hostname": "127.0.0.1", - "bootstrap_uri": kafka_servers.bootstrap_servers, - "client_id": client_id_bb, - "group_id": group_id, - "port": port_bb, - "master_election_strategy": strategy, - } - ) - - with closing(init_admin(config_aa)) as mc_aa, closing(init_admin(config_bb)) as mc_bb: - if strategy == "lowest": - master = mc_aa - slave = mc_bb - else: - master = mc_bb - slave = mc_aa - - # Wait for the election to happen - while not is_master(master): - time.sleep(0.3) - - while not has_master(slave): - time.sleep(0.3) - - # Make sure the end configuration is as expected - master_url = f'http://{master.config["host"]}:{master.config["port"]}' - assert master.sc.election_strategy == strategy - assert slave.sc.election_strategy == strategy - assert master.sc.master_url == master_url - assert slave.sc.master_url == master_url - - -def test_no_eligible_master(kafka_servers: KafkaServers) -> None: + with port_range.allocate_port() as port1, port_range.allocate_port() as port2: + port_aa, port_bb = sorted((port1, port2)) + client_id_aa = new_random_name("master_selection_aa_") + client_id_bb = new_random_name("master_selection_bb_") + group_id = new_random_name("group_id") + + config_aa = set_config_defaults( + { + "advertised_hostname": "127.0.0.1", + "bootstrap_uri": kafka_servers.bootstrap_servers, + "client_id": client_id_aa, + "group_id": group_id, + "port": port_aa, + "master_election_strategy": strategy, + } + ) + config_bb = set_config_defaults( + { + "advertised_hostname": "127.0.0.1", + "bootstrap_uri": kafka_servers.bootstrap_servers, + "client_id": client_id_bb, + "group_id": group_id, + "port": port_bb, + "master_election_strategy": strategy, + } + ) + + with closing(init_admin(config_aa)) as mc_aa, closing(init_admin(config_bb)) as mc_bb: + if strategy == "lowest": + master = mc_aa + slave = mc_bb + else: + master = mc_bb + slave = mc_aa + + # Wait for the election to happen + while not is_master(master): + time.sleep(0.3) + + while not has_master(slave): + time.sleep(0.3) + + # Make sure the end configuration is as expected + master_url = f'http://{master.config["host"]}:{master.config["port"]}' + assert master.sc.election_strategy == strategy + assert slave.sc.election_strategy == strategy + assert master.sc.master_url == master_url + assert slave.sc.master_url == master_url + + +def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None: client_id = new_random_name("master_selection_") group_id = new_random_name("group_id") - config_aa = set_config_defaults( - { - "advertised_hostname": "127.0.0.1", - "bootstrap_uri": kafka_servers.bootstrap_servers, - "client_id": client_id, - "group_id": group_id, - "port": get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[]), - "master_eligibility": False, - } - ) - - with closing(init_admin(config_aa)) as mc: - # Wait for the election to happen, ie. 
flag is not None - while not mc.sc or mc.sc.are_we_master is None: - time.sleep(0.3) - - # Make sure the end configuration is as expected - assert mc.sc.are_we_master is False - assert mc.sc.master_url is None + with port_range.allocate_port() as port: + config_aa = set_config_defaults( + { + "advertised_hostname": "127.0.0.1", + "bootstrap_uri": kafka_servers.bootstrap_servers, + "client_id": client_id, + "group_id": group_id, + "port": port, + "master_eligibility": False, + } + ) + + with closing(init_admin(config_aa)) as mc: + # Wait for the election to happen, ie. flag is not None + while not mc.sc or mc.sc.are_we_master is None: + time.sleep(0.3) + + # Make sure the end configuration is as expected + assert mc.sc.are_we_master is False + assert mc.sc.master_url is None async def test_schema_request_forwarding(registry_async_pair): diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 5424ab626..477cd93f5 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -6,10 +6,10 @@ """ from http import HTTPStatus from kafka import KafkaProducer -from karapace import config from karapace.client import Client from karapace.rapu import is_success -from karapace.schema_registry_apis import KarapaceSchemaRegistry, SchemaErrorMessages +from karapace.schema_registry_apis import SchemaErrorMessages +from tests.integration.utils.cluster import RegistryDescription from tests.integration.utils.kafka_server import KafkaServers from tests.utils import ( create_field_name_factory, @@ -863,12 +863,12 @@ async def assert_schema_versions(client: Client, trail: str, schema_id: int, exp """ res = await client.get(f"/schemas/ids/{schema_id}/versions{trail}") assert res.status_code == 200 + registered_schemas = res.json() # Schema Registry doesn't return an ordered list, Karapace does. # Need to check equality ignoring ordering. 
- assert len(res.json()) == len(expected) - for e in ({"subject": e[0], "version": e[1]} for e in expected): - assert e in res.json() + result = [(schema["subject"], schema["version"]) for schema in registered_schemas] + assert set(result) == set(expected) async def assert_schema_versions_failed(client: Client, trail: str, schema_id: int, response_code: int = 404) -> None: @@ -1895,13 +1895,10 @@ async def test_schema_remains_constant(registry_async_client: Client) -> None: async def test_malformed_kafka_message( - kafka_servers: KafkaServers, registry_async: KarapaceSchemaRegistry, registry_async_client: Client + kafka_servers: KafkaServers, + registry_cluster: RegistryDescription, + registry_async_client: Client, ) -> None: - if registry_async: - topic = registry_async.config["topic_name"] - else: - topic = config.DEFAULTS["topic_name"] - producer = KafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers) message_key = {"subject": "foo", "version": 1, "magic": 1, "keytype": "SCHEMA"} import random @@ -1910,7 +1907,9 @@ async def test_malformed_kafka_message( payload = {"schema": jsonlib.dumps({"foo": "bar"}, indent=None, separators=(",", ":"))} message_value = {"deleted": False, "id": schema_id, "subject": "foo", "version": 1} message_value.update(payload) - producer.send(topic, key=ujson.dumps(message_key).encode(), value=ujson.dumps(message_value).encode()).get() + producer.send( + registry_cluster.schemas_topic, key=ujson.dumps(message_key).encode(), value=ujson.dumps(message_value).encode() + ).get() path = f"schemas/ids/{schema_id}" res = await repeat_until_successful_request( diff --git a/tests/integration/test_schema_backup.py b/tests/integration/test_schema_backup.py index 974c0ce84..414b6c5dd 100644 --- a/tests/integration/test_schema_backup.py +++ b/tests/integration/test_schema_backup.py @@ -9,6 +9,7 @@ from karapace.schema_backup import SchemaBackup from karapace.utils import Expiration from pathlib import Path +from tests.integration.utils.cluster import RegistryDescription from tests.integration.utils.kafka_server import KafkaServers from tests.utils import new_random_name @@ -47,6 +48,7 @@ async def test_backup_restore( registry_async_client: Client, kafka_servers: KafkaServers, tmp_path: Path, + registry_cluster: RegistryDescription, ) -> None: subject = new_random_name("subject") restore_location = tmp_path / "restore.log" @@ -73,7 +75,12 @@ async def test_backup_restore( fp, ) - config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers}) + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "topic_name": registry_cluster.schemas_topic, + } + ) sb = SchemaBackup(config, str(restore_location)) sb.restore_backup() @@ -89,6 +96,7 @@ async def test_backup_restore( res = await registry_async_client.get("subjects") assert res.status_code == 200 all_subjects = res.json() + time.sleep(0.1) # Test a few exotic scenarios subject = new_random_name("subject") diff --git a/tests/integration/test_schema_backup_avro_export.py b/tests/integration/test_schema_backup_avro_export.py index a874dcbc6..5eb24e55b 100644 --- a/tests/integration/test_schema_backup_avro_export.py +++ b/tests/integration/test_schema_backup_avro_export.py @@ -8,6 +8,7 @@ from karapace.config import set_config_defaults from karapace.schema_backup import SchemaBackup from pathlib import Path +from tests.integration.utils.cluster import RegistryDescription from tests.integration.utils.kafka_server import KafkaServers from typing import Any, Dict @@ 
-63,14 +64,22 @@ async def insert_data(c: Client, schemaType: str, subject: str, data: Dict[str, async def test_export_anonymized_avro_schemas( - registry_async_client: Client, kafka_servers: KafkaServers, tmp_path: Path + registry_async_client: Client, + kafka_servers: KafkaServers, + tmp_path: Path, + registry_cluster: RegistryDescription, ) -> None: await insert_data(registry_async_client, "JSON", JSON_SUBJECT, JSON_SCHEMA) await insert_data(registry_async_client, "AVRO", AVRO_SUBJECT, AVRO_SCHEMA) # Get the backup export_location = tmp_path / "export.log" - config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers}) + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "topic_name": registry_cluster.schemas_topic, + } + ) sb = SchemaBackup(config, str(export_location)) sb.export_anonymized_avro_schemas() diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py new file mode 100644 index 000000000..80f965f0f --- /dev/null +++ b/tests/integration/utils/cluster.py @@ -0,0 +1,97 @@ +from contextlib import asynccontextmanager, ExitStack +from dataclasses import dataclass +from karapace.config import Config, set_config_defaults, write_config +from pathlib import Path +from subprocess import Popen +from tests.integration.utils.network import PortRangeInclusive +from tests.integration.utils.process import stop_process, wait_for_port_subprocess +from tests.utils import new_random_name +from typing import AsyncIterator, List + + +@dataclass(frozen=True) +class RegistryEndpoint: + protocol: str + host: str + port: int + + def to_url(self) -> str: + return f"{self.protocol}://{self.host}:{self.port}" + + +@dataclass(frozen=True) +class RegistryDescription: + endpoint: RegistryEndpoint + schemas_topic: str + + +@asynccontextmanager +async def start_schema_registry_cluster( + config_templates: List[Config], + data_dir: Path, + port_range: PortRangeInclusive, +) -> AsyncIterator[List[RegistryDescription]]: + """Start a cluster of schema registries, one process per `config_templates`.""" + for template in config_templates: + assert "bootstrap_uri" in template, "base_config must have the value `bootstrap_uri` set" + + # None is considered a valid value, and it represents the lack of user + # configuration, so this will generate one for the cluster + group_ids = set(config.get("group_id") for config in config_templates) + assert len(group_ids) == 1, f"All configurations entries must have the same group_id value, got: {group_ids}" + + group_id = new_random_name("group_id") + schemas_topic = new_random_name("_schemas") + + all_processes = [] + all_registries = [] + with ExitStack() as stack: + for pos, template in enumerate(config_templates): + config = dict(template) + del template + + # For testing we don't want to expose the hostname, usually the loopback interface is + # used (127.0.0.1), and the name resolution would instead return the machine's network + # address, (e.g. 
192.168.0.1), which would cause connect failures + host = config.setdefault("host", "127.0.0.1") + assert isinstance(host, str), "host must be str" + config.setdefault("advertised_hostname", host) + config.setdefault("topic_name", schemas_topic) + config.setdefault("karapace_registry", True) + config.setdefault( + "log_format", + "%(asctime)s [%(threadName)s] %(filename)s:%(funcName)s:%(lineno)d %(message)s", + ) + actual_group_id = config.setdefault("group_id", group_id) + + port = config.setdefault("port", stack.enter_context(port_range.allocate_port())) + assert isinstance(port, int), "Port must be an integer" + + group_dir = data_dir / str(actual_group_id) + group_dir.mkdir(parents=True, exist_ok=True) + config_path = group_dir / f"{pos}.config.json" + log_path = group_dir / f"{pos}.log" + error_path = group_dir / f"{pos}.error" + + config = set_config_defaults(config) + write_config(config_path, config) + + logfile = stack.enter_context(open(log_path, "w")) + errfile = stack.enter_context(open(error_path, "w")) + process = Popen( + args=["python", "-m", "karapace.karapace_all", str(config_path)], + stdout=logfile, + stderr=errfile, + ) + stack.callback(stop_process, process) + all_processes.append(process) + + protocol = "http" if config.get("server_tls_keyfile") is None else "https" + endpoint = RegistryEndpoint(protocol, host, port) + description = RegistryDescription(endpoint, schemas_topic) + all_registries.append(description) + + for process in all_processes: + wait_for_port_subprocess(port, process, hostname=host) + + yield all_registries diff --git a/tests/integration/utils/network.py b/tests/integration/utils/network.py index 34ba3cce2..7d43e23da 100644 --- a/tests/integration/utils/network.py +++ b/tests/integration/utils/network.py @@ -2,68 +2,71 @@ Copyright (c) 2022 Aiven Ltd See LICENSE for details """ -from dataclasses import dataclass -from typing import List +from contextlib import contextmanager -import random +import psutil import socket -@dataclass(frozen=True) -class PortRangeInclusive: - start: int - end: int +def is_time_wait(port: int) -> bool: + """True if the port is still on TIME_WAIT state.""" + return any(conn.laddr.port == port for conn in psutil.net_connections(kind="inet")) + +class PortRangeInclusive: PRIVILEGE_END = 2**10 MAX_PORTS = 2**16 - 1 - def __post_init__(self) -> None: + def __init__( + self, + start: int, + end: int, + ) -> None: # Make sure the range is valid and that we don't need to be root - assert self.end > self.start, "there must be at least one port available" - assert self.end <= self.MAX_PORTS, f"end must be lower than {self.MAX_PORTS}" - assert self.start > self.PRIVILEGE_END, "start must not be a privileged port" + assert end > start, "there must be at least one port available" + assert end <= self.MAX_PORTS, f"end must be lower than {self.MAX_PORTS}" + assert start > self.PRIVILEGE_END, "start must not be a privileged port" - def next_range(self, number_of_ports: int) -> "PortRangeInclusive": - next_start = self.end + 1 - next_end = next_start + number_of_ports - 1 # -1 because the range is inclusive + self.start = start + self.end = end + self._maybe_available = list(range(start, end + 1)) + def next_range(self, number_of_ports: int) -> "PortRangeInclusive": + next_start = self.end + next_end = next_start + number_of_ports return PortRangeInclusive(next_start, next_end) - -# To find a good port range use the following: -# -# curl --silent 
'https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt' | \ -# egrep -i -e '^\s*[0-9]+-[0-9]+\s*unassigned' | \ -# awk '{print $1}' -# -KAFKA_PORT_RANGE = PortRangeInclusive(48700, 48800) -ZK_PORT_RANGE = KAFKA_PORT_RANGE.next_range(100) -REGISTRY_PORT_RANGE = ZK_PORT_RANGE.next_range(100) -TESTS_PORT_RANGE = REGISTRY_PORT_RANGE.next_range(100) - - -def get_random_port(*, port_range: PortRangeInclusive, blacklist: List[int]) -> int: - """Find a random port in the range `PortRangeInclusive`. - - Note: - This function is *not* aware of the ports currently open in the system, - the blacklist only prevents two services of the same type to randomly - get the same ports for *a single test run*. - - Because of that, the port range should be chosen such that there is no - system service in the range. Also note that running two sessions of the - tests with the same range is not supported and will lead to flakiness. - """ - assert port_range.start <= port_range.end, f"{port_range.start} must be less-than-or-equal to {port_range.end}" - - # +1 because randint is inclusive for both ends - ports_in_range = (port_range.end - port_range.start) + 1 - assert len(blacklist) < ports_in_range, f"no free ports available. Range {port_range}, blacklist: {blacklist}" - - value = random.randint(port_range.start, port_range.end) - while value in blacklist: - value = random.randint(port_range.start, port_range.end) - return value + @contextmanager + def allocate_port(self) -> int: + """Find a random port in the range `PortRangeInclusive`. + + Note: + This function is *not* aware of the ports currently open in the system, + the blacklist only prevents two services of the same type to randomly + get the same ports for *a single test run*. + + Because of that, the port range should be chosen such that there is no + system service in the range. Also note that running two sessions of the + tests with the same range is not supported and will lead to flakiness. + """ + if len(self._maybe_available) == 0: + raise RuntimeError(f"No free ports available. start: {self.start} end: {self.end}") + + filtered_ports = ((pos, port) for pos, port in enumerate(self._maybe_available) if not is_time_wait(port)) + + try: + pos, port = next(filtered_ports) + except StopIteration as e: + raise RuntimeError( + f"No free ports available. start: {self.start} end: {self.end} time_wait: {self._maybe_available}" + ) from e + + self._maybe_available.pop(pos) + yield port + + # Append the port at the end, this is a hack to give extra time for a TIME_WAIT socket to + # close, but it is not sufficient. + self._maybe_available.append(port) def port_is_listening(hostname: str, port: int, ipv6: bool) -> bool: From dd15a560566e69e13891db239ef3aa0c2ab060e9 Mon Sep 17 00:00:00 2001 From: "Augusto F. 
Hack" Date: Mon, 4 Apr 2022 16:32:43 +0200 Subject: [PATCH 4/4] logging: reduced logging - Coalesced logging lines and removed redundant logs - No longer logging the schema contents - Removed debug logging as requested in PR review - Made logger instances module level as requested in PR review --- karapace/client.py | 5 +- karapace/compatibility/jsonschema/checks.py | 4 - karapace/compatibility/protobuf/checks.py | 7 -- karapace/config.py | 25 ++++-- karapace/kafka_rest_apis/__init__.py | 2 - karapace/kafka_rest_apis/admin.py | 8 +- karapace/kafka_rest_apis/consumer_manager.py | 69 +++++++++-------- karapace/karapace.py | 5 +- karapace/karapace_all.py | 6 +- karapace/master_coordinator.py | 30 +++----- karapace/protobuf/io.py | 3 - karapace/schema_backup.py | 37 ++++----- karapace/schema_models.py | 4 - karapace/schema_reader.py | 81 ++++++++++---------- karapace/serialization.py | 3 - karapace/statsd.py | 8 +- karapace/utils.py | 12 +-- 17 files changed, 143 insertions(+), 166 deletions(-) diff --git a/karapace/client.py b/karapace/client.py index 04e166a13..ded1fd685 100644 --- a/karapace/client.py +++ b/karapace/client.py @@ -12,11 +12,12 @@ import logging import ssl -log = logging.getLogger(__name__) Path = str Headers = dict JsonData = object # Type of the result after parsing JSON +LOG = logging.getLogger(__name__) + async def _get_aiohttp_client() -> ClientSession: return ClientSession() @@ -74,7 +75,7 @@ async def close(self) -> None: if self._client is not None: await self._client.close() except: # pylint: disable=bare-except - log.info("Could not close client") + LOG.error("Could not close client") async def get_client(self) -> ClientSession: if self._client is None: diff --git a/karapace/compatibility/jsonschema/checks.py b/karapace/compatibility/jsonschema/checks.py index 870d1b251..afc8dcaaf 100644 --- a/karapace/compatibility/jsonschema/checks.py +++ b/karapace/compatibility/jsonschema/checks.py @@ -30,11 +30,8 @@ ) from typing import Any, List, Optional -import logging import networkx as nx -LOG = logging.getLogger(__name__) INTRODUCED_INCOMPATIBILITY_MSG_FMT = "Introduced incompatible assertion {assert_name} with value {introduced_value}" RESTRICTED_INCOMPATIBILITY_MSG_FMT = "More restrictive assertion {assert_name} from {writer_value} to {reader_value}" MODIFIED_INCOMPATIBILITY_MSG_FMT = "Assertion of {assert_name} changed from {writer_value} to {reader_value}" @@ -248,7 +245,6 @@ def compatibility_rec( # reader has type `array` to represent a list, and the writer is either a # different type or it is also an `array` but now it representes a tuple. 
if reader_schema is None and writer_schema is not None: - LOG.debug("Schema removed reader_schema.type='%r'", get_type_of(reader_schema)) return incompatible_schema( incompat_type=Incompatibility.schema_removed, message="schema removed", diff --git a/karapace/compatibility/protobuf/checks.py b/karapace/compatibility/protobuf/checks.py index 9b21f02c4..c995d1f03 100644 --- a/karapace/compatibility/protobuf/checks.py +++ b/karapace/compatibility/protobuf/checks.py @@ -2,17 +2,10 @@ from karapace.protobuf.compare_result import CompareResult from karapace.protobuf.schema import ProtobufSchema -import logging - -log = logging.getLogger(__name__) - def check_protobuf_schema_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult: result = CompareResult() - log.debug("READER: %s", reader.to_schema()) - log.debug("WRITER: %s", writer.to_schema()) writer.compare(reader, result) - log.debug("IS_COMPATIBLE %s", result.is_compatible()) if result.is_compatible(): return SchemaCompatibilityResult(SchemaCompatibilityType.compatible) diff --git a/karapace/config.py b/karapace/config.py index ab45b66fd..99cf698ce 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -19,7 +19,7 @@ Config = Dict[str, Union[None, str, int, bool, List[str], AccessLogger]] LOG = logging.getLogger(__name__) HOSTNAME = socket.gethostname() - +SASL_PLAIN_PASSWORD = "sasl_plain_password" DEFAULTS = { "access_logs_debug": False, "access_log_class": None, @@ -53,7 +53,7 @@ "ssl_password": None, "sasl_mechanism": None, "sasl_plain_username": None, - "sasl_plain_password": None, + SASL_PLAIN_PASSWORD: None, "topic_name": DEFAULT_SCHEMA_TOPIC, "metadata_max_age_ms": 60000, "admin_metadata_max_age": 5, @@ -67,6 +67,7 @@ "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", } +SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] class InvalidConfiguration(Exception): @@ -109,12 +110,20 @@ def set_settings_from_environment(config: Config) -> None: env_name = config_name_with_prefix.upper() env_val = os.environ.get(env_name) if env_val is not None: - LOG.debug( - "Populating config value %r from env var %r with %r instead of config file", - config_name, - env_name, - env_val, - ) + if config_name not in SECRET_CONFIG_OPTIONS: + LOG.info( + "Populating config value %r from env var %r with %r instead of config file", + config_name, + env_name, + env_val, + ) + else: + LOG.info( + "Populating config value %r from env var %r instead of config file", + config_name, + env_name, + ) + config[config_name] = parse_env_value(env_val) diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 2d8bb57cd..838b6a42c 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -17,7 +17,6 @@ import asyncio import base64 -import logging import time import ujson @@ -39,7 +38,6 @@ def __init__(self, config: Config) -> None: super().__init__(config=config) self._add_kafka_rest_routes() self.serializer = SchemaRegistrySerializer(config=config) - self.log = logging.getLogger("KarapaceRest") self._cluster_metadata = None self._metadata_birth = None self.metadata_max_age = self.config["admin_metadata_max_age"] diff --git a/karapace/kafka_rest_apis/admin.py b/karapace/kafka_rest_apis/admin.py index a0e2d6a16..bf4085bdf 100644 --- a/karapace/kafka_rest_apis/admin.py +++ b/karapace/kafka_rest_apis/admin.py @@ -9,12 +9,10 @@ import logging +LOG = logging.getLogger(__name__) -class KafkaRestAdminClient(KafkaAdminClient): - def 
__init__(self, **configs): - super().__init__(**configs) - self.log = logging.getLogger("AdminClient") +class KafkaRestAdminClient(KafkaAdminClient): def get_topic_config(self, topic: str) -> dict: config_version = self._matching_api_version(DescribeConfigsRequest) req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic)] @@ -50,7 +48,7 @@ def cluster_metadata(self, topics: List[str] = None, retries: int = 0) -> dict: except Cancelled: if retries > 3: raise - self.log.debug("Retrying metadata with %d retires", retries) + LOG.debug("Retrying metadata with %d retires", retries) return self.cluster_metadata(topics, retries + 1) return self._make_metadata_response(future.value) diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index f7dde4926..e52aad785 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -24,22 +24,21 @@ OFFSET_RESET_STRATEGIES = {"latest", "earliest"} TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"]) +LOG = logging.getLogger(__name__) + + +def new_name() -> str: + return str(uuid.uuid4()) class ConsumerManager: def __init__(self, config: dict) -> None: self.config = config self.hostname = f"http://{self.config['advertised_hostname']}:{self.config['port']}" - self.log = logging.getLogger("RestConsumerManager") self.deserializer = SchemaRegistryDeserializer(config=config) self.consumers = {} self.consumer_locks = defaultdict(Lock) - def new_name(self) -> str: - name = str(uuid.uuid4()) - self.log.debug("Generated new consumer name: %s", name) - return name - @staticmethod def _assert(cond: bool, code: HTTPStatus, sub_code: int, message: str, content_type: str) -> None: if not cond: @@ -155,12 +154,15 @@ def _update_partition_assignments(consumer: KafkaConsumer): # CONSUMER async def create_consumer(self, group_name: str, request_data: dict, content_type: str): group_name = group_name.strip("/") - self.log.info("Create consumer request for group %s", group_name) - consumer_name = request_data.get("name") or self.new_name() + consumer_name = request_data.get("name") or new_name() internal_name = self.create_internal_name(group_name, consumer_name) async with self.consumer_locks[internal_name]: if internal_name in self.consumers: - self.log.error("Error creating duplicate consumer in group %s with id %s", group_name, consumer_name) + LOG.warning( + "Error creating duplicate consumer in group %s with id %s", + group_name, + consumer_name, + ) KarapaceBase.r( status=HTTPStatus.CONFLICT, content_type=content_type, @@ -170,11 +172,14 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ }, ) self._validate_create_consumer(request_data, content_type) - self.log.info( - "Creating new consumer in group %s with id %s and request_info %r", group_name, consumer_name, request_data - ) for k in ["consumer.request.timeout.ms", "fetch_min_bytes"]: convert_to_int(request_data, k, content_type) + LOG.info( + "Creating new consumer in group. 
group name: %s consumer name: %s request_data %r", + group_name, + consumer_name, + request_data, + ) try: enable_commit = request_data.get("auto.commit.enable", self.config["consumer_enable_auto_commit"]) if isinstance(enable_commit, str): @@ -223,11 +228,11 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name ) return c except: # pylint: disable=bare-except - self.log.exception("Unable to create consumer, retrying") + LOG.exception("Unable to create consumer, retrying") await asyncio.sleep(1) async def delete_consumer(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Deleting consumer for %s", internal_name) + LOG.info("Deleting consumer for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: try: @@ -235,7 +240,7 @@ async def delete_consumer(self, internal_name: Tuple[str, str], content_type: st c.consumer.close() self.consumer_locks.pop(internal_name) except: # pylint: disable=bare-except - self.log.exception("Unable to properly dispose of consumer") + LOG.exception("Unable to properly dispose of consumer") finally: empty_response() @@ -243,7 +248,7 @@ async def delete_consumer(self, internal_name: Tuple[str, str], content_type: st async def commit_offsets( self, internal_name: Tuple[str, str], content_type: str, request_data: dict, cluster_metadata: dict ): - self.log.info("Committing offsets for %s", internal_name) + LOG.info("Committing offsets for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) if request_data: self._assert_has_key(request_data, "offsets", content_type) @@ -266,7 +271,7 @@ async def commit_offsets( empty_response() async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Retrieving offsets for %s", internal_name) + LOG.info("Retrieving offsets for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "partitions", content_type) response = {"offsets": []} @@ -290,7 +295,7 @@ async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, r # SUBSCRIPTION async def set_subscription(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Updating subscription for %s", internal_name) + LOG.info("Updating subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) topics = request_data.get("topics", []) topics_pattern = request_data.get("topic_pattern") @@ -307,10 +312,10 @@ async def set_subscription(self, internal_name: Tuple[str, str], content_type: s except IllegalStateError as e: self._illegal_state_fail(str(e), content_type=content_type) finally: - self.log.info("Done updating subscription") + LOG.info("Done updating subscription") async def get_subscription(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Retrieving subscription for %s", internal_name) + LOG.info("Retrieving subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -321,7 +326,7 @@ async def get_subscription(self, internal_name: Tuple[str, str], content_type: s KarapaceBase.r(content_type=content_type, body={"topics": topics}) async def delete_subscription(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Deleting subscription for %s", internal_name) + 
LOG.info("Deleting subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: self.consumers[internal_name].consumer.unsubscribe() @@ -329,7 +334,7 @@ async def delete_subscription(self, internal_name: Tuple[str, str], content_type # ASSIGNMENTS async def set_assignments(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Updating assignments for %s to %r", internal_name, request_data) + LOG.info("Updating assignments for %s to %r", internal_name, request_data) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "partitions", content_type) partitions = [] @@ -346,10 +351,10 @@ async def set_assignments(self, internal_name: Tuple[str, str], content_type: st except IllegalStateError as e: self._illegal_state_fail(message=str(e), content_type=content_type) finally: - self.log.info("Done updating assignment") + LOG.info("Done updating assignment") async def get_assignments(self, internal_name: Tuple[str, str], content_type: str): - self.log.info("Retrieving assignment for %s", internal_name) + LOG.info("Retrieving assignment for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -360,7 +365,7 @@ async def get_assignments(self, internal_name: Tuple[str, str], content_type: st # POSITIONS async def seek_to(self, internal_name: Tuple[str, str], content_type: str, request_data: dict): - self.log.info("Resetting offsets for %s to %r", internal_name, request_data) + LOG.info("Resetting offsets for %s to %r", internal_name, request_data) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "offsets", content_type) seeks = [] @@ -384,7 +389,7 @@ async def seek_limit( self, internal_name: Tuple[str, str], content_type: str, request_data: dict, beginning: bool = True ): direction = "beginning" if beginning else "end" - self.log.info("Seeking %s offsets", direction) + LOG.info("Seeking %s offsets", direction) self._assert_consumer_exists(internal_name, content_type) self._assert_has_key(request_data, "partitions", content_type) resets = [] @@ -406,7 +411,7 @@ async def seek_limit( self._illegal_state_fail(f"Trying to reset unassigned partitions to {direction}", content_type) async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats: dict, query_params: dict): - self.log.info("Running fetch for name %s with parameters %r and formats %r", internal_name, query_params, formats) + LOG.info("Running fetch for name %s with parameters %r and formats %r", internal_name, query_params, formats) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -420,7 +425,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats content_type=content_type, message=f"Consumer format {serialization_format} does not match the embedded format {request_format}", ) - self.log.info("Fetch request for %s with params %r", internal_name, query_params) + LOG.info("Fetch request for %s with params %r", internal_name, query_params) try: timeout = ( int(query_params["timeout"]) if "timeout" in query_params else config["consumer.request.timeout.ms"] @@ -438,7 +443,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats if val <= 0: 
KarapaceBase.internal_error(message=f"Invalid request parameter {val}", content_type=content_type) response = [] - self.log.info( + LOG.info( "Will poll multiple times for a single message with a total timeout of %dms, " "until at least %d bytes have been fetched", timeout, @@ -451,14 +456,14 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats while read_bytes < max_bytes and start_time + timeout / 1000 > time.monotonic(): time_left = start_time + timeout / 1000 - time.monotonic() bytes_left = max_bytes - read_bytes - self.log.info( + LOG.info( "Polling with %r time left and %d bytes left, gathered %d messages so far", time_left, bytes_left, message_count, ) data = consumer.poll(timeout_ms=timeout, max_records=1) - self.log.debug("Successfully polled for messages") + LOG.debug("Successfully polled for messages") for topic, records in data.items(): for rec in records: message_count += 1 @@ -468,7 +473,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats + max(0, rec.serialized_header_size) ) poll_data[topic].append(rec) - self.log.info("Gathered %d total messages", message_count) + LOG.info("Gathered %d total messages", message_count) for tp in poll_data: for msg in poll_data[tp]: try: diff --git a/karapace/karapace.py b/karapace/karapace.py index c2200c55e..2a2560350 100644 --- a/karapace/karapace.py +++ b/karapace/karapace.py @@ -11,8 +11,6 @@ from karapace.rapu import HTTPResponse, RestApp from typing import NoReturn, Union -import logging - class KarapaceBase(RestApp): def __init__(self, config: Config) -> None: @@ -20,9 +18,8 @@ def __init__(self, config: Config) -> None: self.kafka_timeout = 10 self.route("/", callback=self.root_get, method="GET") - self.log = logging.getLogger("Karapace") - self.log.info("Karapace initialized") self.app.on_shutdown.append(self.close_by_app) + self.log.info("Karapace initialized") async def close_by_app(self, app): # pylint: disable=unused-argument diff --git a/karapace/karapace_all.py b/karapace/karapace_all.py index 49c2c1060..7b111e03c 100644 --- a/karapace/karapace_all.py +++ b/karapace/karapace_all.py @@ -1,7 +1,7 @@ from aiohttp.web_log import AccessLogger from contextlib import closing from karapace import version as karapace_version -from karapace.config import Config, read_config +from karapace.config import read_config from karapace.kafka_rest_apis import KafkaRest from karapace.rapu import RestApp from karapace.schema_registry_apis import KarapaceSchemaRegistry @@ -13,9 +13,7 @@ class KarapaceAll(KafkaRest, KarapaceSchemaRegistry): - def __init__(self, config: Config) -> None: - super().__init__(config=config) - self.log = logging.getLogger("KarapaceAll") + pass def main() -> int: diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index c667a69e0..77984201a 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -20,6 +20,7 @@ # SR group errors NO_ERROR = 0 DUPLICATE_URLS = 1 +LOG = logging.getLogger(__name__) def get_identity_url(scheme, host, port): @@ -35,10 +36,6 @@ class SchemaCoordinator(BaseCoordinator): master_url = None master_eligibility = True - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self.log = logging.getLogger("SchemaCoordinator") - def protocol_type(self): return "sr" @@ -52,7 +49,7 @@ def group_protocols(self): return [("v0", self.get_identity(host=self.hostname, port=self.port, scheme=self.scheme))] def _perform_assignment(self, leader_id, protocol, members): - 
self.log.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members) + LOG.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members) self.are_we_master = None error = NO_ERROR urls = {} @@ -82,7 +79,7 @@ def _perform_assignment(self, leader_id, protocol, members): scheme=member_identity["scheme"], json_encode=False, ) - self.log.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url) + LOG.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url) assignments = {} for member_id, member_data in members: @@ -94,7 +91,7 @@ def _on_join_prepare(self, generation, member_id): # needs to be implemented in our class for pylint to be satisfied def _on_join_complete(self, generation, member_id, protocol, member_assignment_bytes): - self.log.info( + LOG.info( "Join complete, generation %r, member_id: %r, protocol: %r, member_assignment_bytes: %r", generation, member_id, @@ -119,13 +116,11 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b else: self.master_url = master_url self.are_we_master = False - # pylint: disable=super-with-arguments - return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes) + return super()._on_join_complete(generation, member_id, protocol, member_assignment_bytes) - def _on_join_follower(self): - self.log.info("We are a follower, not a master") - # pylint: disable=super-with-arguments - return super(SchemaCoordinator, self)._on_join_follower() + def _on_join_follower(self) -> None: + LOG.info("We are a follower, not a master") + return super()._on_join_follower() class MasterCoordinator(Thread): @@ -142,7 +137,6 @@ def __init__(self, config): metric_config = MetricConfig(samples=2, time_window_ms=30000, tags=metrics_tags) self._metrics = Metrics(metric_config, reporters=[]) self.schema_coordinator_ready = Event() - self.log = logging.getLogger("MasterCoordinator") def init_kafka_client(self): try: @@ -161,7 +155,7 @@ def init_kafka_client(self): ) return True except (NodeNotReadyError, NoBrokersAvailable): - self.log.warning("No Brokers available yet, retrying init_kafka_client()") + LOG.warning("No Brokers available yet, retrying init_kafka_client()") time.sleep(2.0) return False @@ -187,7 +181,7 @@ def get_master_info(self) -> Tuple[bool, Optional[str]]: return self.sc.are_we_master, self.sc.master_url def close(self): - self.log.info("Closing master_coordinator") + LOG.info("Closing master_coordinator") self.running = False def run(self): @@ -203,10 +197,10 @@ def run(self): self.sc.ensure_active_group() self.sc.poll_heartbeat() - self.log.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url) + LOG.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url) time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat())) except: # pylint: disable=bare-except - self.log.exception("Exception in master_coordinator") + LOG.exception("Exception in master_coordinator") time.sleep(1.0) if self.sc: diff --git a/karapace/protobuf/io.py b/karapace/protobuf/io.py index cf3678a10..ba8d8a223 100644 --- a/karapace/protobuf/io.py +++ b/karapace/protobuf/io.py @@ -10,12 +10,9 @@ import hashlib import importlib import importlib.util -import logging import os import subprocess -logger = logging.getLogger(__name__) - def calculate_class_name(name: str) -> str: return "c_" + hashlib.md5(name.encode("utf-8")).hexdigest() diff --git 
a/karapace/schema_backup.py b/karapace/schema_backup.py index c88f79297..dfb5b265e 100644 --- a/karapace/schema_backup.py +++ b/karapace/schema_backup.py @@ -21,6 +21,8 @@ import time import ujson +LOG = logging.getLogger(__name__) + class BackupError(Exception): """Backup Error""" @@ -31,7 +33,6 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str] self.config = config self.backup_location = backup_path self.topic_name = topic_option or self.config["topic_name"] - self.log = logging.getLogger("SchemaBackup") self.consumer = None self.producer = None self.admin_client = None @@ -88,15 +89,15 @@ def init_admin_client(self): ) break except (NodeNotReadyError, NoBrokersAvailable, AssertionError): - self.log.warning("No Brokers available yet, retrying init_admin_client()") + LOG.warning("No Brokers available yet, retrying init_admin_client()") except: # pylint: disable=bare-except - self.log.exception("Failed to initialize admin client, retrying init_admin_client()") + LOG.exception("Failed to initialize admin client, retrying init_admin_client()") time.sleep(2.0) def _create_schema_topic_if_needed(self): if self.topic_name != self.config["topic_name"]: - self.log.info("Topic name overridden, not creating a topic with schema configuration") + LOG.info("Topic name overridden, not creating a topic with schema configuration") return self.init_admin_client() @@ -109,21 +110,21 @@ def _create_schema_topic_if_needed(self): schema_topic = KafkaSchemaReader.get_new_schema_topic(self.config) try: - self.log.info("Creating schema topic: %r", schema_topic) + LOG.info("Creating schema topic: %r", schema_topic) self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) - self.log.info("Topic: %r created successfully", self.config["topic_name"]) + LOG.info("Topic: %r created successfully", self.config["topic_name"]) break except TopicAlreadyExistsError: - self.log.info("Topic: %r already exists", self.config["topic_name"]) + LOG.info("Topic: %r already exists", self.config["topic_name"]) break except: # pylint: disable=bare-except - self.log.exception( + LOG.exception( "Failed to create topic: %r, retrying _create_schema_topic_if_needed()", self.config["topic_name"] ) time.sleep(5) def close(self): - self.log.info("Closing schema backup reader") + LOG.info("Closing schema backup reader") if self.consumer: self.consumer.close() self.consumer = None @@ -141,10 +142,10 @@ def request_backup(self): if self.backup_location: with open(self.backup_location, mode="w", encoding="utf8") as fp: fp.write(ser) - self.log.info("Schema backup written to %r", self.backup_location) + LOG.info("Schema backup written to %r", self.backup_location) else: print(ser) - self.log.info("Schema backup written to stdout") + LOG.info("Schema backup written to stdout") self.close() def restore_backup(self): @@ -155,7 +156,7 @@ def restore_backup(self): if not self.producer: self.init_producer() - self.log.info("Starting backup restore for topic: %r", self.topic_name) + LOG.info("Starting backup restore for topic: %r", self.topic_name) values = None with open(self.backup_location, mode="r", encoding="utf8") as fp: @@ -171,7 +172,7 @@ def restore_backup(self): future = self.producer.send(self.topic_name, key=key, value=value) self.producer.flush(timeout=self.timeout_ms) msg = future.get(self.timeout_ms) - self.log.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, msg.offset) + LOG.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, 
msg.offset) self.close() def export_anonymized_avro_schemas(self): @@ -196,16 +197,16 @@ def export_anonymized_avro_schemas(self): if self.backup_location: with open(self.backup_location, mode="w", encoding="utf8") as fp: fp.write(ser) - self.log.info("Anonymized Avro schema export written to %r", self.backup_location) + LOG.info("Anonymized Avro schema export written to %r", self.backup_location) else: print(ser) - self.log.info("Anonymized Avro schema export written to stdout") + LOG.info("Anonymized Avro schema export written to stdout") self.close() def _export(self) -> List[Tuple[str, Dict[str, str]]]: if not self.consumer: self.init_consumer() - self.log.info("Starting schema backup read for topic: %r", self.topic_name) + LOG.info("Starting schema backup read for topic: %r", self.topic_name) values = [] topic_fully_consumed = False @@ -221,14 +222,14 @@ def _export(self) -> List[Tuple[str, Dict[str, str]]]: try: key = ujson.loads(key) except ValueError: - self.log.debug("Invalid JSON in message.key: %r, value: %r", message.key, message.value) + LOG.debug("Invalid JSON in message.key: %r, value: %r", message.key, message.value) value = None if message.value: value = message.value.decode("utf8") try: value = ujson.loads(value) except ValueError: - self.log.debug("Invalid JSON in message.value: %r, key: %r", message.value, message.key) + LOG.debug("Invalid JSON in message.value: %r, key: %r", message.value, message.key) values.append((key, value)) return values diff --git a/karapace/schema_models.py b/karapace/schema_models.py index e8e111440..d36274015 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -16,11 +16,8 @@ from typing import Any, Dict, Union import json -import logging import ujson -log = logging.getLogger(__name__) - def parse_avro_schema_definition(s: str) -> AvroSchema: """Compatibility function with Avro which ignores trailing data in JSON @@ -146,7 +143,6 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": ProtobufException, ProtobufSchemaParseException, ) as e: - log.exception("Unexpected error: %s \n schema:[%s]", e, schema_str) raise InvalidSchema from e else: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 335166560..dab8df057 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -20,7 +20,6 @@ import time import ujson -log = logging.getLogger(__name__) Offset = int Subject = str Version = int @@ -31,6 +30,7 @@ # The value `0` is a valid offset and it represents the first message produced # to a topic, therefore it can not be used. 
OFFSET_EMPTY = -1 +LOG = logging.getLogger(__name__) class OffsetsWatcher: @@ -83,7 +83,6 @@ def __init__( ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator - self.log = logging.getLogger("KafkaSchemaReader") self.timeout_ms = 200 self.config = config self.subjects: Dict[Subject, SubjectData] = {} @@ -154,10 +153,10 @@ def init_admin_client(self) -> bool: ) return True except (NodeNotReadyError, NoBrokersAvailable, AssertionError): - self.log.warning("No Brokers available yet, retrying init_admin_client()") + LOG.warning("No Brokers available yet, retrying init_admin_client()") time.sleep(2.0) except: # pylint: disable=bare-except - self.log.exception("Failed to initialize admin client, retrying init_admin_client()") + LOG.exception("Failed to initialize admin client, retrying init_admin_client()") time.sleep(2.0) return False @@ -175,17 +174,17 @@ def create_schema_topic(self) -> bool: schema_topic = self.get_new_schema_topic(self.config) try: - self.log.info("Creating topic: %r", schema_topic) + LOG.info("Creating topic: %r", schema_topic) self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) - self.log.info("Topic: %r created successfully", self.config["topic_name"]) + LOG.info("Topic: %r created successfully", self.config["topic_name"]) self.schema_topic = schema_topic return True except TopicAlreadyExistsError: - self.log.warning("Topic: %r already exists", self.config["topic_name"]) + LOG.warning("Topic: %r already exists", self.config["topic_name"]) self.schema_topic = schema_topic return True except: # pylint: disable=bare-except - self.log.exception("Failed to create topic: %r, retrying create_schema_topic()", self.config["topic_name"]) + LOG.exception("Failed to create topic: %r, retrying create_schema_topic()", self.config["topic_name"]) time.sleep(5) return False @@ -198,7 +197,7 @@ def get_schema_id(self, new_schema: TypedSchema) -> int: return self.global_schema_id def close(self) -> None: - self.log.info("Closing schema_reader") + LOG.info("Closing schema_reader") self.running = False def run(self) -> None: @@ -213,10 +212,11 @@ def run(self) -> None: if not self.consumer: self.init_consumer() self.handle_messages() + LOG.info("Status: offset: %r, ready: %r", self.offset, self.ready) except Exception as e: # pylint: disable=broad-except if self.stats: self.stats.unexpected_exception(ex=e, where="schema_reader_loop") - self.log.exception("Unexpected exception in schema reader loop") + LOG.exception("Unexpected exception in schema reader loop") try: if self.admin_client: self.admin_client.close() @@ -225,7 +225,7 @@ def run(self) -> None: except Exception as e: # pylint: disable=broad-except if self.stats: self.stats.unexpected_exception(ex=e, where="schema_reader_exit") - self.log.exception("Unexpected exception closing schema reader") + LOG.exception("Unexpected exception closing schema reader") def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" @@ -248,7 +248,7 @@ def handle_messages(self) -> None: try: key = ujson.loads(msg.key.decode("utf8")) except ValueError: - self.log.exception("Invalid JSON in msg.key: %r, value: %r", msg.key, msg.value) + LOG.exception("Invalid JSON in msg.key") continue value = None @@ -256,18 +256,11 @@ def handle_messages(self) -> None: try: value = ujson.loads(msg.value.decode("utf8")) except ValueError: - self.log.exception("Invalid JSON in msg.value: %r, key: %r", msg.value, msg.key) + LOG.exception("Invalid 
JSON in msg.value") continue - self.log.info("Read new record: key: %r, value: %r, offset: %r", key, value, msg.offset) self.handle_msg(key, value) self.offset = msg.offset - self.log.info( - "Handled message, current offset: %r, ready: %r, add_offsets: %r", - self.offset, - self.ready, - add_offsets, - ) if self.ready and add_offsets: self.offset_watcher.offset_seen(self.offset) @@ -275,29 +268,29 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: subject = key.get("subject") if subject is not None: if subject not in self.subjects: - self.log.info("Adding first version of subject: %r with no schemas", subject) + LOG.info("Adding first version of subject: %r with no schemas", subject) self.subjects[subject] = {"schemas": {}} if not value: - self.log.info("Deleting compatibility config completely for subject: %r", subject) + LOG.info("Deleting compatibility config completely for subject: %r", subject) self.subjects[subject].pop("compatibility", None) else: - self.log.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) + LOG.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) self.subjects[subject]["compatibility"] = value["compatibilityLevel"] elif value is not None: - self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) + LOG.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) self.config["compatibility"] = value["compatibilityLevel"] def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: # pylint: disable=unused-argument if value is None: - self.log.error("DELETE_SUBJECT record doesnt have a value, should have") + LOG.error("DELETE_SUBJECT record doesnt have a value, should have") return subject = value["subject"] if subject not in self.subjects: - self.log.error("Subject: %r did not exist, should have", subject) + LOG.error("Subject: %r did not exist, should have", subject) else: - self.log.info("Deleting subject: %r, value: %r", subject, value) + LOG.info("Deleting subject: %r, value: %r", subject, value) updated_schemas = { key: self._delete_schema_below_version(schema, value["version"]) for key, schema in self.subjects[subject]["schemas"].items() @@ -308,11 +301,11 @@ def _handle_msg_schema_hard_delete(self, key: dict) -> None: subject, version = key["subject"], key["version"] if subject not in self.subjects: - self.log.error("Hard delete: Subject %s did not exist, should have", subject) + LOG.error("Hard delete: Subject %s did not exist, should have", subject) elif version not in self.subjects[subject]["schemas"]: - self.log.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject) + LOG.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject) else: - self.log.info("Hard delete: subject: %r version: %r", subject, version) + LOG.info("Hard delete: subject: %r version: %r", subject, version) self.subjects[subject]["schemas"].pop(version, None) def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: @@ -330,7 +323,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: try: schema_type_parsed = SchemaType(schema_type) except ValueError: - self.log.error("Invalid schema type: %s", schema_type) + LOG.error("Invalid schema type: %s", schema_type) return # Protobuf doesn't use JSON @@ -338,29 +331,33 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: 
             try:
                 ujson.loads(schema_str)
             except ValueError:
-                self.log.error("Invalid json: %s", value["schema"])
+                LOG.error("Schema is not valid JSON")
                 return
 
-        typed_schema = TypedSchema(schema_type=schema_type_parsed, schema_str=schema_str)
-        self.log.debug("Got typed schema %r", typed_schema)
-
         if schema_subject not in self.subjects:
-            self.log.info("Adding first version of subject: %r with no schemas", schema_subject)
+            LOG.info("Adding first version of subject: %r with no schemas", schema_subject)
             self.subjects[schema_subject] = {"schemas": {}}
 
         subjects_schemas = self.subjects[schema_subject]["schemas"]
 
-        if schema_version in subjects_schemas:
-            self.log.info("Updating entry for subject: %r, value: %r", schema_subject, value)
-        else:
-            self.log.info("Adding new version of subject: %r, value: %r", schema_subject, value)
-
-        subjects_schemas[schema_version] = {
+        typed_schema = TypedSchema(
+            schema_type=schema_type_parsed,
+            schema_str=schema_str,
+        )
+        schema = {
             "schema": typed_schema,
             "version": schema_version,
             "id": schema_id,
             "deleted": schema_deleted,
         }
+
+        if schema_version in subjects_schemas:
+            LOG.info("Updating entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id)
+        else:
+            LOG.info("Adding entry subject: %r version: %r id: %r", schema_subject, schema_version, schema_id)
+
+        subjects_schemas[schema_version] = schema
+
         with self.id_lock:
             self.schemas[schema_id] = typed_schema
             self.global_schema_id = max(self.global_schema_id, schema_id)
diff --git a/karapace/serialization.py b/karapace/serialization.py
index 4e95f5b5c..e983d35c1 100644
--- a/karapace/serialization.py
+++ b/karapace/serialization.py
@@ -13,12 +13,9 @@
 import avro
 import avro.schema
 import io
-import logging
 import struct
 import ujson
 
-log = logging.getLogger(__name__)
-
 START_BYTE = 0x0
 HEADER_FORMAT = ">bI"
 HEADER_SIZE = 5
diff --git a/karapace/statsd.py b/karapace/statsd.py
index 65b85b17d..8255593ee 100644
--- a/karapace/statsd.py
+++ b/karapace/statsd.py
@@ -19,13 +19,13 @@
 
 STATSD_HOST = "127.0.0.1"
 STATSD_PORT = 8125
+LOG = logging.getLogger(__name__)
 
 
 class StatsClient:
     def __init__(self, host: str = STATSD_HOST, port: int = STATSD_PORT, sentry_config: Dict = None) -> None:
         self.sentry_config: Dict
-        self.log = logging.getLogger("StatsClient")
 
         if sentry_config is None:
             self.sentry_config = {
                 "dsn": os.environ.get("SENTRY_DSN"),
@@ -73,7 +73,7 @@ def update_sentry_config(self, config: Dict) -> None:
             self.raven_client = raven.Client(**self.sentry_config)
         except ImportError:
             self.raven_client = None
-            self.log.warning("Cannot enable Sentry.io sending: importing 'raven' failed")
+            LOG.warning("Cannot enable Sentry.io sending: importing 'raven' failed")
         else:
             self.raven_client = None
 
@@ -125,8 +125,8 @@ def _send(self, metric: str, metric_type: bytes, value: Any, tags: Optional[Dict
                     parts.insert(1, ",{}={}".format(tag, tag_value).encode("utf-8"))
 
             self._socket.sendto(b"".join(parts), self._dest_addr)
-        except Exception as ex:  # pylint: disable=broad-except
-            self.log.error("Unexpected exception in statsd send: %s: %s", ex.__class__.__name__, ex)
+        except Exception:  # pylint: disable=broad-except
+            LOG.exception("Unexpected exception in statsd send")
 
     def close(self) -> None:
         self._socket.close()
diff --git a/karapace/utils.py b/karapace/utils.py
index 1ed24d50f..c0cd17449 100644
--- a/karapace/utils.py
+++ b/karapace/utils.py
@@ -20,8 +20,8 @@
 import time
 import ujson
 
-log = logging.getLogger("KarapaceUtils")
 NS_BLACKOUT_DURATION_SECONDS = 120
+LOG = logging.getLogger(__name__)
 
 
 def
_isoformat(datetime_obj: datetime) -> str: @@ -151,7 +151,7 @@ def close_invalid_connections(self): conns = self._conns.copy().values() for conn in conns: if conn and conn.ns_blackout(): - log.info( + LOG.info( "Node id %s no longer in cluster metadata, closing connection and requesting update", conn.node_id ) self.close(conn.node_id) @@ -164,7 +164,7 @@ def _poll(self, timeout): try: self.close_invalid_connections() except Exception as e: # pylint: disable=broad-except - log.error("Error closing invalid connections: %r", e) + LOG.error("Error closing invalid connections: %r", e) def _maybe_refresh_metadata(self, wakeup=False): """ @@ -185,7 +185,7 @@ def _maybe_refresh_metadata(self, wakeup=False): else: node_id = bootstrap_nodes[0].nodeId if node_id is None: - log.debug("Give up sending metadata request since no node is available") + LOG.debug("Give up sending metadata request since no node is available") return self.config["reconnect_backoff_ms"] if self._can_send_request(node_id): @@ -197,7 +197,7 @@ def _maybe_refresh_metadata(self, wakeup=False): topics = [] if self.config["api_version"] < (0, 10) else None api_version = 0 if self.config["api_version"] < (0, 10) else 1 request = MetadataRequest[api_version](topics) - log.debug("Sending metadata request %s to node %s", request, node_id) + LOG.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request, wakeup=wakeup) future.add_callback(self.cluster.update_metadata) future.add_errback(self.cluster.failed_update) @@ -217,7 +217,7 @@ def refresh_done(val_or_error): return self.config["reconnect_backoff_ms"] if self.maybe_connect(node_id, wakeup=wakeup): - log.debug("Initializing connection to node %s for metadata request", node_id) + LOG.debug("Initializing connection to node %s for metadata request", node_id) return self.config["reconnect_backoff_ms"] return float("inf")