Skip to content

Commit

Permalink
Implement conversion for Kafka Topic configs
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted committed Dec 3, 2024
1 parent 4ba1751 commit 53fcd51
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
13 changes: 2 additions & 11 deletions kpops/component_handlers/kafka_connect/model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from enum import Enum
from typing import Any

Expand All @@ -18,6 +17,7 @@
by_alias,
exclude_by_value,
to_dot,
to_str,
)


Expand Down Expand Up @@ -84,12 +84,6 @@ def serialize_topics(self, topics: list[KafkaTopic]) -> str | None:
return None
return ",".join(topic.name for topic in topics)

@staticmethod
def serialize_to_str(value: Any) -> str:
if isinstance(value, str):
return value
return json.dumps(value)

# TODO(Ivan Yordanov): Currently hacky and potentially unsafe. Find cleaner solution
@model_serializer(mode="wrap", when_used="always")
def serialize_model(
Expand All @@ -98,10 +92,7 @@ def serialize_model(
info: pydantic.SerializationInfo,
) -> dict[str, str]:
result = exclude_by_value(default_serialize_handler(self), None)
return {
by_alias(self, name): self.serialize_to_str(value)
for name, value in result.items()
}
return {by_alias(self, name): to_str(value) for name, value in result.items()}


class ConnectorTask(BaseModel):
Expand Down
6 changes: 5 additions & 1 deletion kpops/components/common/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic import BaseModel, ConfigDict, Field, model_validator

from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import DescConfigModel
from kpops.utils.pydantic import DescConfigModel, to_str


class OutputTopicTypes(str, Enum):
Expand Down Expand Up @@ -76,6 +76,10 @@ def extra_topic_label(self) -> Any:
raise ValueError(msg)
return self

@pydantic.field_serializer("configs")
def serialize_configs(self, configs: dict[str, str | int]) -> dict[str, str]:
return {key: to_str(value) for key, value in configs.items()}


class KafkaTopic(BaseModel):
"""Internal representation of a Kafka topic.
Expand Down
7 changes: 7 additions & 0 deletions kpops/utils/pydantic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -42,6 +43,12 @@ def by_alias(model: BaseModel, field_name: str) -> str:
return model.model_fields.get(field_name, Field()).alias or field_name


def to_str(value: Any) -> str:
if isinstance(value, str):
return value
return json.dumps(value)


_V = TypeVar("_V")


Expand Down
22 changes: 22 additions & 0 deletions tests/component_handlers/topic/test_topic_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,28 @@ async def get_default_topic_response_mock(self) -> MagicMock:
wrapper.get_broker_config.return_value = BrokerConfigResponse(**broker_response)
return wrapper

def test_convert_config_values_to_str(self):
assert TopicConfig(
partitions_count=1,
configs={
"retention.ms": -1,
"cleanup.policy": "delete",
"delete.retention.ms": 123456789,
},
).model_dump() == {
"configs": {
"retention.ms": "-1",
"cleanup.policy": "delete",
"delete.retention.ms": "123456789",
},
"key_schema": None,
"label": None,
"partitions_count": 1,
"replication_factor": None,
"type": None,
"value_schema": None,
}

@pytest.mark.asyncio()
async def test_should_call_create_topic_with_dry_run_false(self):
wrapper = AsyncMock()
Expand Down

0 comments on commit 53fcd51

Please sign in to comment.