Skip to content

Commit

Permalink
fix: no forwarding if primary url own
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Jan 9, 2025
1 parent 8cd588e commit 9f30957
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/schema_registry/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ async def subject_post(
except Exception as xx:
raise xx

elif not primary_url:
elif not primary_url or self.config.host in primary_url or self.config.host in primary_url:
raise no_primary_url_error()
else:
return await forward_client.forward_request_remote(
Expand Down
11 changes: 8 additions & 3 deletions src/schema_registry/routers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, Request
from karapace.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.config import Config
from karapace.container import KarapaceContainer
from karapace.forward_client import ForwardClient
from karapace.typing import Subject
from schema_registry.container import SchemaRegistryContainer
Expand Down Expand Up @@ -46,14 +48,15 @@ async def config_put(
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> CompatibilityResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.config_set(compatibility_level_request=compatibility_level_request)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
Expand Down Expand Up @@ -86,14 +89,15 @@ async def config_set_subject(
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> CompatibilityResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.config_subject_set(subject=subject, compatibility_level_request=compatibility_level_request)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
Expand All @@ -110,14 +114,15 @@ async def config_delete_subject(
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> CompatibilityResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.config_subject_delete(subject=subject)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
Expand Down
8 changes: 6 additions & 2 deletions src/schema_registry/routers/subjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, Request
from karapace.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.config import Config
from karapace.container import KarapaceContainer
from karapace.forward_client import ForwardClient
from karapace.typing import Subject
from schema_registry.container import SchemaRegistryContainer
Expand Down Expand Up @@ -76,14 +78,15 @@ async def subjects_subject_delete(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> list[int]:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.subject_delete(subject=subject, permanent=permanent)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=list[int])

Expand Down Expand Up @@ -157,14 +160,15 @@ async def subjects_subject_version_delete(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> int:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.subject_version_delete(subject=subject, version=version, permanent=permanent)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=int)

Expand Down

0 comments on commit 9f30957

Please sign in to comment.