Skip to content

Commit

Permalink
fix: no forwarding if primary url own
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Jan 10, 2025
1 parent 2d53305 commit 63e722e
Show file tree
Hide file tree
Showing 18 changed files with 313 additions and 156 deletions.
3 changes: 3 additions & 0 deletions src/karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ def get_advertised_port(self) -> int:
def get_advertised_hostname(self) -> str:
    """Return the hostname to advertise, falling back to ``host`` when none is configured."""
    advertised = self.advertised_hostname
    if advertised:
        return advertised
    return self.host

def get_address(self) -> str:
    """Return this node's address formatted as ``host:port``."""
    return "{}:{}".format(self.host, self.port)

def get_rest_base_uri(self) -> str:
return (
self.rest_base_uri
Expand Down
14 changes: 9 additions & 5 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from karapace.typing import PrimaryInfo, SchemaReaderStoppper
from schema_registry.telemetry.tracer import Tracer
from threading import Thread
from typing import Final
Expand Down Expand Up @@ -163,18 +163,22 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
group_generation_id=generation if generation is not None else -1,
)

def get_master_info(self) -> PrimaryInfo:
    """Return whether we're the master, and the actual master url that can be used if we're not.

    Returns:
        ``PrimaryInfo(False, None)`` while there is no coordinator or it is not ready yet.
        Otherwise ``primary`` is the result of ``are_we_master()`` and ``primary_url`` is the
        coordinator's master URL, or ``None`` when that URL is unset or addresses this node
        itself, so that callers never forward a request back to this node.
    """
    if not self._sc:
        return PrimaryInfo(False, None)

    if not self._sc.ready():
        # we should wait for a while after we have been elected master, we should also consume
        # all the messages in the log before proceeding, check the doc of `self._sc.are_we_master`
        # for more details
        return PrimaryInfo(False, None)

    url: str | None = None
    master_url = self._sc.master_url
    if master_url is not None:
        # Compare the URL's whole authority (host:port) with our own address. A plain substring
        # test would also match a different node whose address merely contains ours, e.g. own
        # `10.0.0.1:8081` against `http://110.0.0.1:8081`, and wrongly hide that node's URL.
        _, scheme_sep, remainder = master_url.partition("://")
        authority = (remainder if scheme_sep else master_url).partition("/")[0]
        # NOTE(review): this compares against the configured host:port; confirm the master URL
        # is not built from the advertised hostname/port instead.
        if authority.rpartition("@")[2] != self.config.get_address():
            url = master_url

    return PrimaryInfo(self._sc.are_we_master(), url)

def __send_close_event(self) -> None:
self._closing.set()
Expand Down
4 changes: 2 additions & 2 deletions src/karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def __init__(
def is_master_assigned_to_myself(self) -> bool:
    """Return the stored election flag, mapping an unset or falsy value to ``False``."""
    assigned = self._are_we_master
    return assigned if assigned else False

def are_we_master(self) -> bool | None:
def are_we_master(self) -> bool:
"""
After a new election is made we should wait for a while since the previous master could have produced
a new message shortly before being disconnected from the cluster.
Expand All @@ -212,7 +212,7 @@ def are_we_master(self) -> bool | None:
# `self._are_we_master` is `None` only during the perform of the assignment
# where we don't know if we are master yet
LOG.warning("No new elections performed yet.")
return None
return False

if not self._ready or not self._schema_reader_stopper.ready():
return False
Expand Down
7 changes: 7 additions & 0 deletions src/karapace/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from abc import ABC, abstractmethod
from collections.abc import Callable, Generator, Mapping, Sequence
from dataclasses import dataclass
from enum import Enum, unique
from karapace.errors import InvalidVersion
from pydantic import ValidationInfo
Expand Down Expand Up @@ -128,3 +129,9 @@ def ready(self) -> bool:
@abstractmethod
def set_not_ready(self) -> None:
pass


@dataclass(frozen=True)
class PrimaryInfo:
    """Immutable pair describing this node's view of the primary (master) node."""

    # True when this node itself is the primary.
    primary: bool
    # URL of the primary node that requests can be forwarded to, or None when no URL is available.
    primary_url: str | None
8 changes: 4 additions & 4 deletions src/schema_registry/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,8 +831,8 @@ async def subject_post(
if schema_id is not None:
return SchemaIdResponse(id=schema_id)

i_am_primary, primary_url = await self.schema_registry.get_master()
if i_am_primary:
primary_info = await self.schema_registry.get_master()
if primary_info.primary:
try:
schema_id = await self.schema_registry.write_new_schema_local(Subject(subject), new_schema, references)
return SchemaIdResponse(id=schema_id)
Expand Down Expand Up @@ -863,11 +863,11 @@ async def subject_post(
except Exception as xx:
raise xx

elif not primary_url:
if not primary_info.primary_url:
raise no_primary_url_error()
else:
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=SchemaIdResponse
request=request, primary_url=primary_info.primary_url, response_type=SchemaIdResponse
)

async def get_global_mode(self) -> ModeResponse:
Expand Down
4 changes: 2 additions & 2 deletions src/schema_registry/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ def handle_messages(self) -> None:

watch_offsets = False
if self.master_coordinator is not None:
are_we_master, _ = self.master_coordinator.get_master_info()
primary_info = self.master_coordinator.get_master_info()
# Watch offsets only when this node is the primary. When primary_info.primary is False, we are either a
# follower, so we should not accept direct writes anyway, or this particular node is still waiting for a
# stable value, so any messages off the topic are writes performed by another node.
# Also if master_eligibility is disabled by configuration, disable writes too
if are_we_master is True:
if primary_info.primary:
watch_offsets = True

self.consume_messages(msgs, watch_offsets)
Expand Down
12 changes: 6 additions & 6 deletions src/schema_registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version
from karapace.typing import JsonObject, Mode, PrimaryInfo, SchemaId, Subject, Version
from schema_registry.messaging import KarapaceProducer
from schema_registry.reader import KafkaSchemaReader
from schema_registry.telemetry.tracer import Tracer
Expand Down Expand Up @@ -89,7 +89,7 @@ async def close(self) -> None:
stack.enter_context(closing(self.schema_reader))
stack.enter_context(closing(self.producer))

async def get_master(self, ignore_readiness: bool = False) -> PrimaryInfo:
    """Resolve if current node is the primary and the primary node address.

    Polls the master coordinator once per second while holding the master lock.

    :param bool ignore_readiness: Ignore waiting for the schema reader to become ready and
        return as soon as the coordinator reports a primary or a primary URL.
    :return: The primary information reported by the master coordinator.
    """
    async with self._master_lock:
        # NOTE(review): this keeps polling for as long as the coordinator reports neither
        # "primary" nor a primary URL; confirm that an unbounded wait is intended.
        while True:
            primary_info = self.mc.get_master_info()
            if not primary_info.primary and not primary_info.primary_url:
                LOG.info("No master set: %r", primary_info)
            elif ignore_readiness:
                return primary_info
            else:
                reader_ready = self.schema_reader.ready()
                if reader_ready is not False:
                    return primary_info
                # Log the readiness value itself; passing the bound `ready` method would only
                # print the method's repr.
                LOG.info("Schema reader isn't ready yet: %r", reader_ready)
            await asyncio.sleep(1.0)

def get_compatibility_mode(self, subject: Subject) -> CompatibilityModes:
Expand Down
24 changes: 12 additions & 12 deletions src/schema_registry/routers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ async def config_put(
if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
primary_info = await schema_registry.get_master()
if primary_info.primary:
return await controller.config_set(compatibility_level_request=compatibility_level_request)
if not primary_url:
if not primary_info.primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
request=request, primary_url=primary_info.primary_url, response_type=CompatibilityResponse
)


Expand Down Expand Up @@ -90,13 +90,13 @@ async def config_set_subject(
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
primary_info = await schema_registry.get_master()
if primary_info.primary:
return await controller.config_subject_set(subject=subject, compatibility_level_request=compatibility_level_request)
if not primary_url:
if not primary_info.primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
request=request, primary_url=primary_info.primary_url, response_type=CompatibilityResponse
)


Expand All @@ -114,11 +114,11 @@ async def config_delete_subject(
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
primary_info = await schema_registry.get_master()
if primary_info.primary:
return await controller.config_subject_delete(subject=subject)
if not primary_url:
if not primary_info.primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
request=request, primary_url=primary_info.primary_url, response_type=CompatibilityResponse
)
13 changes: 8 additions & 5 deletions src/schema_registry/routers/master_availability.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,23 @@ async def master_availability(
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
) -> MasterAvailabilityResponse:
are_we_master, master_url = await schema_registry.get_master()
LOG.info("are master %s, master url %s", are_we_master, master_url)
primary_info = await schema_registry.get_master()
LOG.info("are master %r,", primary_info)
response.headers.update(NO_CACHE_HEADER)

if (
schema_registry.schema_reader.master_coordinator is not None
and schema_registry.schema_reader.master_coordinator._sc is not None
and schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself()
):
return MasterAvailabilityResponse(master_available=are_we_master)
return MasterAvailabilityResponse(master_available=primary_info.primary)

if master_url is None or f"{config.advertised_hostname}:{config.advertised_port}" in master_url:
if (
primary_info.primary_url is None
or f"{config.advertised_hostname}:{config.advertised_port}" in primary_info.primary_url
):
return NO_MASTER

return await forward_client.forward_request_remote(
request=request, primary_url=master_url, response_type=MasterAvailabilityResponse
request=request, primary_url=primary_info.primary_url, response_type=MasterAvailabilityResponse
)
23 changes: 15 additions & 8 deletions src/schema_registry/routers/subjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, Request
from karapace.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.config import Config
from karapace.container import KarapaceContainer
from karapace.forward_client import ForwardClient
from karapace.typing import Subject
from schema_registry.container import SchemaRegistryContainer
Expand Down Expand Up @@ -76,16 +78,19 @@ async def subjects_subject_delete(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> list[int]:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
primary_info = await schema_registry.get_master()
if primary_info.primary:
return await controller.subject_delete(subject=subject, permanent=permanent)
if not primary_url:
if not primary_info.primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=list[int])
return await forward_client.forward_request_remote(
request=request, primary_url=primary_info.primary_url, response_type=list[int]
)


@subjects_router.post("/{subject}/versions")
Expand Down Expand Up @@ -161,12 +166,14 @@ async def subjects_subject_version_delete(
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
primary_info = await schema_registry.get_master()
if primary_info.primary:
return await controller.subject_version_delete(subject=subject, version=version, permanent=permanent)
if not primary_url:
if not primary_info.primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=int)
return await forward_client.forward_request_remote(
request=request, primary_url=primary_info.primary_url, response_type=int
)


@subjects_router.get("/{subject}/versions/{version}/schema")
Expand Down
13 changes: 10 additions & 3 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async def fixture_rest_async(
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
# Use non-default max request size for REST producer.
config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES
config.waiting_time_before_acting_as_master_ms = 300
config.waiting_time_before_acting_as_master_ms = 500
rest = KafkaRest(config=config)

assert rest.serializer.registry_client
Expand Down Expand Up @@ -352,7 +352,7 @@ async def fixture_rest_async_novalidation(
# Use non-default max request size for REST producer.
config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES
config.name_strategy_validation = False # This should be only difference from rest_async
config.waiting_time_before_acting_as_master_ms = 300
config.waiting_time_before_acting_as_master_ms = 500
rest = KafkaRest(config=config)

assert rest.serializer.registry_client
Expand Down Expand Up @@ -422,7 +422,7 @@ async def fixture_rest_async_registry_auth(
config.registry_port = registry.port
config.registry_user = "admin"
config.registry_password = "admin"
config.waiting_time_before_acting_as_master_ms = 300
config.waiting_time_before_acting_as_master_ms = 500
rest = KafkaRest(config=config)

try:
Expand Down Expand Up @@ -477,8 +477,10 @@ async def fixture_registry_async_pair(

config1 = Config()
config1.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config1.waiting_time_before_acting_as_master_ms = 500
config2 = Config()
config2.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config2.waiting_time_before_acting_as_master_ms = 500

async with start_schema_registry_cluster(
config_templates=[config1, config2],
Expand Down Expand Up @@ -507,6 +509,7 @@ async def fixture_registry_cluster(
return
config = Config()
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config.waiting_time_before_acting_as_master_ms = 500

user_config = request.param.get("config", {}) if hasattr(request, "param") else {}
config.__dict__.update(user_config)
Expand Down Expand Up @@ -593,6 +596,7 @@ async def fixture_registry_https_endpoint(

config = Config()
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config.waiting_time_before_acting_as_master_ms = 500
config.server_tls_certfile = server_cert
config.server_tls_keyfile = server_key

Expand Down Expand Up @@ -650,6 +654,7 @@ async def fixture_registry_http_auth_endpoint(

config = Config()
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config.waiting_time_before_acting_as_master_ms = 500
config.registry_authfile = "tests/integration/config/karapace.auth.json"

async with start_schema_registry_cluster(
Expand Down Expand Up @@ -703,10 +708,12 @@ async def fixture_registry_async_auth_pair(

config1 = Config()
config1.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config1.waiting_time_before_acting_as_master_ms = 500
config1.registry_authfile = "tests/integration/config/karapace.auth.json"

config2 = Config()
config2.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config2.waiting_time_before_acting_as_master_ms = 500
config2.registry_authfile = "tests/integration/config/karapace.auth.json"

async with start_schema_registry_cluster(
Expand Down
Loading

0 comments on commit 63e722e

Please sign in to comment.