Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove repeated defaults from streams-bootstrap values #547

Merged
merged 21 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
571 changes: 392 additions & 179 deletions docs/docs/schema/defaults.json

Large diffs are not rendered by default.

508 changes: 351 additions & 157 deletions docs/docs/schema/pipeline.json

Large diffs are not rendered by default.

34 changes: 21 additions & 13 deletions kpops/components/common/kubernetes_model.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import enum
from typing import Annotated

import pydantic
from pydantic import Field

from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import DescConfigModel

# Matches plain integer or numbers with valid suffixes: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
MEMORY_PATTERN = r"^\d+([EPTGMk]|Ei|Pi|Ti|Gi|Mi|Ki)?$"


class ServiceType(str, enum.Enum):
"""Represents the different Kubernetes service types.
Expand Down Expand Up @@ -72,13 +71,11 @@ class Toleration(DescConfigModel):
:param toleration_seconds: The duration for which the toleration is valid.
"""

key: str = Field(default=..., description=describe_attr("key", __doc__))
key: str = Field(description=describe_attr("key", __doc__))

operator: Operation = Field(
default=Operation.EQUAL, description=describe_attr("operator", __doc__)
)
operator: Operation = Field(description=describe_attr("operator", __doc__))

effect: Effects = Field(default=..., description=describe_attr("effect", __doc__))
effect: Effects = Field(description=describe_attr("effect", __doc__))

value: str | None = Field(default=None, description=describe_attr("value", __doc__))

Expand All @@ -87,16 +84,27 @@ class Toleration(DescConfigModel):
)


CPUStr = Annotated[str, pydantic.StringConstraints(pattern=r"^\d+m$")]
MemoryStr = Annotated[
# Matches plain number string or number with valid suffixes: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
str, pydantic.StringConstraints(pattern=r"^\d+([EPTGMk]|Ei|Pi|Ti|Gi|Mi|Ki)?$")
]


class ResourceDefinition(DescConfigModel):
"""Model representing the 'limits' or `request` section of Kubernetes resource specifications.
"""Model representing the `limits` or `requests` section of Kubernetes resource specifications.

:param cpu: The maximum amount of CPU a container can use, expressed in milli CPUs (e.g., '300m').
:param memory: The maximum amount of memory a container can use, with valid units such as 'Mi' or 'Gi' (e.g., '2G').
:param memory: The maximum amount of memory a container can use, as integer or string with valid units such as 'Mi' or 'Gi' (e.g., '2G').
"""

cpu: str | int = Field(pattern=r"^\d+m$", description=describe_attr("cpu", __doc__))
memory: str = Field(
pattern=MEMORY_PATTERN, description=describe_attr("memory", __doc__)
cpu: CPUStr | pydantic.PositiveInt | None = Field(
default=None,
description=describe_attr("cpu", __doc__),
)
memory: MemoryStr | pydantic.PositiveInt | None = Field(
default=None,
description=describe_attr("memory", __doc__),
)


Expand Down
4 changes: 3 additions & 1 deletion kpops/components/streams_bootstrap/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ def version_validator(cls, version: str) -> str:

@pydantic.model_validator(mode="after")
def warning_for_latest_image_tag(self) -> Self:
if self.validate_ and self.values.image_tag == "latest":
if self.validate_ and (
not self.values.image_tag or self.values.image_tag == "latest"
):
log.warning(
f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag."
)
Expand Down
36 changes: 16 additions & 20 deletions kpops/components/streams_bootstrap/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from kpops.components.common.kubernetes_model import (
ImagePullPolicy,
ProtocolSchema,
ResourceDefinition,
Resources,
ServiceType,
Toleration,
Expand Down Expand Up @@ -69,8 +68,8 @@ class ServiceConfig(CamelCaseConfigModel, DescConfigModel):
default_factory=dict,
description=describe_attr("labels", __doc__),
)
type: ServiceType = Field(
default=ServiceType.CLUSTER_IP,
type: ServiceType | None = Field(
default=None,
description=describe_attr("type", __doc__),
)

Expand All @@ -82,8 +81,8 @@ class JavaOptions(CamelCaseConfigModel, DescConfigModel):
:param others: List of Java VM options passed to the streams app.
"""

max_RAM_percentage: int = Field(
default=75,
max_RAM_percentage: int | None = Field(
default=None,
description=describe_attr("max_RAM_percentage", __doc__),
)
others: list[str] = Field(
Expand Down Expand Up @@ -120,14 +119,14 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("image", __doc__),
)

image_tag: str = Field(
default="latest",
image_tag: str | None = Field(
default=None,
pattern=IMAGE_TAG_PATTERN,
description=describe_attr("image_tag", __doc__),
)

image_pull_policy: ImagePullPolicy = Field(
default=ImagePullPolicy.ALWAYS,
image_pull_policy: ImagePullPolicy | None = Field(
default=None,
description=describe_attr("image_pull_policy", __doc__),
)

Expand All @@ -140,11 +139,8 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("kafka", __doc__),
)

resources: Resources = Field(
default=Resources(
requests=ResourceDefinition(cpu="100m", memory="500Mi"),
limits=ResourceDefinition(cpu="300m", memory="2G"),
),
resources: Resources | None = Field(
default=None,
description=describe_attr("resources", __doc__),
)

Expand All @@ -153,13 +149,13 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("ports", __doc__),
)

service: ServiceConfig = Field(
default_factory=ServiceConfig,
service: ServiceConfig | None = Field(
default=None,
description=describe_attr("service", __doc__),
)

configuration_env_prefix: str = Field(
default="APP",
configuration_env_prefix: str | None = Field(
default=None,
description=describe_attr("configuration_env_prefix", __doc__),
)

Expand Down Expand Up @@ -193,8 +189,8 @@ class StreamsBootstrapValues(HelmAppValues):
description=describe_attr("files", __doc__),
)

java_options: JavaOptions = Field(
default_factory=JavaOptions,
java_options: JavaOptions | None = Field(
default=None,
description=describe_attr("java_options", __doc__),
)

Expand Down
31 changes: 17 additions & 14 deletions kpops/components/streams_bootstrap/producer/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,42 +30,45 @@ class ProducerAppValues(StreamsBootstrapValues):

kafka: ProducerConfig = Field(description=describe_attr("kafka", __doc__))

deployment: bool = Field(
default=False, description=describe_attr("deployment", __doc__)
deployment: bool | None = Field(
default=None, description=describe_attr("deployment", __doc__)
)

restart_policy: RestartPolicy = Field(
default=RestartPolicy.ON_FAILURE,
restart_policy: RestartPolicy | None = Field(
default=None,
description=describe_attr("restart_policy", __doc__),
)

schedule: str | None = Field(
default=None, description=describe_attr("schedule", __doc__)
)

suspend: bool = Field(default=False, description=describe_attr("suspend", __doc__))
suspend: bool | None = Field(
default=None, description=describe_attr("suspend", __doc__)
)

successful_jobs_history_limit: int = Field(
default=1, description=describe_attr("successful_jobs_history_limit", __doc__)
successful_jobs_history_limit: int | None = Field(
default=None,
description=describe_attr("successful_jobs_history_limit", __doc__),
)

failed_jobs_history_limit: int = Field(
default=1, description=describe_attr("failed_jobs_history_limit", __doc__)
failed_jobs_history_limit: int | None = Field(
default=None, description=describe_attr("failed_jobs_history_limit", __doc__)
)

backoff_limit: int = Field(
default=6, description=describe_attr("backoff_limit", __doc__)
backoff_limit: int | None = Field(
default=None, description=describe_attr("backoff_limit", __doc__)
)

ttl_seconds_after_finished: int = Field(
default=100, description=describe_attr("ttl_seconds_after_finished", __doc__)
ttl_seconds_after_finished: int | None = Field(
default=None, description=describe_attr("ttl_seconds_after_finished", __doc__)
)

model_config = ConfigDict(extra="allow")

@field_validator("schedule")
@classmethod
def schedule_cron_validator(cls, schedule: str) -> str:
def schedule_cron_validator(cls, schedule: str | None) -> str | None:
"""Ensure that the defined schedule value is valid."""
if schedule and not croniter.is_valid(schedule):
msg = f"The schedule field '{schedule}' must be a valid cron expression."
Expand Down
52 changes: 24 additions & 28 deletions kpops/components/streams_bootstrap/streams/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from kpops.components.common.kubernetes_model import (
ImagePullPolicy,
ResourceDefinition,
Resources,
)
from kpops.components.common.topic import KafkaTopic, KafkaTopicStr
Expand Down Expand Up @@ -217,7 +216,7 @@ class PersistenceConfig(BaseModel):

enabled: bool = Field(
default=False,
description="Whether to use a persistent volume to store the state of the streams app. ",
description="Whether to use a persistent volume to store the state of the streams app.",
)
size: str | None = Field(
default=None,
Expand Down Expand Up @@ -247,36 +246,33 @@ class PrometheusJMXExporterConfig(CamelCaseConfigModel, DescConfigModel):
:param resources: JMX Exporter resources configuration.
"""

enabled: bool = Field(
default=True,
enabled: bool | None = Field(
default=None,
description=describe_attr("enabled", __doc__),
)
image: str = Field(
default="solsson/kafka-prometheus-jmx-exporter@sha256",
image: str | None = Field(
default=None,
description=describe_attr("image", __doc__),
)
image_tag: str = Field(
default="6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143",
image_tag: str | None = Field(
default=None,
description=describe_attr("image_tag", __doc__),
)
image_pull_policy: ImagePullPolicy = Field(
default=ImagePullPolicy.IF_NOT_PRESENT,
image_pull_policy: ImagePullPolicy | None = Field(
default=None,
description=describe_attr("image_pull_policy", __doc__),
)
port: int = Field(
default=5556,
port: int | None = Field(
default=None,
description=describe_attr("port", __doc__),
)
resources: Resources = Field(
default=Resources(
requests=ResourceDefinition(cpu="100m", memory="500Mi"),
limits=ResourceDefinition(cpu="300m", memory="2G"),
),
resources: Resources | None = Field(
default=None,
description=describe_attr("resources", __doc__),
)

jmx: PrometheusJMXExporterConfig = Field(
default_factory=PrometheusJMXExporterConfig,
jmx: PrometheusJMXExporterConfig | None = Field(
default=None,
description=describe_attr("jmx", __doc__),
)

Expand All @@ -288,13 +284,13 @@ class JMXConfig(CamelCaseConfigModel, DescConfigModel):
:param metric_rules: List of JMX metric rules.
"""

port: int = Field(
default=5555,
port: int | None = Field(
default=None,
description=describe_attr("port", __doc__),
)

metric_rules: list[str] = Field(
default=[".*"],
default_factory=list,
description=describe_attr("metric_rules", __doc__),
)

Expand Down Expand Up @@ -332,18 +328,18 @@ class StreamsAppValues(StreamsBootstrapValues):
description=describe_attr("persistence", __doc__),
)

prometheus: PrometheusExporterConfig = Field(
default_factory=PrometheusExporterConfig,
prometheus: PrometheusExporterConfig | None = Field(
default=None,
description=describe_attr("prometheus", __doc__),
)

jmx: JMXConfig = Field(
default_factory=JMXConfig,
jmx: JMXConfig | None = Field(
default=None,
description=describe_attr("jmx", __doc__),
)

termination_grace_period_seconds: int = Field(
default=300,
termination_grace_period_seconds: int | None = Field(
default=None,
description=describe_attr("termination_grace_period_seconds", __doc__),
)

Expand Down
Loading