Skip to content

Commit

Permalink
Merge branch 'v7' of github.com:bakdata/kpops into feature/clean-with…
Browse files Browse the repository at this point in the history
…-old-image-tag

# Conflicts:
#	kpops/components/streams_bootstrap/streams/streams_app.py
  • Loading branch information
raminqaf committed Jul 15, 2024
2 parents 0e264e7 + bcef0e5 commit b031585
Show file tree
Hide file tree
Showing 60 changed files with 716 additions and 707 deletions.
1 change: 0 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
components_module: tests.pipeline.test_components
pipeline_base_dir: tests/pipeline
9 changes: 2 additions & 7 deletions docs/docs/resources/pipeline-config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# CONFIGURATION
#
# Custom Python module defining project-specific KPOps components
components_module: null
# Base directory to the pipelines (default is current working directory)
pipeline_base_dir: .
# The Kafka brokers address.
# REQUIRED
kafka_brokers: "http://broker1:9092,http://broker2:9092"
# Configure the topic name variables you can use in the pipeline definition.
topic_name_config:
topic_name_config:
# Configures the value for the variable ${output_topic_name}
default_output_topic_name: ${pipeline.name}-${component.name}
# Configures the value for the variable ${error_topic_name}
Expand All @@ -27,9 +25,6 @@ kafka_rest:
kafka_connect:
# Address of Kafka Connect.
url: "http://localhost:8083"
# The timeout in seconds that specifies when actions like deletion or deploy
# timeout.
timeout: 300
# Flag for `helm upgrade --install`.
# Create the release namespace if not present.
create_namespace: false
Expand All @@ -42,7 +37,7 @@ helm_config:
# Kubernetes API version used for Capabilities.APIVersions
api_version: null
# Configure Helm Diff.
helm_diff_config:
helm_diff_config:
# Set of keys that should not be checked.
ignore:
- name
Expand Down
3 changes: 0 additions & 3 deletions docs/docs/resources/variables/config_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
# settings in `config.yaml`. Variables marked as required can instead
# be set in the global config.
#
# components_module
# Custom Python module defining project-specific KPOps components
KPOPS_COMPONENTS_MODULE # No default value, not required
# pipeline_base_dir
# Base directory to the pipelines (default is current working
# directory)
Expand Down
1 change: 0 additions & 1 deletion docs/docs/resources/variables/config_env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ These variables take precedence over the settings in `config.yaml`. Variables ma

| Name | Default Value |Required| Description | Setting name |
|--------------------------------------------------|----------------------------------------|--------|----------------------------------------------------------------------------------|-------------------------------------------|
|KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module |
|KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir |
|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers |
|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name|
Expand Down
13 changes: 0 additions & 13 deletions docs/docs/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,6 @@
"additionalProperties": false,
"description": "Global configuration for KPOps project.",
"properties": {
"components_module": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Custom Python module defining project-specific KPOps components",
"title": "Components Module"
},
"create_namespace": {
"default": false,
"description": "Flag for `helm upgrade --install`. Create the release namespace if not present.",
Expand Down
99 changes: 99 additions & 0 deletions docs/docs/user/migration-guide/v6-v7.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Migrate from V6 to V7

## [Automatic loading of namespaced custom components](https://github.com/bakdata/kpops/pull/500)

KPOps is now distributed as a Python namespace package (as defined by [PEP 420](https://peps.python.org/pep-0420/)). This allows us to standardize the namespace `kpops.components` for both builtin and custom pipeline components.

As a result of the restructure, some imports need to be adjusted:

**KPOps Python API**

```diff
-import kpops
+import kpops.api as kpops
```

**builtin KPOps components**

```diff
-from kpops.components import (
- HelmApp,
- KafkaApp,
- KafkaConnector,
- KafkaSinkConnector,
- KafkaSourceConnector,
- KubernetesApp,
- StreamsBootstrap,
- ProducerApp,
- StreamsApp,
- PipelineComponent,
- StreamsApp,
- ProducerApp,
-)
+from kpops.components.base_components import (
+ HelmApp,
+ KafkaApp,
+ KafkaConnector,
+ KafkaSinkConnector,
+ KafkaSourceConnector,
+ KubernetesApp,
+ PipelineComponent,
+)
+from kpops.components.streams_bootstrap import (
+ StreamsBootstrap,
+ StreamsApp,
+ ProducerApp,
+)
```

### your custom KPOps components

#### config.yaml

```diff
-components_module: components
```

#### Python module

```diff
-components/__init__.py
+kpops/components/custom/__init__.py
```

## [Call destroy from inside of reset or clean](https://github.com/bakdata/kpops/pull/501)

Before v7, the KPOps CLI executed `destroy` before running `reset/clean` to ensure the component was destroyed.

This logic has changed. The `destroy` method is now called within the `PipelineComponent`'s `reset`/`clean`.

During migrating to v7, you should check your custom components and see if they override the `reset`/`clean` methods. If so, you need to call the supermethod `reset`/`clean` to trigger the `destroy` inside the parent class. Alternatively, if you are implementing the `PipelineComponent` class, you need to call the `destroy` method at the beginning of the method.

#### components.py

For example, when creating a custom `StreamsApp` or `ProducerApp` (or any other custom component), you **must** call the supermethod `reset`/`clean` to execute the `destroy` in the parent class. **Otherwise, the logic of destroy will not be executed!**

````diff
class MyStreamsApp(StreamsApp):

@override
async def clean(self, dry_run: bool) -> None:
+ await super().clean(dry_run)
# Some custom clean logic
# ...
```diff


class MyCustomComponent(PipelineComponent):

@override
async def destroy(self, dry_run: bool) -> None:
# Some custom destroy logic
# ...

@override
async def clean(self, dry_run: bool) -> None:
+ await super().clean(dry_run)
# Some custom clean logic
# ...
````
9 changes: 5 additions & 4 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,16 @@ $ kpops schema [OPTIONS] SCOPE:{pipeline|defaults|config}



pipeline: Schema of PipelineComponents. Includes the built-in KPOps components by default. To include custom components, provide components module in config.
- pipeline: Schema of PipelineComponents for KPOps pipeline.yaml


- defaults: Schema of PipelineComponents for KPOps defaults.yaml


config: Schema of KpopsConfig. [required]
- config: Schema for KPOps config.yaml [required]

**Options**:

* `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
* `--include-stock-components / --no-include-stock-components`: Include the built-in KPOps components. [default: include-stock-components]
* `--help`: Show this message and exit.
1 change: 1 addition & 0 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ nav:
- Migrate from v3 to v4: user/migration-guide/v3-v4.md
- Migrate from v4 to v5: user/migration-guide/v4-v5.md
- Migrate from v5 to v6: user/migration-guide/v5-v6.md
- Migrate from v6 to v7: user/migration-guide/v6-v7.md
- CLI usage: user/references/cli-commands.md
- Editor integration: user/references/editor-integration.md
- CI integration:
Expand Down
12 changes: 9 additions & 3 deletions hooks/gen_docs/gen_docs_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
import yaml

from hooks import ROOT
from kpops.api.registry import _find_classes
from kpops.components import KafkaConnector, PipelineComponent
from kpops.api.registry import Registry
from kpops.components.base_components.kafka_connector import KafkaConnector
from kpops.components.base_components.pipeline_component import (
PipelineComponent,
)
from kpops.utils.colorify import redify, yellowify
from kpops.utils.pydantic import issubclass_patched
from kpops.utils.yaml import load_yaml_file

registry = Registry()
registry.discover_components()

PATH_KPOPS_MAIN = ROOT / "kpops/cli/main.py"
PATH_CLI_COMMANDS_DOC = ROOT / "docs/docs/user/references/cli-commands.md"
PATH_DOCS_RESOURCES = ROOT / "docs/docs/resources"
Expand All @@ -33,7 +39,7 @@
(PATH_DOCS_RESOURCES / "pipeline-defaults/headers").iterdir(),
)

KPOPS_COMPONENTS = tuple(_find_classes("kpops.components", PipelineComponent))
KPOPS_COMPONENTS = tuple(registry.components)
KPOPS_COMPONENTS_SECTIONS = {
component.type: [
field_name
Expand Down
14 changes: 0 additions & 14 deletions kpops/__init__.py

This file was deleted.

19 changes: 7 additions & 12 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pathlib import Path
from typing import TYPE_CHECKING

import kpops
from kpops.api.logs import log, log_action
from kpops.api.options import FilterType
from kpops.api.registry import Registry
Expand All @@ -23,8 +22,8 @@
from kpops.utils.cli_commands import init_project

if TYPE_CHECKING:
from kpops.components import PipelineComponent
from kpops.components.base_components.models.resource import Resource
from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.config import KpopsConfig


Expand Down Expand Up @@ -90,7 +89,7 @@ def manifest(
:param verbose: Enable verbose printing.
:return: Resources.
"""
pipeline = kpops.generate(
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
Expand Down Expand Up @@ -129,7 +128,7 @@ def deploy(
:param verbose: Enable verbose printing.
:param parallel: Enable or disable parallel execution of pipeline steps.
"""
pipeline = kpops.generate(
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
Expand Down Expand Up @@ -177,7 +176,7 @@ def destroy(
:param verbose: Enable verbose printing.
:param parallel: Enable or disable parallel execution of pipeline steps.
"""
pipeline = kpops.generate(
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
Expand Down Expand Up @@ -227,7 +226,7 @@ def reset(
:param verbose: Enable verbose printing.
:param parallel: Enable or disable parallel execution of pipeline steps.
"""
pipeline = kpops.generate(
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
Expand All @@ -238,7 +237,6 @@ def reset(
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

Expand Down Expand Up @@ -276,7 +274,7 @@ def clean(
:param verbose: Enable verbose printing.
:param parallel: Enable or disable parallel execution of pipeline steps.
"""
pipeline = kpops.generate(
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
Expand All @@ -287,7 +285,6 @@ def clean(
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

Expand Down Expand Up @@ -333,9 +330,7 @@ def _create_pipeline(
:return: Created `Pipeline` object.
"""
registry = Registry()
if kpops_config.components_module:
registry.find_components(kpops_config.components_module)
registry.find_components("kpops.components")
registry.discover_components()

handlers = _setup_handlers(kpops_config)
parser = PipelineGenerator(kpops_config, registry, handlers)
Expand Down
5 changes: 5 additions & 0 deletions kpops/api/file_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ def as_yaml_file(self, prefix: str = "", suffix: str = "") -> str:
'pre_pipeline_suf.yaml'
"""
return prefix + self.value + suffix + FILE_EXTENSION


PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file()
DEFAULTS_YAML = KpopsFileType.DEFAULTS.as_yaml_file()
CONFIG_YAML = KpopsFileType.CONFIG.as_yaml_file()
2 changes: 1 addition & 1 deletion kpops/api/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import typer

if TYPE_CHECKING:
from kpops.components import PipelineComponent
from kpops.components.base_components.pipeline_component import PipelineComponent


class CustomFormatter(logging.Formatter):
Expand Down
2 changes: 1 addition & 1 deletion kpops/api/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from kpops.components import PipelineComponent
from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.pipeline import ComponentFilterPredicate


Expand Down
Loading

0 comments on commit b031585

Please sign in to comment.