Skip to content

Commit

Permalink
Manifest toSection with Strimzi KafkaTopic (#545)
Browse files Browse the repository at this point in the history
Closes #537
  • Loading branch information
raminqaf authored Dec 10, 2024
1 parent 884565e commit 2071c1b
Show file tree
Hide file tree
Showing 17 changed files with 567 additions and 5 deletions.
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
pipeline_base_dir: tests/pipeline
strimzi_topic:
label:
strimzi.io/cluster: my-cluster
3 changes: 3 additions & 0 deletions docs/docs/resources/variables/config_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@ KPOPS_RETAIN_CLEAN_JOBS=False
# operation_mode
# The operation mode of KPOps (managed, manifest, argo).
KPOPS_OPERATION_MODE=managed
# strimzi_topic
# Configuration for Strimzi Kafka Topics.
KPOPS_STRIMZI_TOPIC # No default value, not required
1 change: 1 addition & 0 deletions docs/docs/resources/variables/config_env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ These variables take precedence over the settings in `config.yaml`. Variables ma
|KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore |
|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion.|retain_clean_jobs |
|KPOPS_OPERATION_MODE |managed |False |The operation mode of KPOps (managed, manifest, argo). |operation_mode |
|KPOPS_STRIMZI_TOPIC | |False |Configuration for Strimzi Kafka Topics. |strimzi_topic |
31 changes: 31 additions & 0 deletions docs/docs/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@
"title": "SchemaRegistryConfig",
"type": "object"
},
"StrimziTopicConfig": {
"additionalProperties": false,
"description": "Configuration for Strimzi Kafka Topics.",
"properties": {
"label": {
"additionalProperties": {
"type": "string"
},
"description": "The label to identify the KafkaTopic resources managed by the Topic Operator. This does not have to be the name of the Kafka cluster. It can be the label assigned to the KafkaTopic resource. If you deploy more than one Topic Operator, the labels must be unique for each. That is, the operators cannot manage the same resources.",
"title": "Label",
"type": "object"
}
},
"required": [
"label"
],
"title": "StrimziTopicConfig",
"type": "object"
},
"TopicNameConfig": {
"additionalProperties": false,
"description": "Configure the topic name variables you can use in the pipeline definition.",
Expand Down Expand Up @@ -283,6 +302,18 @@
},
"description": "Configuration for Schema Registry."
},
"strimzi_topic": {
"anyOf": [
{
"$ref": "#/$defs/StrimziTopicConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "Configuration for Strimzi Kafka Topics."
},
"topic_name_config": {
"allOf": [
{
Expand Down
13 changes: 13 additions & 0 deletions kpops/components/streams_bootstrap/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

import pydantic
from pydantic import Field
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig
from kpops.components.base_components import KafkaApp
from kpops.components.base_components.helm_app import HelmApp
from kpops.components.streams_bootstrap.model import StreamsBootstrapValues
from kpops.manifests.kubernetes import KubernetesManifest
from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic
from kpops.utils.docstring import describe_attr

if TYPE_CHECKING:
Expand Down Expand Up @@ -83,3 +86,13 @@ def warning_for_latest_image_tag(self) -> Self:
f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag."
)
return self

@override
def manifest_deploy(self) -> tuple[KubernetesManifest, ...]:
resource = super().manifest_deploy()
if self.to:
resource = resource + tuple(
StrimziKafkaTopic.from_topic(topic) for topic in self.to.kafka_topics
)

return resource
35 changes: 35 additions & 0 deletions kpops/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import ClassVar

import pydantic
from pydantic import AnyHttpUrl, Field, PrivateAttr, TypeAdapter
from pydantic_settings import (
BaseSettings,
Expand All @@ -12,13 +13,43 @@
)
from typing_extensions import override

from kpops.api.exception import ValidationError
from kpops.api.operation import OperationMode
from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig
from kpops.utils.docstring import describe_object
from kpops.utils.pydantic import YamlConfigSettingsSource

ENV_PREFIX = "KPOPS_"

log = logging.getLogger("KPOpsConfig")


class StrimziTopicConfig(BaseSettings):
"""Configuration for Strimzi Kafka Topics."""

label_: dict[str, str] = Field(
alias="label",
description="The label to identify the KafkaTopic resources managed by the Topic Operator. This does not have to be the name of the Kafka cluster. It can be the label assigned to the KafkaTopic resource. If you deploy more than one Topic Operator, the labels must be unique for each. That is, the operators cannot manage the same resources.",
)

@property
def cluster_labels(self) -> tuple[str, str]:
"""Return the defined strimzi_topic.label as a tuple."""
return next(iter(self.label_.items()))

@pydantic.field_validator("label_", mode="after")
@classmethod
def label_validator(cls, label: dict[str, str]) -> dict[str, str]:
if len(label) == 0:
msg = "'strimzi_topic.label' must contain a single key-value pair."
raise ValidationError(msg)
if len(label) > 1:
log.warning(
"'resource_label' only reads the first entry in the dictionary. Other defined labels will be ignored."
)

return label


class TopicNameConfig(BaseSettings):
"""Configure the topic name variables you can use in the pipeline definition."""
Expand Down Expand Up @@ -125,6 +156,10 @@ class KpopsConfig(BaseSettings):
default=OperationMode.MANAGED,
description="The operation mode of KPOps (managed, manifest, argo).",
)
strimzi_topic: StrimziTopicConfig | None = Field(
default=None,
description=describe_object(StrimziTopicConfig.__doc__),
)

model_config = SettingsConfigDict(
env_prefix=ENV_PREFIX,
Expand Down
Empty file.
89 changes: 89 additions & 0 deletions kpops/manifests/strimzi/kafka_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from pydantic import ConfigDict, Field, model_validator

from kpops.api.exception import ValidationError
from kpops.components.common.topic import KafkaTopic
from kpops.config import get_config
from kpops.manifests.kubernetes import KubernetesManifest, ObjectMeta
from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import CamelCaseConfigModel

if TYPE_CHECKING:
try:
from typing import Self # pyright: ignore[reportAttributeAccessIssue]
except ImportError:
from typing_extensions import Self


class TopicSpec(CamelCaseConfigModel):
"""Specification of a Kafka topic.
:param partitions: The number of partitions the topic should have. This cannot be decreased after topic creation. It can be increased after topic creation, but it is important to understand the consequences that has, especially for topics with semantic partitioning. When absent this will default to the broker configuration for `num.partitions`.
:param replicas: The number of replicas the topic should have. When absent this will default to the broker configuration for `default.replication.factor`.
:param config: The topic configuration. Topic config reference: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html
"""

partitions: int = Field(
default=1, ge=1, description=describe_attr("partitions", __doc__)
)
replicas: int = Field(
default=1, ge=1, le=32767, description=describe_attr("replicas", __doc__)
)
config: dict[str, Any] | None = Field(
default=None, description=describe_attr("config", __doc__)
)

model_config = ConfigDict(extra="allow")

@model_validator(mode="before")
@classmethod
def set_defaults_if_none(cls, values: Any) -> Any:
if values.get("partitions") is None:
values["partitions"] = 1
if values.get("replicas") is None:
values["replicas"] = 1
return values


class StrimziKafkaTopic(KubernetesManifest):
"""Represents a Strimzi Kafka Topic CRD.
CRD definition: https://github.com/strimzi/strimzi-kafka-operator/blob/main/install/cluster-operator/043-Crd-kafkatopic.yaml
example: https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/topic/kafka-topic.yaml
"""

api_version: str = "kafka.strimzi.io/v1beta2"
kind: str = "KafkaTopic"
metadata: ObjectMeta
spec: TopicSpec
status: dict[str, Any] | None = None

@classmethod
def from_topic(cls, topic: KafkaTopic) -> Self:
strimzi_topic = get_config().strimzi_topic
if not strimzi_topic:
msg = "When manifesting KafkaTopic you must define 'strimzi_topic.resource_label' in the config.yaml"
raise ValidationError(msg)
cluster_domain, cluster_name = strimzi_topic.cluster_labels

metadata = ObjectMeta.model_validate(
{
"name": topic.name,
"labels": {cluster_domain: cluster_name},
}
)
spec = TopicSpec.model_validate(
{
"partitions": topic.config.partitions_count,
"replicas": topic.config.replication_factor,
"config": topic.config.configs,
}
)
return cls(
metadata=metadata,
spec=spec,
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ schema_registry:
enabled: false
timeout: 30
url: http://localhost:8081/
strimzi_topic: null
topic_name_config:
default_error_topic_name: ${pipeline.name}-${component.name}-error
default_output_topic_name: ${pipeline.name}-${component.name}
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ def custom_components() -> Iterator[None]:
def clear_kpops_config() -> Iterator[None]:
from kpops.config import KpopsConfig

yield
KpopsConfig._instance = None
yield


KUBECONFIG = """
Expand Down
Empty file.
77 changes: 77 additions & 0 deletions tests/manifests/strimzi/test_kafka_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from unittest.mock import MagicMock

import pytest
from pydantic import ValidationError as PydanticValidationError
from pytest_mock import MockerFixture

from kpops.api.exception import ValidationError
from kpops.components.common.topic import KafkaTopic, TopicConfig
from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic, TopicSpec


@pytest.fixture
def kafka_topic() -> KafkaTopic:
return KafkaTopic(
name="test-topic",
config=TopicConfig.model_validate(
{
"partitions_count": 3,
"replication_factor": 2,
"configs": {"cleanup.policy": "compact"},
},
),
)


def test_topic_spec_defaults():
spec = TopicSpec()
assert spec.partitions == 1
assert spec.replicas == 1
assert spec.config is None


def test_topic_spec_custom_values():
spec = TopicSpec(partitions=3, replicas=2, config={"retention.ms": "60000"})
assert spec.partitions == 3
assert spec.replicas == 2
assert spec.config == {"retention.ms": "60000"}


def test_topic_spec_validation():
with pytest.raises(PydanticValidationError):
TopicSpec(partitions=0) # Less than 1, should raise validation error

with pytest.raises(PydanticValidationError):
TopicSpec(replicas=40000) # Exceeds max value, should raise validation error


def test_strimzi_kafka_topic_from_topic(kafka_topic: KafkaTopic, mocker: MockerFixture):
mock_config = MagicMock()
mock_config.strimzi_topic.cluster_labels = ("bakdata.com/cluster", "my-cluster")
mocker.patch(
"kpops.manifests.strimzi.kafka_topic.get_config", return_value=mock_config
)

strimzi_topic = StrimziKafkaTopic.from_topic(kafka_topic)

# Check metadata
assert strimzi_topic.metadata.name == kafka_topic.name
assert strimzi_topic.metadata.labels == {"bakdata.com/cluster": "my-cluster"}

# Check spec
assert strimzi_topic.spec.partitions == kafka_topic.config.partitions_count
assert strimzi_topic.spec.replicas == kafka_topic.config.replication_factor
assert strimzi_topic.spec.config == kafka_topic.config.configs


def test_strimzi_kafka_topic_missing_config(kafka_topic, mocker):
mock_config = MagicMock()
mock_config.strimzi_topic = None
mocker.patch(
"kpops.manifests.strimzi.kafka_topic.get_config", return_value=mock_config
)

with pytest.raises(
ValidationError, match="must define 'strimzi_topic.resource_label'"
):
StrimziKafkaTopic.from_topic(kafka_topic)
Loading

0 comments on commit 2071c1b

Please sign in to comment.