From 9f30957c2bbd70edc234b9a0531e471ee91ec87f Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Thu, 9 Jan 2025 16:42:02 +0200 Subject: [PATCH] fix: no forwarding if primary url own --- src/schema_registry/controller.py | 2 +- src/schema_registry/routers/config.py | 11 ++++++++--- src/schema_registry/routers/subjects.py | 8 ++++++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/schema_registry/controller.py b/src/schema_registry/controller.py index d8924d9a7..0dc2c8380 100644 --- a/src/schema_registry/controller.py +++ b/src/schema_registry/controller.py @@ -863,7 +863,7 @@ async def subject_post( except Exception as xx: raise xx - elif not primary_url: + elif not primary_url or self.config.host in primary_url or self.config.host in primary_url: raise no_primary_url_error() else: return await forward_client.forward_request_remote( diff --git a/src/schema_registry/routers/config.py b/src/schema_registry/routers/config.py index 3d1884af6..c2e476714 100644 --- a/src/schema_registry/routers/config.py +++ b/src/schema_registry/routers/config.py @@ -6,6 +6,8 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.config import Config +from karapace.container import KarapaceContainer from karapace.forward_client import ForwardClient from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer @@ -46,6 +48,7 @@ async def config_put( forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"): raise unauthorized() @@ -53,7 +56,7 @@ async def config_put( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.config_set(compatibility_level_request=compatibility_level_request) - if not primary_url: + if not primary_url or config.host in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( request=request, primary_url=primary_url, response_type=CompatibilityResponse @@ -86,6 +89,7 @@ async def config_set_subject( forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -93,7 +97,7 @@ async def config_set_subject( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.config_subject_set(subject=subject, compatibility_level_request=compatibility_level_request) - if not primary_url: + if not primary_url or config.host in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( request=request, primary_url=primary_url, response_type=CompatibilityResponse @@ -110,6 +114,7 @@ async def config_delete_subject( forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -117,7 +122,7 @@ async def config_delete_subject( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.config_subject_delete(subject=subject) - if not primary_url: + if not primary_url or config.host in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote( request=request, primary_url=primary_url, response_type=CompatibilityResponse diff --git a/src/schema_registry/routers/subjects.py b/src/schema_registry/routers/subjects.py index cd5352490..dc2680a0c 100644 --- a/src/schema_registry/routers/subjects.py +++ b/src/schema_registry/routers/subjects.py @@ -6,6 +6,8 @@ from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Depends, Request from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.config import Config +from karapace.container import KarapaceContainer from karapace.forward_client import ForwardClient from karapace.typing import Subject from schema_registry.container import SchemaRegistryContainer @@ -76,6 +78,7 @@ async def subjects_subject_delete( authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -83,7 +86,7 @@ async def subjects_subject_delete( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.subject_delete(subject=subject, permanent=permanent) - if not primary_url: + if not primary_url or config.host in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=list[int]) @@ -157,6 +160,7 @@ async def subjects_subject_version_delete( authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), + config: Config = Depends(Provide[KarapaceContainer.config]), ) -> int: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -164,7 +168,7 @@ async def subjects_subject_version_delete( i_am_primary, primary_url = await schema_registry.get_master() if i_am_primary: return await controller.subject_version_delete(subject=subject, version=version, permanent=permanent) - if not primary_url: + if not primary_url or config.host in primary_url: raise no_primary_url_error() return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=int)