From 2071c1b809f17e0f9b61538f3546d20f468f4429 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 10 Dec 2024 17:02:01 +0000 Subject: [PATCH] Manifest toSection with Strimzi KafkaTopic (#545) Closes #537 --- config.yaml | 3 + .../resources/variables/config_env_vars.env | 3 + .../resources/variables/config_env_vars.md | 1 + docs/docs/schema/config.json | 31 +++++++ kpops/components/streams_bootstrap/base.py | 13 +++ kpops/config/__init__.py | 35 ++++++++ kpops/manifests/strimzi/__init__.py | 0 kpops/manifests/strimzi/kafka_topic.py | 89 +++++++++++++++++++ .../config.yaml | 1 + tests/conftest.py | 2 +- tests/manifests/strimzi/__init__.py | 0 tests/manifests/strimzi/test_kafka_topic.py | 77 ++++++++++++++++ .../test_deploy_argo_mode/manifest.yaml | 73 +++++++++++++++ .../test_deploy_manifest_mode/manifest.yaml | 73 +++++++++++++++ .../test_python_api/manifest.yaml | 73 +++++++++++++++ .../test_streams_bootstrap/manifest.yaml | 73 +++++++++++++++ tests/test_kpops_config.py | 25 +++++- 17 files changed, 567 insertions(+), 5 deletions(-) create mode 100644 kpops/manifests/strimzi/__init__.py create mode 100644 kpops/manifests/strimzi/kafka_topic.py create mode 100644 tests/manifests/strimzi/__init__.py create mode 100644 tests/manifests/strimzi/test_kafka_topic.py diff --git a/config.yaml b/config.yaml index 359b51a21..aa32d7d3c 100644 --- a/config.yaml +++ b/config.yaml @@ -1,2 +1,5 @@ kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" pipeline_base_dir: tests/pipeline +strimzi_topic: + label: + strimzi.io/cluster: my-cluster diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index b64bac2e0..baaf4fdbf 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -61,3 +61,6 @@ KPOPS_RETAIN_CLEAN_JOBS=False # operation_mode # The operation mode of KPOps (managed, manifest, argo). KPOPS_OPERATION_MODE=managed +# strimzi_topic +# Configuration for Strimzi Kafka Topics. +KPOPS_STRIMZI_TOPIC # No default value, not required diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md index 6b79a743c..f7fcd5303 100644 --- a/docs/docs/resources/variables/config_env_vars.md +++ b/docs/docs/resources/variables/config_env_vars.md @@ -20,3 +20,4 @@ These variables take precedence over the settings in `config.yaml`. Variables ma |KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore | |KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion.|retain_clean_jobs | |KPOPS_OPERATION_MODE |managed |False |The operation mode of KPOps (managed, manifest, argo). |operation_mode | +|KPOPS_STRIMZI_TOPIC | |False |Configuration for Strimzi Kafka Topics. |strimzi_topic | diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index 02d04ea6e..bbf12c5ac 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -162,6 +162,25 @@ "title": "SchemaRegistryConfig", "type": "object" }, + "StrimziTopicConfig": { + "additionalProperties": false, + "description": "Configuration for Strimzi Kafka Topics.", + "properties": { + "label": { + "additionalProperties": { + "type": "string" + }, + "description": "The label to identify the KafkaTopic resources managed by the Topic Operator. This does not have to be the name of the Kafka cluster. It can be the label assigned to the KafkaTopic resource. If you deploy more than one Topic Operator, the labels must be unique for each. That is, the operators cannot manage the same resources.", + "title": "Label", + "type": "object" + } + }, + "required": [ + "label" + ], + "title": "StrimziTopicConfig", + "type": "object" + }, "TopicNameConfig": { "additionalProperties": false, "description": "Configure the topic name variables you can use in the pipeline definition.", @@ -283,6 +302,18 @@ }, "description": "Configuration for Schema Registry." }, + "strimzi_topic": { + "anyOf": [ + { + "$ref": "#/$defs/StrimziTopicConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for Strimzi Kafka Topics." + }, "topic_name_config": { "allOf": [ { diff --git a/kpops/components/streams_bootstrap/base.py b/kpops/components/streams_bootstrap/base.py index 47973624b..a4ab5780e 100644 --- a/kpops/components/streams_bootstrap/base.py +++ b/kpops/components/streams_bootstrap/base.py @@ -7,11 +7,14 @@ import pydantic from pydantic import Field +from typing_extensions import override from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig from kpops.components.base_components import KafkaApp from kpops.components.base_components.helm_app import HelmApp from kpops.components.streams_bootstrap.model import StreamsBootstrapValues +from kpops.manifests.kubernetes import KubernetesManifest +from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic from kpops.utils.docstring import describe_attr if TYPE_CHECKING: @@ -83,3 +86,13 @@ def warning_for_latest_image_tag(self) -> Self: f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag." ) return self + + @override + def manifest_deploy(self) -> tuple[KubernetesManifest, ...]: + resource = super().manifest_deploy() + if self.to: + resource = resource + tuple( + StrimziKafkaTopic.from_topic(topic) for topic in self.to.kafka_topics + ) + + return resource diff --git a/kpops/config/__init__.py b/kpops/config/__init__.py index 7398d5e7c..07ecce58c 100644 --- a/kpops/config/__init__.py +++ b/kpops/config/__init__.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import ClassVar +import pydantic from pydantic import AnyHttpUrl, Field, PrivateAttr, TypeAdapter from pydantic_settings import ( BaseSettings, @@ -12,6 +13,7 @@ ) from typing_extensions import override +from kpops.api.exception import ValidationError from kpops.api.operation import OperationMode from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig from kpops.utils.docstring import describe_object @@ -19,6 +21,35 @@ ENV_PREFIX = "KPOPS_" +log = logging.getLogger("KPOpsConfig") + + +class StrimziTopicConfig(BaseSettings): + """Configuration for Strimzi Kafka Topics.""" + + label_: dict[str, str] = Field( + alias="label", + description="The label to identify the KafkaTopic resources managed by the Topic Operator. This does not have to be the name of the Kafka cluster. It can be the label assigned to the KafkaTopic resource. If you deploy more than one Topic Operator, the labels must be unique for each. That is, the operators cannot manage the same resources.", + ) + + @property + def cluster_labels(self) -> tuple[str, str]: + """Return the defined strimzi_topic.label as a tuple.""" + return next(iter(self.label_.items())) + + @pydantic.field_validator("label_", mode="after") + @classmethod + def label_validator(cls, label: dict[str, str]) -> dict[str, str]: + if len(label) == 0: + msg = "'strimzi_topic.label' must contain a single key-value pair." + raise ValidationError(msg) + if len(label) > 1: + log.warning( + "'resource_label' only reads the first entry in the dictionary. Other defined labels will be ignored." + ) + + return label + class TopicNameConfig(BaseSettings): """Configure the topic name variables you can use in the pipeline definition.""" @@ -125,6 +156,10 @@ class KpopsConfig(BaseSettings): default=OperationMode.MANAGED, description="The operation mode of KPOps (managed, manifest, argo).", ) + strimzi_topic: StrimziTopicConfig | None = Field( + default=None, + description=describe_object(StrimziTopicConfig.__doc__), + ) model_config = SettingsConfigDict( env_prefix=ENV_PREFIX, diff --git a/kpops/manifests/strimzi/__init__.py b/kpops/manifests/strimzi/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kpops/manifests/strimzi/kafka_topic.py b/kpops/manifests/strimzi/kafka_topic.py new file mode 100644 index 000000000..4b9a4b667 --- /dev/null +++ b/kpops/manifests/strimzi/kafka_topic.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from pydantic import ConfigDict, Field, model_validator + +from kpops.api.exception import ValidationError +from kpops.components.common.topic import KafkaTopic +from kpops.config import get_config +from kpops.manifests.kubernetes import KubernetesManifest, ObjectMeta +from kpops.utils.docstring import describe_attr +from kpops.utils.pydantic import CamelCaseConfigModel + +if TYPE_CHECKING: + try: + from typing import Self # pyright: ignore[reportAttributeAccessIssue] + except ImportError: + from typing_extensions import Self + + +class TopicSpec(CamelCaseConfigModel): + """Specification of a Kafka topic. + + :param partitions: The number of partitions the topic should have. This cannot be decreased after topic creation. It can be increased after topic creation, but it is important to understand the consequences that has, especially for topics with semantic partitioning. When absent this will default to the broker configuration for `num.partitions`. + :param replicas: The number of replicas the topic should have. When absent this will default to the broker configuration for `default.replication.factor`. + :param config: The topic configuration. Topic config reference: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html + + """ + + partitions: int = Field( + default=1, ge=1, description=describe_attr("partitions", __doc__) + ) + replicas: int = Field( + default=1, ge=1, le=32767, description=describe_attr("replicas", __doc__) + ) + config: dict[str, Any] | None = Field( + default=None, description=describe_attr("config", __doc__) + ) + + model_config = ConfigDict(extra="allow") + + @model_validator(mode="before") + @classmethod + def set_defaults_if_none(cls, values: Any) -> Any: + if values.get("partitions") is None: + values["partitions"] = 1 + if values.get("replicas") is None: + values["replicas"] = 1 + return values + + +class StrimziKafkaTopic(KubernetesManifest): + """Represents a Strimzi Kafka Topic CRD. + + CRD definition: https://github.com/strimzi/strimzi-kafka-operator/blob/main/install/cluster-operator/043-Crd-kafkatopic.yaml + example: https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/topic/kafka-topic.yaml + """ + + api_version: str = "kafka.strimzi.io/v1beta2" + kind: str = "KafkaTopic" + metadata: ObjectMeta + spec: TopicSpec + status: dict[str, Any] | None = None + + @classmethod + def from_topic(cls, topic: KafkaTopic) -> Self: + strimzi_topic = get_config().strimzi_topic + if not strimzi_topic: + msg = "When manifesting KafkaTopic you must define 'strimzi_topic.resource_label' in the config.yaml" + raise ValidationError(msg) + cluster_domain, cluster_name = strimzi_topic.cluster_labels + + metadata = ObjectMeta.model_validate( + { + "name": topic.name, + "labels": {cluster_domain: cluster_name}, + } + ) + spec = TopicSpec.model_validate( + { + "partitions": topic.config.partitions_count, + "replicas": topic.config.replication_factor, + "config": topic.config.configs, + } + ) + return cls( + metadata=metadata, + spec=spec, + ) diff --git a/tests/cli/snapshots/test_init/test_init_project_include_optional/config.yaml b/tests/cli/snapshots/test_init/test_init_project_include_optional/config.yaml index 19b7f5d95..438094f12 100644 --- a/tests/cli/snapshots/test_init/test_init_project_include_optional/config.yaml +++ b/tests/cli/snapshots/test_init/test_init_project_include_optional/config.yaml @@ -24,6 +24,7 @@ schema_registry: enabled: false timeout: 30 url: http://localhost:8081/ +strimzi_topic: null topic_name_config: default_error_topic_name: ${pipeline.name}-${component.name}-error default_output_topic_name: ${pipeline.name}-${component.name} diff --git a/tests/conftest.py b/tests/conftest.py index 4983aa325..777c87416 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,8 +57,8 @@ def custom_components() -> Iterator[None]: def clear_kpops_config() -> Iterator[None]: from kpops.config import KpopsConfig - yield KpopsConfig._instance = None + yield KUBECONFIG = """ diff --git a/tests/manifests/strimzi/__init__.py b/tests/manifests/strimzi/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/manifests/strimzi/test_kafka_topic.py b/tests/manifests/strimzi/test_kafka_topic.py new file mode 100644 index 000000000..a519c2e94 --- /dev/null +++ b/tests/manifests/strimzi/test_kafka_topic.py @@ -0,0 +1,77 @@ +from unittest.mock import MagicMock + +import pytest +from pydantic import ValidationError as PydanticValidationError +from pytest_mock import MockerFixture + +from kpops.api.exception import ValidationError +from kpops.components.common.topic import KafkaTopic, TopicConfig +from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic, TopicSpec + + +@pytest.fixture +def kafka_topic() -> KafkaTopic: + return KafkaTopic( + name="test-topic", + config=TopicConfig.model_validate( + { + "partitions_count": 3, + "replication_factor": 2, + "configs": {"cleanup.policy": "compact"}, + }, + ), + ) + + +def test_topic_spec_defaults(): + spec = TopicSpec() + assert spec.partitions == 1 + assert spec.replicas == 1 + assert spec.config is None + + +def test_topic_spec_custom_values(): + spec = TopicSpec(partitions=3, replicas=2, config={"retention.ms": "60000"}) + assert spec.partitions == 3 + assert spec.replicas == 2 + assert spec.config == {"retention.ms": "60000"} + + +def test_topic_spec_validation(): + with pytest.raises(PydanticValidationError): + TopicSpec(partitions=0) # Less than 1, should raise validation error + + with pytest.raises(PydanticValidationError): + TopicSpec(replicas=40000) # Exceeds max value, should raise validation error + + +def test_strimzi_kafka_topic_from_topic(kafka_topic: KafkaTopic, mocker: MockerFixture): + mock_config = MagicMock() + mock_config.strimzi_topic.cluster_labels = ("bakdata.com/cluster", "my-cluster") + mocker.patch( + "kpops.manifests.strimzi.kafka_topic.get_config", return_value=mock_config + ) + + strimzi_topic = StrimziKafkaTopic.from_topic(kafka_topic) + + # Check metadata + assert strimzi_topic.metadata.name == kafka_topic.name + assert strimzi_topic.metadata.labels == {"bakdata.com/cluster": "my-cluster"} + + # Check spec + assert strimzi_topic.spec.partitions == kafka_topic.config.partitions_count + assert strimzi_topic.spec.replicas == kafka_topic.config.replication_factor + assert strimzi_topic.spec.config == kafka_topic.config.configs + + +def test_strimzi_kafka_topic_missing_config(kafka_topic, mocker): + mock_config = MagicMock() + mock_config.strimzi_topic = None + mocker.patch( + "kpops.manifests.strimzi.kafka_topic.get_config", return_value=mock_config + ) + + with pytest.raises( + ValidationError, match="must define 'strimzi_topic.resource_label'" + ): + StrimziKafkaTopic.from_topic(kafka_topic) diff --git a/tests/pipeline/snapshots/test_manifest/test_deploy_argo_mode/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_deploy_argo_mode/manifest.yaml index e64bce0b3..336d3d726 100644 --- a/tests/pipeline/snapshots/test_manifest/test_deploy_argo_mode/manifest.yaml +++ b/tests/pipeline/snapshots/test_manifest/test_deploy_argo_mode/manifest.yaml @@ -43,6 +43,30 @@ spec: memory: 300Mi restartPolicy: OnFailure +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-producer-app-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-producer-app-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + --- apiVersion: batch/v1 kind: Job @@ -156,6 +180,55 @@ spec: memory: 300Mi terminationGracePeriodSeconds: 300 +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-error-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: resources-manifest-pipeline-my-streams-app-error +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + --- apiVersion: batch/v1 kind: Job diff --git a/tests/pipeline/snapshots/test_manifest/test_deploy_manifest_mode/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_deploy_manifest_mode/manifest.yaml index f7086850d..7b6d5b8aa 100644 --- a/tests/pipeline/snapshots/test_manifest/test_deploy_manifest_mode/manifest.yaml +++ b/tests/pipeline/snapshots/test_manifest/test_deploy_manifest_mode/manifest.yaml @@ -41,6 +41,30 @@ spec: memory: 300Mi restartPolicy: OnFailure +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-producer-app-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-producer-app-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + --- apiVersion: apps/v1 kind: Deployment @@ -105,3 +129,52 @@ spec: memory: 300Mi terminationGracePeriodSeconds: 300 +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-error-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: resources-manifest-pipeline-my-streams-app-error +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + diff --git a/tests/pipeline/snapshots/test_manifest/test_python_api/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_python_api/manifest.yaml index f7086850d..7b6d5b8aa 100644 --- a/tests/pipeline/snapshots/test_manifest/test_python_api/manifest.yaml +++ b/tests/pipeline/snapshots/test_manifest/test_python_api/manifest.yaml @@ -41,6 +41,30 @@ spec: memory: 300Mi restartPolicy: OnFailure +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-producer-app-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-producer-app-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + --- apiVersion: apps/v1 kind: Deployment @@ -105,3 +129,52 @@ spec: memory: 300Mi terminationGracePeriodSeconds: 300 +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-error-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: resources-manifest-pipeline-my-streams-app-error +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + diff --git a/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml index 075777bf9..a1dd22945 100644 --- a/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml +++ b/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml @@ -70,6 +70,30 @@ spec: successfulJobsHistoryLimit: 1 suspend: false +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-producer-app-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-producer-app-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + --- apiVersion: v1 data: @@ -209,3 +233,52 @@ spec: name: resources-streams-bootstrap-my-streams-app name: config +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-error-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: resources-streams-bootstrap-my-streams-app-error +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + diff --git a/tests/test_kpops_config.py b/tests/test_kpops_config.py index 0f35ddf37..e86096b21 100644 --- a/tests/test_kpops_config.py +++ b/tests/test_kpops_config.py @@ -1,14 +1,17 @@ import re from pathlib import Path +import pydantic import pytest -from pydantic import AnyHttpUrl, AnyUrl, TypeAdapter, ValidationError +from pydantic import AnyHttpUrl, AnyUrl, TypeAdapter +from kpops.api.exception import ValidationError from kpops.config import ( KafkaConnectConfig, KafkaRestConfig, KpopsConfig, SchemaRegistryConfig, + StrimziTopicConfig, get_config, set_config, ) @@ -43,7 +46,7 @@ def test_kpops_config_with_default_values(): def test_kpops_config_with_different_invalid_urls(): - with pytest.raises(ValidationError): + with pytest.raises(pydantic.ValidationError): KpopsConfig( kafka_brokers="http://broker:9092", kafka_connect=KafkaConnectConfig( @@ -51,7 +54,7 @@ def test_kpops_config_with_different_invalid_urls(): ), ) - with pytest.raises(ValidationError): + with pytest.raises(pydantic.ValidationError): KpopsConfig( kafka_brokers="http://broker:9092", kafka_rest=KafkaRestConfig( @@ -59,7 +62,7 @@ def test_kpops_config_with_different_invalid_urls(): ), ) - with pytest.raises(ValidationError): + with pytest.raises(pydantic.ValidationError): KpopsConfig( kafka_brokers="http://broker:9092", schema_registry=SchemaRegistryConfig( @@ -69,6 +72,7 @@ def test_kpops_config_with_different_invalid_urls(): ) +@pytest.mark.usefixtures("clear_kpops_config") def test_global_kpops_config_not_initialized_error(): with pytest.raises( RuntimeError, @@ -90,3 +94,16 @@ def test_set_global_kpops_config(): ) set_config(config) assert get_config() == config + + +def test_strimzi_topic_config_valid(): + config = StrimziTopicConfig.model_validate({"label": {"key": "value"}}) + assert config.cluster_labels == ("key", "value") + + +def test_strimzi_topic_config_empty_label(): + with pytest.raises( + ValidationError, + match="'strimzi_topic.label' must contain a single key-value pair.", + ): + StrimziTopicConfig.model_validate({"label": {}})