Skip to content

Commit

Permalink
Introduce KPOps operation and manifest resources for deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed Nov 28, 2024
1 parent 4ba3b81 commit c218e02
Show file tree
Hide file tree
Showing 21 changed files with 290 additions and 15 deletions.
2 changes: 2 additions & 0 deletions docs/docs/resources/variables/cli_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ KPOPS_DOTENV_PATH # No default value, not required
# Suffix your environment files with this value (e.g.
# defaults_development.yaml for environment=development).
KPOPS_ENVIRONMENT # No default value, not required
# How KPOps should operate.
KPOPS_OPERATION_MODE=standard
# Paths to dir containing 'pipeline.yaml' or files named
# 'pipeline.yaml'.
KPOPS_PIPELINE_PATHS # No default value, required
Expand Down
1 change: 1 addition & 0 deletions docs/docs/resources/variables/cli_env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ These variables take precedence over the commands' flags. If a variable is set,
|KPOPS_CONFIG_PATH |. |False |Path to the dir containing config.yaml files |
|KPOPS_DOTENV_PATH | |False |Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. |
|KPOPS_ENVIRONMENT | |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|
|KPOPS_OPERATION_MODE|standard |False |How KPOps should operate. |
|KPOPS_PIPELINE_PATHS| |True |Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. |
|KPOPS_PIPELINE_STEPS| |False |Comma separated list of steps to apply the command on |
3 changes: 3 additions & 0 deletions docs/docs/resources/variables/config_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ KPOPS_HELM_DIFF_CONFIG__IGNORE # No default value, required
# Whether to retain clean up jobs in the cluster or uninstall the,
# after completion.
KPOPS_RETAIN_CLEAN_JOBS=False
# operation_mode
# The operation mode of KPOps (standard, manifest, argo).
KPOPS_OPERATION_MODE=standard
1 change: 1 addition & 0 deletions docs/docs/resources/variables/config_env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ These variables take precedence over the settings in `config.yaml`. Variables ma
|KPOPS_HELM_CONFIG__API_VERSION | |False |Kubernetes API version used for `Capabilities.APIVersions` |helm_config.api_version |
|KPOPS_HELM_DIFF_CONFIG__IGNORE | |True |Set of keys that should not be checked. |helm_diff_config.ignore |
|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion.|retain_clean_jobs |
|KPOPS_OPERATION_MODE |standard |False |The operation mode of KPOps (standard, manifest, argo). |operation_mode |
18 changes: 18 additions & 0 deletions docs/docs/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@
"title": "KafkaRestConfig",
"type": "object"
},
"OperationMode": {
"enum": [
"argo",
"manifest",
"standard"
],
"title": "OperationMode",
"type": "string"
},
"SchemaRegistryConfig": {
"additionalProperties": false,
"description": "Configuration for Schema Registry.",
Expand Down Expand Up @@ -239,6 +248,15 @@
},
"description": "Configuration for Kafka REST Proxy."
},
"operation_mode": {
"allOf": [
{
"$ref": "#/$defs/OperationMode"
}
],
"default": "standard",
"description": "The operation mode of KPOps (standard, manifest, argo)."
},
"pipeline_base_dir": {
"default": ".",
"description": "Base directory to the pipelines (default is current working directory)",
Expand Down
1 change: 1 addition & 0 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ $ kpops deploy [OPTIONS] PIPELINE_PATHS...
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--operation-mode [argo|manifest|standard]`: How KPOps should operate. [env var: KPOPS_OPERATION_MODE; default: standard]
* `--help`: Show this message and exit.

## `kpops destroy`
Expand Down
28 changes: 28 additions & 0 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

import asyncio
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING

from kpops.api.logs import log, log_action
from kpops.api.operation import OperationMode
from kpops.api.options import FilterType
from kpops.api.registry import Registry
from kpops.component_handlers import ComponentHandlers
Expand Down Expand Up @@ -105,6 +107,32 @@ def manifest(
return resources


def manifest_deployment(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
verbose: bool = True,
operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[Resource]:
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
# TODO: KPOps config is created twice. Once in generate and once here. change it!
KpopsConfig.create(config, dotenv, environment, verbose, operation_mode)
for component in pipeline.components:
resource = component.manifest_deploy()
yield resource


def deploy(
pipeline_path: Path,
dotenv: list[Path] | None = None,
Expand Down
9 changes: 9 additions & 0 deletions kpops/api/operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import annotations

from enum import Enum


class OperationMode(str, Enum):
ARGO = "argo"
MANIFEST = "manifest"
STANDARD = "standard"
49 changes: 37 additions & 12 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typer

import kpops.api as kpops
from kpops.api.operation import OperationMode
from kpops.api.options import FilterType
from kpops.cli.utils import (
collect_pipeline_paths,
Expand Down Expand Up @@ -110,6 +111,12 @@
"Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). "
),
)
OPERATION_MODE_OPTION: OperationMode = typer.Option(
default=OperationMode.STANDARD,
envvar=f"{ENV_PREFIX}OPERATION_MODE",
# TODO: better help?
help="How KPOps should operate.",
)


def parse_steps(steps: str | None) -> set[str] | None:
Expand Down Expand Up @@ -219,19 +226,37 @@ def deploy(
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
operation_mode: OperationMode = OPERATION_MODE_OPTION,
):
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.deploy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
match operation_mode:
case OperationMode.STANDARD:
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.deploy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
case _:
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
resources = kpops.manifest_deployment(
pipeline_file_path,
dotenv,
config,
parse_steps(steps),
filter_type,
environment,
verbose,
operation_mode,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest)


@app.command(help="Destroy pipeline steps")
Expand Down
6 changes: 6 additions & 0 deletions kpops/components/base_components/helm_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from kpops.components.base_components.models.resource import Resource
from kpops.config import get_config
from kpops.manifestors.helm_app_manifestor import HelmAppManifestor
from kpops.utils.colorify import magentaify
from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import exclude_by_name
Expand Down Expand Up @@ -151,6 +152,11 @@ def manifest(self) -> Resource:
self.template_flags,
)

@override
def manifest_deploy(self) -> Resource:
helm_app_manifestor = HelmAppManifestor()
return helm_app_manifestor.generate_manifest(self)

@property
def deploy_flags(self) -> HelmUpgradeInstallFlags:
"""Return flags for Helm upgrade install command."""
Expand Down
4 changes: 2 additions & 2 deletions kpops/components/base_components/models/resource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Mapping, Sequence
from collections.abc import Mapping
from typing import Any, TypeAlias

# representation of final resource for component, e.g. a list of Kubernetes manifests
Resource: TypeAlias = Sequence[Mapping[str, Any]]
Resource: TypeAlias = list[Mapping[str, Any]]
16 changes: 16 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,22 @@ def manifest(self) -> Resource:
"""Render final component resources, e.g. Kubernetes manifests."""
return []

def manifest_deploy(self) -> Resource:
"""Render final component resources for deployment, e.g. Kubernetes manifests."""
return []

def manifest_destroy(self) -> Resource:
"""Render final component resources for destroy, e.g. Kubernetes manifests."""
return []

def manifest_reset(self) -> Resource:
"""Render final component resources for reset, e.g. Kubernetes manifests."""
return []

def manifest_clean(self) -> Resource:
"""Render final component resources for clean, e.g. Kubernetes manifests."""
return []

async def deploy(self, dry_run: bool) -> None:
"""Deploy component, e.g. to Kubernetes cluster.
Expand Down
20 changes: 20 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from pydantic import Field, ValidationError, computed_field
from typing_extensions import override

from kpops.api.operation import OperationMode
from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler
from kpops.components.base_components.helm_app import HelmApp
from kpops.components.base_components.kafka_app import KafkaAppCleaner
from kpops.components.base_components.models.resource import Resource
from kpops.components.common.app_type import AppType
from kpops.components.common.topic import KafkaTopic
from kpops.components.streams_bootstrap.base import (
Expand All @@ -15,7 +17,9 @@
from kpops.components.streams_bootstrap.streams.model import (
StreamsAppValues,
)
from kpops.config import get_config
from kpops.const.file_type import DEFAULTS_YAML, PIPELINE_YAML
from kpops.manifestors.streams_app_manifestor import StreamsAppCleanerManifestor
from kpops.utils.docstring import describe_attr

log = logging.getLogger("StreamsApp")
Expand Down Expand Up @@ -152,3 +156,19 @@ async def clean(self, dry_run: bool) -> None:
"""Destroy and clean."""
await super().clean(dry_run)
await self._cleaner.clean(dry_run)

@override
def manifest_deploy(self) -> Resource:
manifest = super().manifest_deploy()
operation_mode = get_config().operation_mode

if operation_mode is OperationMode.ARGO:
clean_manifestor = StreamsAppCleanerManifestor()
clean_manifest = clean_manifestor.generate_manifest(self._cleaner)
manifest.extend(clean_manifest)

return manifest

@override
def manifest_clean(self) -> Resource:
return []
8 changes: 8 additions & 0 deletions kpops/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)
from typing_extensions import override

from kpops.api.operation import OperationMode
from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig
from kpops.utils.docstring import describe_object
from kpops.utils.pydantic import YamlConfigSettingsSource
Expand Down Expand Up @@ -120,6 +121,10 @@ class KpopsConfig(BaseSettings):
default=False,
description="Whether to retain clean up jobs in the cluster or uninstall the, after completion.",
)
operation_mode: OperationMode = Field(
default=OperationMode.STANDARD,
description="The operation mode of KPOps (standard, manifest, argo).",
)

model_config = SettingsConfigDict(env_prefix=ENV_PREFIX, env_nested_delimiter="__")

Expand All @@ -130,13 +135,16 @@ def create(
dotenv: list[Path] | None = None,
environment: str | None = None,
verbose: bool = False,
operation_mode: OperationMode | None = None,
) -> KpopsConfig:
cls.setup_logging_level(verbose)
YamlConfigSettingsSource.config_dir = config_dir
YamlConfigSettingsSource.environment = environment
cls._instance = KpopsConfig(
_env_file=dotenv # pyright: ignore[reportCallIssue]
)
if operation_mode:
cls._instance.operation_mode = operation_mode
return cls._instance

@staticmethod
Expand Down
Empty file added kpops/manifestors/__init__.py
Empty file.
37 changes: 37 additions & 0 deletions kpops/manifestors/helm_app_manifestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from kpops.api.operation import OperationMode
from kpops.manifestors.manifestor import Manifestor

if TYPE_CHECKING:
from kpops.components.base_components import HelmApp
from kpops.components.base_components.models.resource import Resource


class HelmAppManifestor(Manifestor):
"""Manifestor for the HelmApp component."""

def generate_annotations(self) -> dict[str, str]:
"""Generate annotations for HelmApp based on operation mode."""
match self.operation_mode:
case OperationMode.ARGO:
return {"argocd.argoproj.io/sync-wave": "1"}
case _:
return {}

def generate_manifest(self, helm_app: HelmApp) -> Resource:
"""Generate the Helm manifest for HelmApp."""
values = helm_app.to_helm_values()
annotations = self.generate_annotations()
if annotations:
values["annotations"] = annotations

return helm_app.helm.template(
helm_app.helm_release_name,
helm_app.helm_chart,
helm_app.namespace,
values,
helm_app.template_flags,
)
27 changes: 27 additions & 0 deletions kpops/manifestors/manifestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

from kpops.config import get_config

if TYPE_CHECKING:
from kpops.components.base_components import HelmApp
from kpops.components.base_components.models.resource import Resource


class Manifestor(ABC):
"""Base class for generating manifests for different components."""

def __init__(self):
self.operation_mode = get_config().operation_mode

@abstractmethod
def generate_annotations(self) -> dict[str, str]:
"""Generate the annotations for the component based on the operation mode."""
...

@abstractmethod
def generate_manifest(self, helm_app: HelmApp) -> Resource:
"""Generate the Helm manifest for the component."""
...
Loading

0 comments on commit c218e02

Please sign in to comment.