Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manifest toSection with Strimzi KafkaTopic #545

Merged
merged 55 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
c218e02
Introduce KPOps operation and manifest resources for deployment
raminqaf Nov 28, 2024
4a29cbd
Introduce KPOps operation and manifest resources for deployment
raminqaf Nov 28, 2024
7ccd04f
Propose refactoring for #541
disrupted Nov 28, 2024
834cd14
Replace `manifest` method with `manifest_deploy`
disrupted Nov 28, 2024
6b3a57a
Merge branch 'main' of github.com:bakdata/kpops into feat/add-operations
raminqaf Dec 2, 2024
be817c3
Merge branch 'feat/kubernetes-manifests' of github.com:bakdata/kpops …
raminqaf Dec 2, 2024
3ce5c95
Introduce KPOps operation and manifest resources for deployment
raminqaf Dec 2, 2024
56f72a1
Introduce KPOps operation and manifest resources for deployment
raminqaf Dec 2, 2024
48de4e2
Merge branch 'main' of github.com:bakdata/kpops into feat/add-operations
raminqaf Dec 2, 2024
af9b23b
Update files
raminqaf Dec 2, 2024
4d1cc1e
Update files
raminqaf Dec 2, 2024
c717002
Update files
raminqaf Dec 2, 2024
975b10e
fix init command
raminqaf Dec 2, 2024
7b47919
clean ups
raminqaf Dec 2, 2024
9fcd55f
clean ups
raminqaf Dec 2, 2024
aaed48d
clean ups
raminqaf Dec 2, 2024
ece6e9f
clean ups
raminqaf Dec 2, 2024
3cc771e
Update files
raminqaf Dec 2, 2024
a7e3c00
Update files
raminqaf Dec 2, 2024
0ec82e2
Update files
raminqaf Dec 2, 2024
af25f92
address reviews
raminqaf Dec 3, 2024
9e1938e
add tests
raminqaf Dec 3, 2024
9a4dc35
Update files
raminqaf Dec 4, 2024
64cc384
refactor argo sync wave
raminqaf Dec 4, 2024
15fe717
Manifest toSection with Strimzi Kafka topic
raminqaf Dec 4, 2024
0c62a53
Address reviews
raminqaf Dec 4, 2024
ab5b74e
Address reviews
raminqaf Dec 4, 2024
8838bfa
Define pydantic model to representig Kubernetes manifest
raminqaf Dec 4, 2024
f1ed671
Define pydantic model to representig Kubernetes manifest
raminqaf Dec 4, 2024
a24de23
Update files
raminqaf Dec 4, 2024
32ea4dc
Update files
raminqaf Dec 4, 2024
2a4838a
Merge remote-tracking branch 'origin/v9' into feat/add-strimzi-topic
raminqaf Dec 4, 2024
79e1af4
Update files
raminqaf Dec 4, 2024
b2b2983
Update files
raminqaf Dec 4, 2024
d894e57
Update files
raminqaf Dec 4, 2024
e67908e
Update files
raminqaf Dec 4, 2024
0a2e7c8
Update files
raminqaf Dec 4, 2024
4ebd633
Update files
raminqaf Dec 5, 2024
ea3237b
Update files
raminqaf Dec 5, 2024
1070fde
clean ups
raminqaf Dec 5, 2024
50e81df
Update files
raminqaf Dec 5, 2024
4b187bb
clean ups
raminqaf Dec 6, 2024
c25a883
add cluster name label
raminqaf Dec 6, 2024
8ea444c
Update files
raminqaf Dec 6, 2024
16d88c7
Update files
raminqaf Dec 9, 2024
3f71255
Update files
raminqaf Dec 9, 2024
2f1b510
Update files
raminqaf Dec 9, 2024
baae8e3
fix tests
raminqaf Dec 9, 2024
ac780e4
Update files
raminqaf Dec 9, 2024
253157f
Update files
raminqaf Dec 9, 2024
b32b6a4
Update files
raminqaf Dec 9, 2024
62a6b7c
Merge branch 'v9' of github.com:bakdata/kpops into feat/add-strimzi-t…
raminqaf Dec 10, 2024
794d1a6
change strimzi label
raminqaf Dec 10, 2024
c41af8d
add property
raminqaf Dec 10, 2024
7f1adb7
add tests
raminqaf Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
pipeline_base_dir: tests/pipeline
strimzi_topic:
resource_label: "strimzi.io/cluster=my-cluster"
3 changes: 3 additions & 0 deletions docs/docs/resources/variables/config_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@ KPOPS_RETAIN_CLEAN_JOBS=False
# operation_mode
# The operation mode of KPOps (managed, manifest, argo).
KPOPS_OPERATION_MODE=managed
# strimzi_topic
# Configuration for Strimzi Kafka Topics.
KPOPS_STRIMZI_TOPIC # No default value, not required
1 change: 1 addition & 0 deletions docs/docs/resources/variables/config_env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ These variables take precedence over the settings in `config.yaml`. Variables ma
|KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore |
|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion.|retain_clean_jobs |
|KPOPS_OPERATION_MODE |managed |False |The operation mode of KPOps (managed, manifest, argo). |operation_mode |
|KPOPS_STRIMZI_TOPIC | |False |Configuration for Strimzi Kafka Topics. |strimzi_topic |
38 changes: 38 additions & 0 deletions docs/docs/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,32 @@
"title": "SchemaRegistryConfig",
"type": "object"
},
"StrimziTopicConfig": {
"additionalProperties": false,
"description": "Configuration for Strimzi Kafka Topics.",
"properties": {
"resource_label": {
"description": "The label to identify the KafkaTopic resources managed by the Topic Operator. This does not have to be the name of the Kafka cluster. It can be the label assigned to the KafkaTopic resource. If you deploy more than one Topic Operator, the labels must be unique for each. That is, the operators cannot manage the same resources.",
"maxItems": 2,
"minItems": 2,
"prefixItems": [
{
"type": "string"
},
{
"type": "string"
}
],
"title": "Resource Label",
"type": "array"
}
},
"required": [
"resource_label"
],
"title": "StrimziTopicConfig",
"type": "object"
},
"TopicNameConfig": {
"additionalProperties": false,
"description": "Configure the topic name variables you can use in the pipeline definition.",
Expand Down Expand Up @@ -283,6 +309,18 @@
},
"description": "Configuration for Schema Registry."
},
"strimzi_topic": {
"anyOf": [
{
"$ref": "#/$defs/StrimziTopicConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "Configuration for Strimzi Kafka Topics."
},
"topic_name_config": {
"allOf": [
{
Expand Down
13 changes: 13 additions & 0 deletions kpops/components/streams_bootstrap/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

import pydantic
from pydantic import Field
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig
from kpops.components.base_components import KafkaApp
from kpops.components.base_components.helm_app import HelmApp
from kpops.components.streams_bootstrap.model import StreamsBootstrapValues
from kpops.manifests.kubernetes import KubernetesManifest
from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic
from kpops.utils.docstring import describe_attr

if TYPE_CHECKING:
Expand Down Expand Up @@ -81,3 +84,13 @@ def warning_for_latest_image_tag(self) -> Self:
f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag."
)
return self

@override
def manifest_deploy(self) -> tuple[KubernetesManifest, ...]:
resource = super().manifest_deploy()
if self.to:
resource = resource + tuple(
StrimziKafkaTopic.from_topic(topic) for topic in self.to.kafka_topics
)

return resource
30 changes: 29 additions & 1 deletion kpops/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import logging
from pathlib import Path
from typing import ClassVar
from typing import Any, ClassVar

import pydantic
from pydantic import AnyHttpUrl, Field, PrivateAttr, TypeAdapter
from pydantic_settings import (
BaseSettings,
Expand All @@ -12,6 +13,7 @@
)
from typing_extensions import override

from kpops.api.exception import ValidationError
from kpops.api.operation import OperationMode
from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig
from kpops.utils.docstring import describe_object
Expand All @@ -20,6 +22,28 @@
ENV_PREFIX = "KPOPS_"


class StrimziTopicConfig(BaseSettings):
"""Configuration for Strimzi Kafka Topics."""

resource_label: tuple[str, str] = Field(
description="The label to identify the KafkaTopic resources managed by the Topic Operator. This does not have to be the name of the Kafka cluster. It can be the label assigned to the KafkaTopic resource. If you deploy more than one Topic Operator, the labels must be unique for each. That is, the operators cannot manage the same resources."
)

@pydantic.field_validator("resource_label", mode="before")
@classmethod
def deserialize_topics(cls, resource_label: Any) -> tuple[str, str]:
match resource_label:
case str(value) if "=" in value:
key, value = value.split("=")
return key, value
case dict(value) if len(value) == 1:
key, value = next(iter(value.items()))
return key, value
case _:
msg = "'resource_label' should be defined either like 'foo=bar' or as a valid dictionary."
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
raise ValidationError(msg)


class TopicNameConfig(BaseSettings):
"""Configure the topic name variables you can use in the pipeline definition."""

Expand Down Expand Up @@ -125,6 +149,10 @@ class KpopsConfig(BaseSettings):
default=OperationMode.MANAGED,
description="The operation mode of KPOps (managed, manifest, argo).",
)
strimzi_topic: StrimziTopicConfig | None = Field(
default=None,
description=describe_object(StrimziTopicConfig.__doc__),
)

model_config = SettingsConfigDict(
env_prefix=ENV_PREFIX,
Expand Down
Empty file.
89 changes: 89 additions & 0 deletions kpops/manifests/strimzi/kafka_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from pydantic import ConfigDict, Field, model_validator

from kpops.api.exception import ValidationError
from kpops.components.common.topic import KafkaTopic
from kpops.config import get_config
from kpops.manifests.kubernetes import KubernetesManifest, ObjectMeta
from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import CamelCaseConfigModel

if TYPE_CHECKING:
try:
from typing import Self # pyright: ignore[reportAttributeAccessIssue]
except ImportError:
from typing_extensions import Self


class TopicSpec(CamelCaseConfigModel):
"""Specification of a Kafka topic.

:param partitions: The number of partitions the topic should have. This cannot be decreased after topic creation. It can be increased after topic creation, but it is important to understand the consequences that has, especially for topics with semantic partitioning. When absent this will default to the broker configuration for `num.partitions`.
:param replicas: The number of replicas the topic should have. When absent this will default to the broker configuration for `default.replication.factor`.
:param config: The topic configuration. Topic config reference: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html

"""

partitions: int = Field(
default=1, ge=1, description=describe_attr("partitions", __doc__)
)
replicas: int = Field(
default=1, ge=1, le=32767, description=describe_attr("replicas", __doc__)
)
config: dict[str, Any] | None = Field(
default=None, description=describe_attr("config", __doc__)
)

model_config = ConfigDict(extra="allow")

@model_validator(mode="before")
@classmethod
def set_defaults_if_none(cls, values: Any) -> Any:
if values.get("partitions") is None:
values["partitions"] = 1
if values.get("replicas") is None:
values["replicas"] = 1
return values


class StrimziKafkaTopic(KubernetesManifest):
"""Represents a Strimzi Kafka Topic CRD.

CRD definition: https://github.com/strimzi/strimzi-kafka-operator/blob/main/install/cluster-operator/043-Crd-kafkatopic.yaml
example: https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/topic/kafka-topic.yaml
"""

api_version: str = "kafka.strimzi.io/v1beta2"
kind: str = "KafkaTopic"
metadata: ObjectMeta
spec: TopicSpec
status: dict[str, Any] | None = None

@classmethod
def from_topic(cls, topic: KafkaTopic) -> Self:
strimzi_topic = get_config().strimzi_topic
if not strimzi_topic:
msg = "When manifesting KafkaTopic you must define 'strimzi_topic.resource_label' in the config.yaml"
raise ValidationError(msg)
topic_resource_label = strimzi_topic.resource_label

metadata = ObjectMeta.model_validate(
{
"name": topic.name,
"labels": {topic_resource_label[0]: topic_resource_label[1]},
}
)
spec = TopicSpec.model_validate(
{
"partitions": topic.config.partitions_count,
"replicas": topic.config.replication_factor,
"config": topic.config.configs,
}
)
return cls(
metadata=metadata,
spec=spec,
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ schema_registry:
enabled: false
timeout: 30
url: http://localhost:8081/
strimzi_topic: null
topic_name_config:
default_error_topic_name: ${pipeline.name}-${component.name}-error
default_output_topic_name: ${pipeline.name}-${component.name}
Empty file.
77 changes: 77 additions & 0 deletions tests/manifests/strimzi/test_kafka_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from unittest.mock import MagicMock

import pytest
from pydantic import ValidationError as PydanticValidationError
from pytest_mock import MockerFixture

from kpops.api.exception import ValidationError
from kpops.components.common.topic import KafkaTopic, TopicConfig
from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic, TopicSpec


@pytest.fixture
def kafka_topic() -> KafkaTopic:
return KafkaTopic(
name="test-topic",
config=TopicConfig.model_validate(
{
"partitions_count": 3,
"replication_factor": 2,
"configs": {"cleanup.policy": "compact"},
},
),
)


def test_topic_spec_defaults():
spec = TopicSpec()
assert spec.partitions == 1
assert spec.replicas == 1
assert spec.config is None


def test_topic_spec_custom_values():
spec = TopicSpec(partitions=3, replicas=2, config={"retention.ms": "60000"})
assert spec.partitions == 3
assert spec.replicas == 2
assert spec.config == {"retention.ms": "60000"}


def test_topic_spec_validation():
with pytest.raises(PydanticValidationError):
TopicSpec(partitions=0) # Less than 1, should raise validation error

with pytest.raises(PydanticValidationError):
TopicSpec(replicas=40000) # Exceeds max value, should raise validation error


def test_strimzi_kafka_topic_from_topic(kafka_topic: KafkaTopic, mocker: MockerFixture):
mock_config = MagicMock()
mock_config.strimzi_topic.resource_label = ("bakdata.com/cluster", "my-cluster")
mocker.patch(
"kpops.manifests.strimzi.kafka_topic.get_config", return_value=mock_config
)

strimzi_topic = StrimziKafkaTopic.from_topic(kafka_topic)

# Check metadata
assert strimzi_topic.metadata.name == kafka_topic.name
assert strimzi_topic.metadata.labels == {"bakdata.com/cluster": "my-cluster"}

# Check spec
assert strimzi_topic.spec.partitions == kafka_topic.config.partitions_count
assert strimzi_topic.spec.replicas == kafka_topic.config.replication_factor
assert strimzi_topic.spec.config == kafka_topic.config.configs


def test_strimzi_kafka_topic_missing_config(kafka_topic, mocker):
mock_config = MagicMock()
mock_config.strimzi_topic = None
mocker.patch(
"kpops.manifests.strimzi.kafka_topic.get_config", return_value=mock_config
)

with pytest.raises(
ValidationError, match="must define 'strimzi_topic.resource_label'"
):
StrimziKafkaTopic.from_topic(kafka_topic)
Loading