Skip to content

Commit

Permalink
Merge pull request #818 from Aiven-Open/jjaakola-aiven-add-get-mode-e…
Browse files Browse the repository at this point in the history
…ndpoints

feat: GET /mode and GET /mode/<subject:path> endpoints
  • Loading branch information
tvainika authored Feb 16, 2024
2 parents 227fd55 + e8e409a commit 3a3cfbf
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 2 deletions.
8 changes: 7 additions & 1 deletion karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version
from karapace.typing import JsonObject, Mode, ResolvedVersion, SchemaId, Subject, Version
from typing import Mapping, Sequence

import asyncio
Expand Down Expand Up @@ -439,6 +439,12 @@ def get_subject_versions_for_schema(
subject_versions = sorted(subject_versions, key=lambda s: (s["subject"], s["version"]))
return subject_versions

def get_global_mode(self) -> Mode:
return Mode.readwrite

def get_subject_mode(self) -> Mode:
return Mode.readwrite

def send_schema_message(
self,
*,
Expand Down
58 changes: 57 additions & 1 deletion karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.utils import JSONDecodeError
from typing import Any

Expand Down Expand Up @@ -302,6 +302,24 @@ def _add_schema_registry_routes(self) -> None:
json_body=False,
auth=self._auth,
)
self.route(
"/mode",
callback=self.get_global_mode,
method="GET",
schema_request=True,
with_request=False,
json_body=False,
auth=self._auth,
)
self.route(
"/mode/<subject:path>",
callback=self.get_subject_mode,
method="GET",
schema_request=True,
with_request=False,
json_body=False,
auth=self._auth,
)

async def close(self) -> None:
self.log.info("Closing karapace_schema_registry_controller")
Expand Down Expand Up @@ -1245,6 +1263,44 @@ async def subject_post(
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

async def get_global_mode(
self,
content_type: str,
*,
user: User | None = None,
) -> None:
self._check_authorization(user, Operation.Read, "Config:")
self.r(
body={"mode": str(self.schema_registry.get_global_mode())},
content_type=content_type,
status=HTTPStatus.OK,
)

async def get_subject_mode(
self,
content_type: str,
*,
subject: str,
user: User | None = None,
) -> None:
self._check_authorization(user, Operation.Read, f"Subject:{subject}")

if self.schema_registry.database.find_subject(subject=Subject(subject)) is None:
self.r(
body={
"error_code": SchemaErrorCodes.SUBJECT_NOT_FOUND.value,
"message": SchemaErrorMessages.SUBJECT_NOT_FOUND_FMT.value.format(subject=subject),
},
content_type=content_type,
status=HTTPStatus.NOT_FOUND,
)

self.r(
body={"mode": str(self.schema_registry.get_global_mode())},
content_type=content_type,
status=HTTPStatus.OK,
)

def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
schema_id = self.schema_registry.database.get_schema_id_if_exists(
subject=subject, schema=schema, include_deleted=include_deleted
Expand Down
5 changes: 5 additions & 0 deletions karapace/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ class SubjectType(StrEnum, Enum):
value = "value"
# partition it's a function of `str` and StrEnum its inherits from it.
partition_ = "partition"


@unique
class Mode(StrEnum):
readwrite = "READWRITE"
6 changes: 6 additions & 0 deletions tests/integration/test_schema_registry_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ async def test_sr_auth_endpoints(registry_async_client_auth: Client) -> None:
res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}")
assert res.status_code == 401

res = await registry_async_client_auth.get("mode")
assert res.status_code == 401

res = await registry_async_client_auth.get(f"mode/{quote(subject)}")
assert res.status_code == 401


async def test_sr_list_subjects(registry_async_client_auth: Client) -> None:
cavesubject = new_random_name("cave-")
Expand Down
53 changes: 53 additions & 0 deletions tests/integration/test_schema_registry_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from karapace.client import Client
from karapace.typing import Mode
from tests.utils import create_schema_name_factory, create_subject_name_factory

import json
import pytest


@pytest.mark.parametrize("trail", ["", "/"])
async def test_global_mode(registry_async_client: Client, trail: str) -> None:
res = await registry_async_client.get(f"/mode{trail}")
assert res.status_code == 200
json_res = res.json()
assert json_res == {"mode": str(Mode.readwrite)}


@pytest.mark.parametrize("trail", ["", "/"])
async def test_subject_mode(registry_async_client: Client, trail: str) -> None:
subject_name_factory = create_subject_name_factory(f"test_schema_same_subject_{trail}")
schema_name = create_schema_name_factory(f"test_schema_same_subject_{trail}")()

schema_str = json.dumps(
{
"type": "record",
"name": schema_name,
"fields": [
{
"name": "f",
"type": "string",
}
],
}
)
subject = subject_name_factory()
res = await registry_async_client.post(
f"subjects/{subject}/versions",
json={"schema": schema_str},
)
assert res.status_code == 200

res = await registry_async_client.get(f"/mode/{subject}{trail}")
assert res.status_code == 200
json_res = res.json()
assert json_res == {"mode": str(Mode.readwrite)}

res = await registry_async_client.get(f"/mode/unknown_subject{trail}")
assert res.status_code == 404
json_res = res.json()
assert json_res == {"error_code": 40401, "message": "Subject 'unknown_subject' not found."}

0 comments on commit 3a3cfbf

Please sign in to comment.