From 63e722e2166aeb5020b4944c2e0353ece5235b2c Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Thu, 9 Jan 2025 16:42:02 +0200 Subject: [PATCH] fix: no forwarding if primary url own --- src/karapace/config.py | 3 + .../coordinator/master_coordinator.py | 14 +- .../coordinator/schema_coordinator.py | 4 +- src/karapace/typing.py | 7 + src/schema_registry/controller.py | 8 +- src/schema_registry/reader.py | 4 +- src/schema_registry/registry.py | 12 +- src/schema_registry/routers/config.py | 24 +-- .../routers/master_availability.py | 13 +- src/schema_registry/routers/subjects.py | 23 ++- tests/integration/conftest.py | 13 +- tests/integration/test_master_coordinator.py | 103 ------------ tests/integration/test_request_forwarding.py | 146 ++++++++++++++++++ tests/integration/test_schema_protobuf.py | 4 +- tests/integration/test_schema_reader.py | 7 +- tests/integration/utils/cluster.py | 2 + tests/unit/schema_registry/test_forwarding.py | 79 ++++++++++ tests/utils.py | 3 +- 18 files changed, 313 insertions(+), 156 deletions(-) create mode 100644 tests/integration/test_request_forwarding.py create mode 100644 tests/unit/schema_registry/test_forwarding.py diff --git a/src/karapace/config.py b/src/karapace/config.py index 6663f6bd7..38f4bea40 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -140,6 +140,9 @@ def get_advertised_port(self) -> int: def get_advertised_hostname(self) -> str: return self.advertised_hostname or self.host + def get_address(self) -> str: + return f"{self.host}:{self.port}" + def get_rest_base_uri(self) -> str: return ( self.rest_base_uri diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index 2ac449b06..47c80c0ba 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -14,7 +14,7 @@ from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS -from karapace.typing import SchemaReaderStoppper +from karapace.typing import PrimaryInfo, SchemaReaderStoppper from schema_registry.telemetry.tracer import Tracer from threading import Thread from typing import Final @@ -163,18 +163,22 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus: group_generation_id=generation if generation is not None else -1, ) - def get_master_info(self) -> tuple[bool | None, str | None]: + def get_master_info(self) -> PrimaryInfo: """Return whether we're the master, and the actual master url that can be used if we're not""" if not self._sc: - return False, None + return PrimaryInfo(False, None) if not self._sc.ready(): # we should wait for a while after we have been elected master, we should also consume # all the messages in the log before proceeding, check the doc of `self._sc.are_we_master` # for more details - return False, None + return PrimaryInfo(False, None) - return self._sc.are_we_master(), self._sc.master_url + url: str | None = None + if self._sc.master_url is not None and f"{self.config.host}:{self.config.port}" not in self._sc.master_url: + url = self._sc.master_url + + return PrimaryInfo(self._sc.are_we_master(), url) def __send_close_event(self) -> None: self._closing.set() diff --git a/src/karapace/coordinator/schema_coordinator.py b/src/karapace/coordinator/schema_coordinator.py index e699c0c82..56286537b 100644 --- a/src/karapace/coordinator/schema_coordinator.py +++ b/src/karapace/coordinator/schema_coordinator.py @@ -198,7 +198,7 @@ def __init__( def is_master_assigned_to_myself(self) -> bool: return self._are_we_master or False - def are_we_master(self) -> bool | None: + def are_we_master(self) -> bool: """ After a new election its made we should wait for a while since the previous master could have produced a new message shortly before being disconnected from the cluster. @@ -212,7 +212,7 @@ def are_we_master(self) -> bool | None: # `self._are_we_master` is `None` only during the perform of the assignment # where we don't know if we are master yet LOG.warning("No new elections performed yet.") - return None + return False if not self._ready or not self._schema_reader_stopper.ready(): return False diff --git a/src/karapace/typing.py b/src/karapace/typing.py index 9d30eaa34..ea6ba1bb1 100644 --- a/src/karapace/typing.py +++ b/src/karapace/typing.py @@ -7,6 +7,7 @@ from abc import ABC, abstractmethod from collections.abc import Callable, Generator, Mapping, Sequence +from dataclasses import dataclass from enum import Enum, unique from karapace.errors import InvalidVersion from pydantic import ValidationInfo @@ -128,3 +129,9 @@ def ready(self) -> bool: @abstractmethod def set_not_ready(self) -> None: pass + + +@dataclass(frozen=True) +class PrimaryInfo: + primary: bool + primary_url: str | None diff --git a/src/schema_registry/controller.py b/src/schema_registry/controller.py index d8924d9a7..d53efd710 100644 --- a/src/schema_registry/controller.py +++ b/src/schema_registry/controller.py @@ -831,8 +831,8 @@ async def subject_post( if schema_id is not None: return SchemaIdResponse(id=schema_id) - i_am_primary, primary_url = await self.schema_registry.get_master() - if i_am_primary: + primary_info = await self.schema_registry.get_master() + if primary_info.primary: try: schema_id = await self.schema_registry.write_new_schema_local(Subject(subject), new_schema, references) return SchemaIdResponse(id=schema_id) @@ -863,11 +863,11 @@ async def subject_post( except Exception as xx: raise xx - elif not primary_url: + if not primary_info.primary_url: raise no_primary_url_error() else: return await forward_client.forward_request_remote( - request=request, primary_url=primary_url, response_type=SchemaIdResponse + request=request, primary_url=primary_info.primary_url, response_type=SchemaIdResponse ) async def get_global_mode(self) -> ModeResponse: diff --git a/src/schema_registry/reader.py b/src/schema_registry/reader.py index dfe5a547c..3bf1bef97 100644 --- a/src/schema_registry/reader.py +++ b/src/schema_registry/reader.py @@ -405,12 +405,12 @@ def handle_messages(self) -> None: watch_offsets = False if self.master_coordinator is not None: - are_we_master, _ = self.master_coordinator.get_master_info() + primary_info = self.master_coordinator.get_master_info() # keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct # writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any # messages off the topic are writes performed by another node # Also if master_eligibility is disabled by configuration, disable writes too - if are_we_master is True: + if primary_info.primary: watch_offsets = True self.consume_messages(msgs, watch_offsets) diff --git a/src/schema_registry/registry.py b/src/schema_registry/registry.py index 4d45f219d..9e42004a5 100644 --- a/src/schema_registry/registry.py +++ b/src/schema_registry/registry.py @@ -30,7 +30,7 @@ from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference -from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version +from karapace.typing import JsonObject, Mode, PrimaryInfo, SchemaId, Subject, Version from schema_registry.messaging import KarapaceProducer from schema_registry.reader import KafkaSchemaReader from schema_registry.telemetry.tracer import Tracer @@ -89,7 +89,7 @@ async def close(self) -> None: stack.enter_context(closing(self.schema_reader)) stack.enter_context(closing(self.producer)) - async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | None]: + async def get_master(self, ignore_readiness: bool = False) -> PrimaryInfo: """Resolve if current node is the primary and the primary node address. :param bool ignore_readiness: Ignore waiting to become ready and return @@ -98,13 +98,13 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | """ async with self._master_lock: while True: - are_we_master, master_url = self.mc.get_master_info() - if are_we_master is None: - LOG.info("No master set: %r, url: %r", are_we_master, master_url) + primary_info = self.mc.get_master_info() + if not primary_info.primary and not primary_info.primary_url: + LOG.info("No master set: %r", primary_info) elif not ignore_readiness and self.schema_reader.ready() is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: - return are_we_master, master_url + return primary_info await asyncio.sleep(1.0) def get_compatibility_mode(self, subject: Subject) -> CompatibilityModes: diff --git a/src/schema_registry/routers/config.py b/src/schema_registry/routers/config.py index 3d1884af6..9d99c42ee 100644 --- a/src/schema_registry/routers/config.py +++ b/src/schema_registry/routers/config.py @@ -50,13 +50,13 @@ async def config_put( if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"): raise unauthorized() - i_am_primary, primary_url = await schema_registry.get_master() - if i_am_primary: + primary_info = await schema_registry.get_master() + if primary_info.primary: return await controller.config_set(compatibility_level_request=compatibility_level_request) - if not primary_url: + if not primary_info.primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( - request=request, primary_url=primary_url, response_type=CompatibilityResponse + request=request, primary_url=primary_info.primary_url, response_type=CompatibilityResponse ) @@ -90,13 +90,13 @@ async def config_set_subject( if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() - i_am_primary, primary_url = await schema_registry.get_master() - if i_am_primary: + primary_info = await schema_registry.get_master() + if primary_info.primary: return await controller.config_subject_set(subject=subject, compatibility_level_request=compatibility_level_request) - if not primary_url: + if not primary_info.primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( - request=request, primary_url=primary_url, response_type=CompatibilityResponse + request=request, primary_url=primary_info.primary_url, response_type=CompatibilityResponse ) @@ -114,11 +114,11 @@ async def config_delete_subject( if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() - i_am_primary, primary_url = await schema_registry.get_master() - if i_am_primary: + primary_info = await schema_registry.get_master() + if primary_info.primary: return await controller.config_subject_delete(subject=subject) - if not primary_url: + if not primary_info.primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( - request=request, primary_url=primary_url, response_type=CompatibilityResponse + request=request, primary_url=primary_info.primary_url, response_type=CompatibilityResponse ) diff --git a/src/schema_registry/routers/master_availability.py b/src/schema_registry/routers/master_availability.py index 60ac6e30d..4421db3e4 100644 --- a/src/schema_registry/routers/master_availability.py +++ b/src/schema_registry/routers/master_availability.py @@ -40,8 +40,8 @@ async def master_availability( forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), ) -> MasterAvailabilityResponse: - are_we_master, master_url = await schema_registry.get_master() - LOG.info("are master %s, master url %s", are_we_master, master_url) + primary_info = await schema_registry.get_master() + LOG.info("are master %r,", primary_info) response.headers.update(NO_CACHE_HEADER) if ( @@ -49,11 +49,14 @@ async def master_availability( and schema_registry.schema_reader.master_coordinator._sc is not None and schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() ): - return MasterAvailabilityResponse(master_available=are_we_master) + return MasterAvailabilityResponse(master_available=primary_info.primary) - if master_url is None or f"{config.advertised_hostname}:{config.advertised_port}" in master_url: + if ( + primary_info.primary_url is None + or f"{config.advertised_hostname}:{config.advertised_port}" in primary_info.primary_url + ): return NO_MASTER return await forward_client.forward_request_remote( - request=request, primary_url=master_url, response_type=MasterAvailabilityResponse + request=request, primary_url=primary_info.primary_url, response_type=MasterAvailabilityResponse ) diff --git a/src/schema_registry/routers/subjects.py b/src/schema_registry/routers/subjects.py index cd5352490..80cf79fa2 100644 --- a/src/schema_registry/routers/subjects.py +++ b/src/schema_registry/routers/subjects.py @@ -6,6 +6,8 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.config import Config +from karapace.container import KarapaceContainer from karapace.forward_client import ForwardClient from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer @@ -76,16 +78,19 @@ async def subjects_subject_delete( authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() - i_am_primary, primary_url = await schema_registry.get_master() - if i_am_primary: + primary_info = await schema_registry.get_master() + if primary_info.primary: return await controller.subject_delete(subject=subject, permanent=permanent) - if not primary_url: + if not primary_info.primary_url: raise no_primary_url_error() - return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=list[int]) + return await forward_client.forward_request_remote( + request=request, primary_url=primary_info.primary_url, response_type=list[int] + ) @subjects_router.post("/{subject}/versions") @@ -161,12 +166,14 @@ async def subjects_subject_version_delete( if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() - i_am_primary, primary_url = await schema_registry.get_master() - if i_am_primary: + primary_info = await schema_registry.get_master() + if primary_info.primary: return await controller.subject_version_delete(subject=subject, version=version, permanent=permanent) - if not primary_url: + if not primary_info.primary_url: raise no_primary_url_error() - return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=int) + return await forward_client.forward_request_remote( + request=request, primary_url=primary_info.primary_url, response_type=int + ) @subjects_router.get("/{subject}/versions/{version}/schema") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 10386e832..0a2f87212 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -284,7 +284,7 @@ async def fixture_rest_async( config.bootstrap_uri = kafka_servers.bootstrap_servers[0] # Use non-default max request size for REST producer. config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES - config.waiting_time_before_acting_as_master_ms = 300 + config.waiting_time_before_acting_as_master_ms = 500 rest = KafkaRest(config=config) assert rest.serializer.registry_client @@ -352,7 +352,7 @@ async def fixture_rest_async_novalidation( # Use non-default max request size for REST producer. config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES config.name_strategy_validation = False # This should be only difference from rest_async - config.waiting_time_before_acting_as_master_ms = 300 + config.waiting_time_before_acting_as_master_ms = 500 rest = KafkaRest(config=config) assert rest.serializer.registry_client @@ -422,7 +422,7 @@ async def fixture_rest_async_registry_auth( config.registry_port = registry.port config.registry_user = "admin" config.registry_password = "admin" - config.waiting_time_before_acting_as_master_ms = 300 + config.waiting_time_before_acting_as_master_ms = 500 rest = KafkaRest(config=config) try: @@ -477,8 +477,10 @@ async def fixture_registry_async_pair( config1 = Config() config1.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config1.waiting_time_before_acting_as_master_ms = 500 config2 = Config() config2.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config2.waiting_time_before_acting_as_master_ms = 500 async with start_schema_registry_cluster( config_templates=[config1, config2], @@ -507,6 +509,7 @@ async def fixture_registry_cluster( return config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config.waiting_time_before_acting_as_master_ms = 500 user_config = request.param.get("config", {}) if hasattr(request, "param") else {} config.__dict__.update(user_config) @@ -593,6 +596,7 @@ async def fixture_registry_https_endpoint( config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config.waiting_time_before_acting_as_master_ms = 500 config.server_tls_certfile = server_cert config.server_tls_keyfile = server_key @@ -650,6 +654,7 @@ async def fixture_registry_http_auth_endpoint( config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config.waiting_time_before_acting_as_master_ms = 500 config.registry_authfile = "tests/integration/config/karapace.auth.json" async with start_schema_registry_cluster( @@ -703,10 +708,12 @@ async def fixture_registry_async_auth_pair( config1 = Config() config1.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config1.waiting_time_before_acting_as_master_ms = 500 config1.registry_authfile = "tests/integration/config/karapace.auth.json" config2 = Config() config2.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config2.waiting_time_before_acting_as_master_ms = 500 config2.registry_authfile = "tests/integration/config/karapace.auth.json" async with start_schema_registry_cluster( diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index f3049be4c..99f69bf2f 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -10,11 +10,9 @@ from karapace.typing import SchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import allocate_port -from tests.integration.utils.rest_client import RetryRestClient from tests.utils import new_random_name import asyncio -import json import pytest @@ -189,104 +187,3 @@ async def test_no_eligible_master(kafka_servers: KafkaServers) -> None: assert mc.schema_coordinator.master_url is None finally: await mc.close() - - -async def test_schema_request_forwarding( - registry_async_pair, - registry_async_retry_client: RetryRestClient, -) -> None: - master_url, slave_url = registry_async_pair - max_tries, counter = 5, 0 - wait_time = 0.5 - subject = new_random_name("subject") - schema = {"type": "string"} - other_schema = {"type": "int"} - # Config updates - for subj_path in [None, subject]: - if subj_path: - path = f"config/{subject}" - else: - path = "config" - for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: - resp = await registry_async_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat}) - assert resp.ok - while True: - assert counter < max_tries, "Compat update not propagated" - resp = await registry_async_retry_client.get(f"{master_url}/{path}") - if not resp.ok: - print(f"Invalid http status code: {resp.status_code}") - continue - data = resp.json() - if "compatibilityLevel" not in data: - print(f"Invalid response: {data}") - counter += 1 - await asyncio.sleep(wait_time) - continue - if data["compatibilityLevel"] != compat: - print(f"Bad compatibility: {data}") - counter += 1 - await asyncio.sleep(wait_time) - continue - break - - # New schema updates, last compatibility is None - for s in [schema, other_schema]: - resp = await registry_async_retry_client.post( - f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)} - ) - assert resp.ok - data = resp.json() - assert "id" in data, data - counter = 0 - while True: - assert counter < max_tries, "Subject schema data not propagated yet" - resp = await registry_async_retry_client.get(f"{master_url}/subjects/{subject}/versions") - if not resp.ok: - print(f"Invalid http status code: {resp.status_code}") - counter += 1 - continue - data = resp.json() - if not data: - print(f"No versions registered for subject {subject} yet") - counter += 1 - continue - assert len(data) == 2, data - assert data[0] == 1, data - print("Subject schema data propagated") - break - - # Schema deletions - resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1") - assert resp.ok - counter = 0 - while True: - assert counter < max_tries, "Subject version deletion not propagated yet" - resp = await registry_async_retry_client.get( - f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404 - ) - if resp.ok: - print(f"Subject {subject} still has version 1 on master") - counter += 1 - continue - assert resp.status_code == 404 - print(f"Subject {subject} no longer has version 1") - break - - # Subject deletion - resp = await registry_async_retry_client.get(f"{master_url}/subjects/") - assert resp.ok - data = resp.json() - assert subject in data - resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}") - assert resp.ok - counter = 0 - while True: - assert counter < max_tries, "Subject deletion not propagated yet" - resp = await registry_async_retry_client.get(f"{master_url}/subjects/") - if not resp.ok: - print("Could not retrieve subject list on master") - counter += 1 - continue - data = resp.json() - assert subject not in data - break diff --git a/tests/integration/test_request_forwarding.py b/tests/integration/test_request_forwarding.py new file mode 100644 index 000000000..9cd69988d --- /dev/null +++ b/tests/integration/test_request_forwarding.py @@ -0,0 +1,146 @@ +""" +karapace - test request forwarding + +Copyright (c) 2025 Aiven Ltd +See LICENSE for details +""" + +from _pytest.fixtures import SubRequest +from typing import AsyncGenerator +from karapace.client import Client +from tests.integration.utils.rest_client import RetryRestClient +from tests.utils import new_random_name, repeat_until_master_is_available, repeat_until_successful_request + +import asyncio +import json +import pytest + + +@pytest.fixture(scope="function", name="request_forwarding_retry_client") +async def fixture_registry_async_client( + request: SubRequest, + registry_async_pair: list[str], +) -> AsyncGenerator[RetryRestClient, None]: + registry_async_pair[0] + client = Client( + server_uri=registry_async_pair[0], + server_ca=request.config.getoption("server_ca"), + ) + + try: + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "subjects", + json_data=None, + headers=None, + error_msg=f"Registry API {client.server_uri} is unreachable", + timeout=10, + sleep=0.3, + ) + await repeat_until_master_is_available(client) + yield RetryRestClient(client) + finally: + await client.close() + + +async def test_schema_request_forwarding( + registry_async_pair: list[str], + request_forwarding_retry_client: RetryRestClient, +) -> None: + master_url, slave_url = registry_async_pair + + max_tries, counter = 5, 0 + wait_time = 0.5 + subject = new_random_name("subject") + schema = {"type": "string"} + other_schema = {"type": "int"} + # Config updates + for subj_path in [None, subject]: + if subj_path: + path = f"config/{subject}" + else: + path = "config" + for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: + resp = await request_forwarding_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat}) + assert resp.ok + while True: + assert counter < max_tries, "Compat update not propagated" + resp = await request_forwarding_retry_client.get(f"{master_url}/{path}") + if not resp.ok: + print(f"Invalid http status code: {resp.status_code}") + continue + data = resp.json() + if "compatibilityLevel" not in data: + print(f"Invalid response: {data}") + counter += 1 + await asyncio.sleep(wait_time) + continue + if data["compatibilityLevel"] != compat: + print(f"Bad compatibility: {data}") + counter += 1 + await asyncio.sleep(wait_time) + continue + break + + # New schema updates, last compatibility is None + for s in [schema, other_schema]: + resp = await request_forwarding_retry_client.post( + f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)} + ) + assert resp.ok + data = resp.json() + assert "id" in data, data + counter = 0 + while True: + assert counter < max_tries, "Subject schema data not propagated yet" + resp = await request_forwarding_retry_client.get(f"{master_url}/subjects/{subject}/versions") + if not resp.ok: + print(f"Invalid http status code: {resp.status_code}") + counter += 1 + continue + data = resp.json() + if not data: + print(f"No versions registered for subject {subject} yet") + counter += 1 + continue + assert len(data) == 2, data + assert data[0] == 1, data + print("Subject schema data propagated") + break + + # Schema deletions + resp = await request_forwarding_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1") + assert resp.ok + counter = 0 + while True: + assert counter < max_tries, "Subject version deletion not propagated yet" + resp = await request_forwarding_retry_client.get( + f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404 + ) + if resp.ok: + print(f"Subject {subject} still has version 1 on master") + counter += 1 + continue + assert resp.status_code == 404 + print(f"Subject {subject} no longer has version 1") + break + + # Subject deletion + resp = await request_forwarding_retry_client.get(f"{master_url}/subjects/") + assert resp.ok + data = resp.json() + assert subject in data + resp = await request_forwarding_retry_client.delete(f"{slave_url}/subjects/{subject}") + assert resp.ok + counter = 0 + while True: + assert counter < max_tries, "Subject deletion not propagated yet" + resp = await request_forwarding_retry_client.get(f"{master_url}/subjects/") + if not resp.ok: + print("Could not retrieve subject list on master") + counter += 1 + continue + data = resp.json() + assert subject not in data + break diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 227cefda1..2c2f9b17b 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -1333,10 +1333,12 @@ async def test_registering_normalized_schema(session_logdir: Path, kafka_servers config1 = Config() config1.bootstrap_uri = kafka_servers.bootstrap_servers[0] + config1.waiting_time_before_acting_as_master_ms = 500 config2 = Config() config2.bootstrap_uri = kafka_servers.bootstrap_servers[0] config2.use_protobuf_formatter = True + config2.waiting_time_before_acting_as_master_ms = 500 async with start_schema_registry_cluster( config_templates=[config1, config2], @@ -1347,7 +1349,7 @@ async def test_registering_normalized_schema(session_logdir: Path, kafka_servers client1 = Client(server_uri=servers[0], server_ca=None) client2 = Client(server_uri=servers[1], server_ca=None) - await asyncio.sleep(10) + await asyncio.sleep(2) body = {"schemaType": "PROTOBUF", "schema": SCHEMA_WITH_OPTION_ORDERED} res = await client1.post(f"subjects/{subject}/versions?normalize=true", json=body) diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 7b14d3ebb..6c4a902e0 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -13,6 +13,7 @@ from karapace.kafka.producer import KafkaProducer from karapace.key_format import KeyFormatter, KeyMode from karapace.offset_watcher import OffsetWatcher +from karapace.typing import PrimaryInfo from karapace.utils import json_encode from schema_registry.reader import KafkaSchemaReader from tests.base_testcase import BaseTestCase @@ -39,9 +40,9 @@ async def _wait_until_reader_is_ready_and_master( await asyncio.sleep(0.1) # Won master election - are_we_master = False - while not are_we_master: - are_we_master, _ = master_coordinator.get_master_info() + primary_info = PrimaryInfo(primary=False, primary_url=None) + while not primary_info.primary: + primary_info = master_coordinator.get_master_info() await asyncio.sleep(0.1) diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py index f2c7de2ff..df5bafb59 100644 --- a/tests/integration/utils/cluster.py +++ b/tests/integration/utils/cluster.py @@ -79,6 +79,8 @@ async def start_schema_registry_cluster( "KARAPACE_SERVER_TLS_CERTFILE": config.server_tls_certfile if config.server_tls_certfile else "", "KARAPACE_SERVER_TLS_KEYFILE": config.server_tls_keyfile if config.server_tls_keyfile else "", "KARAPACE_USE_PROTOBUF_FORMATTER": "true" if config.use_protobuf_formatter else "false", + "KARAPACE_WAITING_TIME_BEFORE_ACTING_AS_MASTER_MS": str(config.waiting_time_before_acting_as_master_ms), + "KARAPACE_MASTER_ELIGIBILITY": str(config.master_eligibility), } process = popen_karapace_all(module="schema_registry", env=env, stdout=logfile, stderr=errfile) stack.callback(stop_process, process) diff --git a/tests/unit/schema_registry/test_forwarding.py b/tests/unit/schema_registry/test_forwarding.py new file mode 100644 index 000000000..abcba57a3 --- /dev/null +++ b/tests/unit/schema_registry/test_forwarding.py @@ -0,0 +1,79 @@ +""" +karapace - test request forwarding in router + +Copyright (c) 2025 Aiven Ltd +See LICENSE for details +""" + +from fastapi import Request +from fastapi.exceptions import HTTPException +from karapace.forward_client import ForwardClient +from karapace.typing import PrimaryInfo +from schema_registry.controller import KarapaceSchemaRegistryController +from schema_registry.registry import KarapaceSchemaRegistry +from schema_registry.routers.config import config_put +from unittest.mock import AsyncMock, Mock + +from schema_registry.routers.requests import CompatibilityRequest + +import pytest + + +async def test_forwarding_not_a_primary_and_own_primary_url() -> None: + compatibility_request = CompatibilityRequest(compatibility="FORWARD") + forward_client_mock = AsyncMock(spec=ForwardClient) + schema_registry_mock = AsyncMock(spec=KarapaceSchemaRegistry) + schema_registry_mock.get_master.return_value = PrimaryInfo(primary=False, primary_url=None) + + with pytest.raises(HTTPException): + await config_put( + request=Mock(spec=Request), + compatibility_level_request=compatibility_request, + schema_registry=schema_registry_mock, + user=None, + forward_client=forward_client_mock, + authorizer=None, + controller=None, + ) + + forward_client_mock.forward_request_remote.assert_not_called() + + +async def test_forwarding_to_a_primary() -> None: + compatibility_request = CompatibilityRequest(compatibility="FORWARD") + forward_client_mock = AsyncMock(spec=ForwardClient) + schema_registry_mock = AsyncMock(spec=KarapaceSchemaRegistry) + schema_registry_mock.get_master.return_value = PrimaryInfo(primary=False, primary_url="http://127.0.0.1:8082") + + await config_put( + request=Mock(spec=Request), + compatibility_level_request=compatibility_request, + schema_registry=schema_registry_mock, + user=None, + forward_client=forward_client_mock, + authorizer=None, + controller=None, + ) + + forward_client_mock.forward_request_remote.assert_called_once() + + +async def test_no_forwarding_as_instance_is_primary() -> None: + compatibility_request = CompatibilityRequest(compatibility="FORWARD") + forward_client_mock = AsyncMock(spec=ForwardClient) + controller_mock = AsyncMock(spec=KarapaceSchemaRegistryController) + schema_registry_mock = AsyncMock(spec=KarapaceSchemaRegistry) + schema_registry_mock.get_master.return_value = PrimaryInfo(primary=True, primary_url=None) + + await config_put( + request=Mock(spec=Request), + compatibility_level_request=compatibility_request, + schema_registry=schema_registry_mock, + user=None, + forward_client=forward_client_mock, + authorizer=None, + controller=controller_mock, + ) + + controller_mock.config_set.assert_called_once() + forward_client_mock.forward_request_remote.assert_not_called() diff --git a/tests/utils.py b/tests/utils.py index 634b77e6f..f578914f9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -21,7 +21,6 @@ import os import ssl import sys -import time import uuid consumer_valid_payload = { @@ -328,7 +327,7 @@ async def repeat_until_master_is_available(client: Client) -> None: reply = res.json() if reply is not None and "master_available" in reply and reply["master_available"] is True: break - time.sleep(1) + await asyncio.sleep(1) def write_ini(file_path: Path, ini_data: dict) -> None: