diff --git a/kpops/component_handlers/kafka_connect/model.py b/kpops/component_handlers/kafka_connect/model.py index 840df06c3..b86b1d359 100644 --- a/kpops/component_handlers/kafka_connect/model.py +++ b/kpops/component_handlers/kafka_connect/model.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any, Literal +from typing import Any import pydantic from pydantic import ( @@ -12,10 +12,8 @@ from pydantic.json_schema import SkipJsonSchema from typing_extensions import override -from kpops.components.base_components.helm_app import HelmAppValues -from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr +from kpops.components.common.topic import KafkaTopic, KafkaTopicStr from kpops.utils.pydantic import ( - CamelCaseConfigModel, DescConfigModel, by_alias, exclude_by_value, @@ -111,15 +109,3 @@ class KafkaConnectConfigErrorResponse(BaseModel): name: str error_count: int configs: list[KafkaConnectConfigDescription] - - -class KafkaConnectorResetterConfig(CamelCaseConfigModel): - brokers: str - connector: str - delete_consumer_group: bool | None = None - offset_topic: str | None = None - - -class KafkaConnectorResetterValues(HelmAppValues): - connector_type: Literal["source", "sink"] - config: KafkaConnectorResetterConfig diff --git a/kpops/component_handlers/topic/handler.py b/kpops/component_handlers/topic/handler.py index f79c552fc..fbf5824eb 100644 --- a/kpops/component_handlers/topic/handler.py +++ b/kpops/component_handlers/topic/handler.py @@ -17,7 +17,7 @@ parse_and_compare_topic_configs, parse_rest_proxy_topic_config, ) -from kpops.components.base_components.models.topic import KafkaTopic +from kpops.components.common.topic import KafkaTopic from kpops.utils.colorify import greenify, magentaify from kpops.utils.dict_differ import Diff, DiffType, render_diff diff --git a/kpops/components/base_components/__init__.py b/kpops/components/base_components/__init__.py index ff94dde1f..39dfabd0c 100644 --- a/kpops/components/base_components/__init__.py +++ b/kpops/components/base_components/__init__.py @@ -1,12 +1,12 @@ -from kpops.components.base_components.helm_app import HelmApp -from kpops.components.base_components.kafka_app import KafkaApp -from kpops.components.base_components.kafka_connector import ( +from .helm_app import HelmApp +from .kafka_app import KafkaApp +from .kafka_connector import ( KafkaConnector, KafkaSinkConnector, KafkaSourceConnector, ) -from kpops.components.base_components.kubernetes_app import KubernetesApp -from kpops.components.base_components.pipeline_component import PipelineComponent +from .kubernetes_app import KubernetesApp +from .pipeline_component import PipelineComponent __all__ = ( "HelmApp", diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 93a54f93f..fd9d30d1d 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -12,9 +12,9 @@ from kpops.component_handlers import get_handlers from kpops.components.base_components.cleaner import Cleaner from kpops.components.base_components.helm_app import HelmAppValues -from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.components.common.streams_bootstrap import StreamsBootstrap +from kpops.components.common.topic import KafkaTopic, KafkaTopicStr from kpops.config import get_config from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 66fb93d62..6a13c6bef 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -3,7 +3,7 @@ import logging from abc import ABC from functools import cached_property -from typing import Any, NoReturn +from typing import Any, Literal, NoReturn import pydantic from pydantic import Field, PrivateAttr, ValidationInfo, computed_field, field_validator @@ -15,18 +15,17 @@ ) from kpops.component_handlers.kafka_connect.model import ( KafkaConnectorConfig, - KafkaConnectorResetterConfig, - KafkaConnectorResetterValues, KafkaConnectorType, ) from kpops.components.base_components.cleaner import Cleaner from kpops.components.base_components.helm_app import HelmAppValues from kpops.components.base_components.models.from_section import FromTopic -from kpops.components.base_components.models.topic import KafkaTopic from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.common.topic import KafkaTopic from kpops.config import get_config from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr +from kpops.utils.pydantic import CamelCaseConfigModel try: from typing import Self # pyright: ignore[reportAttributeAccessIssue] @@ -36,6 +35,18 @@ log = logging.getLogger("KafkaConnector") +class KafkaConnectorResetterConfig(CamelCaseConfigModel): + brokers: str + connector: str + delete_consumer_group: bool | None = None + offset_topic: str | None = None + + +class KafkaConnectorResetterValues(HelmAppValues): + connector_type: Literal["source", "sink"] + config: KafkaConnectorResetterConfig + + class KafkaConnectorResetter(Cleaner, ABC): """Helm app for resetting and cleaning a Kafka Connector. diff --git a/kpops/components/base_components/models/to_section.py b/kpops/components/base_components/models/to_section.py index 4e7e5b463..9ffdb199a 100644 --- a/kpops/components/base_components/models/to_section.py +++ b/kpops/components/base_components/models/to_section.py @@ -3,7 +3,7 @@ from pydantic import ConfigDict, Field from kpops.components.base_components.models import ModelName, ModelVersion, TopicName -from kpops.components.base_components.models.topic import KafkaTopic, TopicConfig +from kpops.components.common.topic import KafkaTopic, TopicConfig from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import DescConfigModel diff --git a/kpops/components/base_components/models/topic.py b/kpops/components/base_components/models/topic.py deleted file mode 100644 index c4d511071..000000000 --- a/kpops/components/base_components/models/topic.py +++ /dev/null @@ -1,122 +0,0 @@ -from __future__ import annotations - -from collections.abc import Iterable -from enum import Enum -from typing import Annotated, Any - -import pydantic -from pydantic import BaseModel, ConfigDict, Field, model_validator - -from kpops.utils.docstring import describe_attr -from kpops.utils.pydantic import DescConfigModel - - -class OutputTopicTypes(str, Enum): - """Types of output topic. - - - OUTPUT: output topic - - ERROR: error topic - """ - - OUTPUT = "output" - ERROR = "error" - - -class TopicConfig(DescConfigModel): - """Configure an output topic. - - :param type: Topic type - :param key_schema: Key schema class name - :param value_schema: Value schema class name - :param partitions_count: Number of partitions into which the topic is divided - :param replication_factor: Replication factor of the topic - :param configs: Topic configs - :param role: Custom identifier belonging to one or multiple topics, provide only if `type` is `extra` - """ - - type: OutputTopicTypes | None = Field( - default=None, title="Topic type", description=describe_attr("type", __doc__) - ) - key_schema: str | None = Field( - default=None, - title="Key schema", - description=describe_attr("key_schema", __doc__), - ) - value_schema: str | None = Field( - default=None, - title="Value schema", - description=describe_attr("value_schema", __doc__), - ) - partitions_count: int | None = Field( - default=None, - title="Partitions count", - description=describe_attr("partitions_count", __doc__), - ) - replication_factor: int | None = Field( - default=None, - title="Replication factor", - description=describe_attr("replication_factor", __doc__), - ) - configs: dict[str, str | int] = Field( - default={}, description=describe_attr("configs", __doc__) - ) - role: str | None = Field(default=None, description=describe_attr("role", __doc__)) - - model_config = ConfigDict( - extra="forbid", - use_enum_values=True, - populate_by_name=True, - ) - - @model_validator(mode="after") - def extra_topic_role(self) -> Any: - """Ensure that `cls.role` is used correctly, assign type if needed.""" - if self.type and self.role: - msg = "Define `role` only if `type` is undefined" - raise ValueError(msg) - return self - - -class KafkaTopic(BaseModel): - """Internal representation of a Kafka topic. - - Users configure topic name and config through the `ToSection` of a `PipelineComponent` - - :param name: Topic name - :param config: Topic config - """ - - name: str - config: TopicConfig = TopicConfig() - - @property - def id(self) -> str: - """Unique identifier of this topic.""" - return f"topic-{self.name}" - - @staticmethod - def deduplicate(topics: Iterable[KafkaTopic]) -> list[KafkaTopic]: - """Deduplicate an iterable of Kafka topics based on their name, overwriting previous ones.""" - return list({topic.name: topic for topic in topics}.values()) - - -def deserialize_kafka_topic_from_str(topic: Any) -> KafkaTopic | dict: - if topic and isinstance(topic, str): - return KafkaTopic(name=topic) - if isinstance(topic, KafkaTopic | dict): - return topic - msg = f"Topic should be a valid {KafkaTopic.__name__} instance or topic name string" - raise ValueError(msg) - - -def serialize_kafka_topic_to_str(topic: KafkaTopic) -> str: - return topic.name - - -# Pydantic type for KafkaTopic that serializes to str, used for generating the correct JSON schema -KafkaTopicStr = Annotated[ - KafkaTopic, - pydantic.WithJsonSchema({"type": "string"}), - pydantic.BeforeValidator(deserialize_kafka_topic_from_str), - pydantic.PlainSerializer(serialize_kafka_topic_to_str), -] diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index 7ce114899..a9349b323 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -17,7 +17,7 @@ from kpops.components.base_components.models.to_section import ( ToSection, ) -from kpops.components.base_components.models.topic import ( +from kpops.components.common.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, diff --git a/kpops/components/streams_bootstrap/__init__.py b/kpops/components/streams_bootstrap/__init__.py index b4eb34b2f..19341d74b 100644 --- a/kpops/components/streams_bootstrap/__init__.py +++ b/kpops/components/streams_bootstrap/__init__.py @@ -1,6 +1,7 @@ from kpops.components.common.streams_bootstrap import StreamsBootstrap -from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp -from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp + +from .producer.producer_app import ProducerApp +from .streams.streams_app import StreamsApp __all__ = ( "StreamsBootstrap", diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 7aed327f7..c2a193839 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -7,12 +7,12 @@ KafkaApp, KafkaAppCleaner, ) -from kpops.components.base_components.models.topic import ( +from kpops.components.common.streams_bootstrap import StreamsBootstrap +from kpops.components.common.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, ) -from kpops.components.common.streams_bootstrap import StreamsBootstrap from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.producer.model import ProducerAppValues from kpops.utils.docstring import describe_attr diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 675978396..87390f533 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -10,8 +10,8 @@ KafkaAppValues, KafkaStreamsConfig, ) -from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr from kpops.components.common.streams_bootstrap import StreamsBootstrapValues +from kpops.components.common.topic import KafkaTopic, KafkaTopicStr from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 976280373..1b7453c72 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -7,8 +7,8 @@ from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler from kpops.components.base_components.helm_app import HelmApp from kpops.components.base_components.kafka_app import KafkaApp, KafkaAppCleaner -from kpops.components.base_components.models.topic import KafkaTopic from kpops.components.common.streams_bootstrap import StreamsBootstrap +from kpops.components.common.topic import KafkaTopic from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.streams.model import ( StreamsAppValues, diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py index 716f2f482..fb23e225c 100644 --- a/tests/component_handlers/schema_handler/test_schema_handler.py +++ b/tests/component_handlers/schema_handler/test_schema_handler.py @@ -15,7 +15,7 @@ from kpops.components.base_components.models.to_section import ( ToSection, ) -from kpops.components.base_components.models.topic import OutputTopicTypes, TopicConfig +from kpops.components.common.topic import OutputTopicTypes, TopicConfig from kpops.config import KpopsConfig, SchemaRegistryConfig from kpops.utils.colorify import greenify, magentaify, yellowify from tests.pipeline.test_components import TestSchemaProvider diff --git a/tests/component_handlers/topic/test_topic_handler.py b/tests/component_handlers/topic/test_topic_handler.py index 5109652af..cbe26f61d 100644 --- a/tests/component_handlers/topic/test_topic_handler.py +++ b/tests/component_handlers/topic/test_topic_handler.py @@ -19,7 +19,7 @@ TopicResponse, TopicSpec, ) -from kpops.components.base_components.models.topic import ( +from kpops.components.common.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index e92a01721..d912237c1 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -25,7 +25,7 @@ from kpops.components.base_components.models.to_section import ( ToSection, ) -from kpops.components.base_components.models.topic import ( +from kpops.components.common.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 0b87cb57a..aafb83431 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -25,7 +25,7 @@ from kpops.components.base_components.models.to_section import ( ToSection, ) -from kpops.components.base_components.models.topic import OutputTopicTypes, TopicConfig +from kpops.components.common.topic import OutputTopicTypes, TopicConfig from kpops.utils.environment import ENV from tests.components.test_kafka_connector import ( CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index f3378e2ed..6dd8a87b4 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -7,7 +7,7 @@ from kpops.component_handlers import ComponentHandlers, get_handlers from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components.base_components.models.topic import ( +from kpops.components.common.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 4f6f6cc70..9ca2832e6 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -16,7 +16,7 @@ from kpops.components.base_components.models.to_section import ( ToSection, ) -from kpops.components.base_components.models.topic import ( +from kpops.components.common.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, diff --git a/tests/components/test_topic.py b/tests/components/test_topic.py index 9193a0d00..78e16c3b9 100644 --- a/tests/components/test_topic.py +++ b/tests/components/test_topic.py @@ -3,7 +3,7 @@ import pydantic import pytest -from kpops.components.base_components.models.topic import ( +from kpops.components.common.topic import ( KafkaTopic, KafkaTopicStr, OutputTopicTypes, diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index 97438a978..8557c6b2e 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -10,8 +10,8 @@ from kpops.components.base_components.models.to_section import ( ToSection, ) -from kpops.components.base_components.models.topic import OutputTopicTypes, TopicConfig from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.common.topic import OutputTopicTypes, TopicConfig from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index 7d079572c..5bb2e875a 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -2,8 +2,8 @@ from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig from kpops.components.base_components.kafka_connector import KafkaSinkConnector -from kpops.components.base_components.models.topic import OutputTopicTypes from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.common.topic import OutputTopicTypes from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp