From 7a8849ba92c5d53cb0e6e7de2be58a0a719feedc Mon Sep 17 00:00:00 2001
From: Salomon Popp
Date: Mon, 8 Jul 2024 19:45:35 +0200
Subject: [PATCH 1/2] Automatic loading of namespaced custom components (#500)

---
 config.yaml | 1 -
 .../resources/pipeline-config/config.yaml | 9 +-
 .../resources/variables/config_env_vars.env | 3 -
 .../resources/variables/config_env_vars.md | 1 -
 docs/docs/schema/config.json | 13 ---
 docs/docs/user/migration-guide/v6-v7.md | 62 +++++++++++
 docs/docs/user/references/cli-commands.md | 9 +-
 docs/mkdocs.yml | 1 +
 hooks/gen_docs/gen_docs_components.py | 12 ++-
 kpops/__init__.py | 14 ---
 kpops/api/__init__.py | 17 ++-
 kpops/api/file_type.py | 5 +
 kpops/api/logs.py | 2 +-
 kpops/api/options.py | 2 +-
 kpops/api/registry.py | 75 ++++++++-----
 kpops/cli/main.py | 38 +++----
 kpops/cli/utils.py | 4 +-
 .../schema_handler/schema_handler.py | 12 +--
 kpops/components/__init__.py | 27 -----
 kpops/components/base_components/__init__.py | 19 ++++
 kpops/components/base_components/kafka_app.py | 2 +-
 .../components/common}/__init__.py | 0
 kpops/components/common/streams_bootstrap.py | 74 +++++++++++
 .../components/streams_bootstrap/__init__.py | 81 ++------------
 .../streams_bootstrap/producer/model.py | 2 +-
 .../producer/producer_app.py | 2 +-
 .../streams_bootstrap/streams/model.py | 2 +-
 .../streams_bootstrap/streams/streams_app.py | 9 +-
 kpops/{config.py => config/__init__.py} | 4 -
 kpops/const/__init__.py | 3 +
 kpops/{pipeline.py => pipeline/__init__.py} | 0
 kpops/utils/gen_schema.py | 99 ++++------------
 pyproject.toml | 2 +-
 tests/api/test_handlers.py | 5 +-
 tests/api/test_registry.py | 102 ++++++++++++------
 tests/cli/resources/config.yaml | 1 -
 tests/cli/resources/empty_module/config.yaml | 2 -
 tests/cli/resources/no_module/config.yaml | 1 -
 .../test_init_project/config_include_opt.yaml | 1 -
 tests/cli/test_init.py | 2 +-
 tests/cli/test_schema_generation.py | 100 +----------------
 .../schema_handler/test_schema_handler.py | 62 ++---------
 .../test_base_defaults_component.py | 6 +-
 tests/components/test_kafka_sink_connector.py | 6 +-
 tests/components/test_producer_app.py | 6 +-
 tests/components/test_streams_app.py | 2 +-
 tests/components/test_streams_bootstrap.py | 5 +-
 tests/conftest.py | 13 +++
 tests/pipeline/test_components/components.py | 10 +-
 .../components.py | 10 +-
 tests/pipeline/test_example.py | 2 +-
 tests/pipeline/test_generate.py | 11 +-
 tests/pipeline/test_manifest.py | 2 +-
 tests/pipeline/test_pipeline.py | 2 +-
 54 files changed, 428 insertions(+), 529 deletions(-)
 create mode 100644 docs/docs/user/migration-guide/v6-v7.md
 delete mode 100644 kpops/__init__.py
 delete mode 100644 kpops/components/__init__.py
 rename {tests/cli/resources/empty_module => kpops/components/common}/__init__.py (100%)
 create mode 100644 kpops/components/common/streams_bootstrap.py
 rename kpops/{config.py => config/__init__.py} (97%)
 create mode 100644 kpops/const/__init__.py
 rename kpops/{pipeline.py => pipeline/__init__.py} (100%)
 delete mode 100644 tests/cli/resources/empty_module/config.yaml
 delete mode 100644 tests/cli/resources/no_module/config.yaml

diff --git a/config.yaml b/config.yaml
index 7d0e97a54..359b51a21 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,3 +1,2 @@
 kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
-components_module: tests.pipeline.test_components
 pipeline_base_dir: tests/pipeline
diff --git a/docs/docs/resources/pipeline-config/config.yaml b/docs/docs/resources/pipeline-config/config.yaml
index 862a49ac0..d8c5433b7 100644
---
a/docs/docs/resources/pipeline-config/config.yaml +++ b/docs/docs/resources/pipeline-config/config.yaml @@ -1,14 +1,12 @@ # CONFIGURATION # -# Custom Python module defining project-specific KPOps components -components_module: null # Base directory to the pipelines (default is current working directory) pipeline_base_dir: . # The Kafka brokers address. # REQUIRED kafka_brokers: "http://broker1:9092,http://broker2:9092" # Configure the topic name variables you can use in the pipeline definition. -topic_name_config: +topic_name_config: # Configures the value for the variable ${output_topic_name} default_output_topic_name: ${pipeline.name}-${component.name} # Configures the value for the variable ${error_topic_name} @@ -27,9 +25,6 @@ kafka_rest: kafka_connect: # Address of Kafka Connect. url: "http://localhost:8083" -# The timeout in seconds that specifies when actions like deletion or deploy -# timeout. -timeout: 300 # Flag for `helm upgrade --install`. # Create the release namespace if not present. create_namespace: false @@ -42,7 +37,7 @@ helm_config: # Kubernetes API version used for Capabilities.APIVersions api_version: null # Configure Helm Diff. -helm_diff_config: +helm_diff_config: # Set of keys that should not be checked. ignore: - name diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index c4b4050e8..f558d4d19 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -4,9 +4,6 @@ # settings in `config.yaml`. Variables marked as required can instead # be set in the global config. # -# components_module -# Custom Python module defining project-specific KPOps components -KPOPS_COMPONENTS_MODULE # No default value, not required # pipeline_base_dir # Base directory to the pipelines (default is current working # directory) diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md index 171715ba5..8685acba0 100644 --- a/docs/docs/resources/variables/config_env_vars.md +++ b/docs/docs/resources/variables/config_env_vars.md @@ -2,7 +2,6 @@ These variables take precedence over the settings in `config.yaml`. Variables ma | Name | Default Value |Required| Description | Setting name | |--------------------------------------------------|----------------------------------------|--------|----------------------------------------------------------------------------------|-------------------------------------------| -|KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module | |KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir | |KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. 
|kafka_brokers | |KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name| diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index 47aab93b1..949c42791 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -177,19 +177,6 @@ "additionalProperties": false, "description": "Global configuration for KPOps project.", "properties": { - "components_module": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "description": "Custom Python module defining project-specific KPOps components", - "title": "Components Module" - }, "create_namespace": { "default": false, "description": "Flag for `helm upgrade --install`. Create the release namespace if not present.", diff --git a/docs/docs/user/migration-guide/v6-v7.md b/docs/docs/user/migration-guide/v6-v7.md new file mode 100644 index 000000000..53c83c796 --- /dev/null +++ b/docs/docs/user/migration-guide/v6-v7.md @@ -0,0 +1,62 @@ +# Migrate from V6 to V7 + +## [Automatic loading of namespaced custom components](https://github.com/bakdata/kpops/pull/500) + +KPOps is now distributed as a Python namespace package (as defined by [PEP 420](https://peps.python.org/pep-0420/)). This allows us to standardize the namespace `kpops.components` for both builtin and custom pipeline components. + +As a result of the restructure, some imports need to be adjusted: + +**KPOps Python API** + +```diff +-import kpops ++import kpops.api as kpops +``` + +**builtin KPOps components** + +```diff +-from kpops.components import ( +- HelmApp, +- KafkaApp, +- KafkaConnector, +- KafkaSinkConnector, +- KafkaSourceConnector, +- KubernetesApp, +- StreamsBootstrap, +- ProducerApp, +- StreamsApp, +- PipelineComponent, +- StreamsApp, +- ProducerApp, +-) ++from kpops.components.base_components import ( ++ HelmApp, ++ KafkaApp, ++ KafkaConnector, ++ KafkaSinkConnector, ++ KafkaSourceConnector, ++ KubernetesApp, ++ PipelineComponent, ++) ++from kpops.components.streams_bootstrap import ( ++ StreamsBootstrap, ++ StreamsApp, ++ ProducerApp, ++) +``` + +### your custom KPOps components + +#### config.yaml + +```diff +-components_module: components +``` + +#### Python module + +```diff +-components/__init__.py ++kpops/components/custom/__init__.py +``` diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 570563069..0a187f139 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -215,15 +215,16 @@ $ kpops schema [OPTIONS] SCOPE:{pipeline|defaults|config} - pipeline: Schema of PipelineComponents. Includes the built-in KPOps components by default. To include custom components, provide components module in config. + - pipeline: Schema of PipelineComponents for KPOps pipeline.yaml + - defaults: Schema of PipelineComponents for KPOps defaults.yaml + + - config: Schema of KpopsConfig. [required] + - config: Schema for KPOps config.yaml [required] **Options**: -* `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] -* `--include-stock-components / --no-include-stock-components`: Include the built-in KPOps components. [default: include-stock-components] * `--help`: Show this message and exit. 
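To illustrate the new layout described in the migration guide above: a custom component no longer needs a `components_module` entry in `config.yaml`; any class inheriting from `PipelineComponent` that is exposed in a module under the `kpops.components` namespace is discovered automatically. A minimal sketch, using the `kpops/components/custom/__init__.py` path from the migration guide (the `MyStreamsApp` class is illustrative, not part of this patch):

```python
# kpops/components/custom/__init__.py
# Any module placed under the kpops.components namespace package is scanned by
# Registry.discover_components(), so no config.yaml entry is required.
from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp


class MyStreamsApp(StreamsApp):
    """Project-specific streams app, registered under its derived type 'my-streams-app'."""
```

Because the registry keys discovered classes by their `type`, such a component is addressable from `pipeline.yaml` and `defaults.yaml` the same way the built-in components are.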
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 0cadd93c5..0703f915e 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -130,6 +130,7 @@ nav: - Migrate from v3 to v4: user/migration-guide/v3-v4.md - Migrate from v4 to v5: user/migration-guide/v4-v5.md - Migrate from v5 to v6: user/migration-guide/v5-v6.md + - Migrate from v6 to v7: user/migration-guide/v6-v7.md - CLI usage: user/references/cli-commands.md - Editor integration: user/references/editor-integration.md - CI integration: diff --git a/hooks/gen_docs/gen_docs_components.py b/hooks/gen_docs/gen_docs_components.py index 10bb40af7..e5e03062d 100644 --- a/hooks/gen_docs/gen_docs_components.py +++ b/hooks/gen_docs/gen_docs_components.py @@ -8,12 +8,18 @@ import yaml from hooks import ROOT -from kpops.api.registry import _find_classes -from kpops.components import KafkaConnector, PipelineComponent +from kpops.api.registry import Registry +from kpops.components.base_components.kafka_connector import KafkaConnector +from kpops.components.base_components.pipeline_component import ( + PipelineComponent, +) from kpops.utils.colorify import redify, yellowify from kpops.utils.pydantic import issubclass_patched from kpops.utils.yaml import load_yaml_file +registry = Registry() +registry.discover_components() + PATH_KPOPS_MAIN = ROOT / "kpops/cli/main.py" PATH_CLI_COMMANDS_DOC = ROOT / "docs/docs/user/references/cli-commands.md" PATH_DOCS_RESOURCES = ROOT / "docs/docs/resources" @@ -33,7 +39,7 @@ (PATH_DOCS_RESOURCES / "pipeline-defaults/headers").iterdir(), ) -KPOPS_COMPONENTS = tuple(_find_classes("kpops.components", PipelineComponent)) +KPOPS_COMPONENTS = tuple(registry.components) KPOPS_COMPONENTS_SECTIONS = { component.type: [ field_name diff --git a/kpops/__init__.py b/kpops/__init__.py deleted file mode 100644 index 22aebfa50..000000000 --- a/kpops/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -__version__ = "6.0.2" - -# export public API functions -from kpops.api import clean, deploy, destroy, generate, init, manifest, reset - -__all__ = ( - "generate", - "manifest", - "deploy", - "destroy", - "reset", - "clean", - "init", -) diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index 3073d9452..783fb3d57 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -4,7 +4,6 @@ from pathlib import Path from typing import TYPE_CHECKING -import kpops from kpops.api.logs import log, log_action from kpops.api.options import FilterType from kpops.api.registry import Registry @@ -23,8 +22,8 @@ from kpops.utils.cli_commands import init_project if TYPE_CHECKING: - from kpops.components import PipelineComponent from kpops.components.base_components.models.resource import Resource + from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.config import KpopsConfig @@ -90,7 +89,7 @@ def manifest( :param verbose: Enable verbose printing. :return: Resources. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -129,7 +128,7 @@ def deploy( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -177,7 +176,7 @@ def destroy( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. 
""" - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -227,7 +226,7 @@ def reset( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -276,7 +275,7 @@ def clean( :param verbose: Enable verbose printing. :param parallel: Enable or disable parallel execution of pipeline steps. """ - pipeline = kpops.generate( + pipeline = generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -333,9 +332,7 @@ def _create_pipeline( :return: Created `Pipeline` object. """ registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") + registry.discover_components() handlers = _setup_handlers(kpops_config) parser = PipelineGenerator(kpops_config, registry, handlers) diff --git a/kpops/api/file_type.py b/kpops/api/file_type.py index c08fce987..3e170be96 100644 --- a/kpops/api/file_type.py +++ b/kpops/api/file_type.py @@ -33,3 +33,8 @@ def as_yaml_file(self, prefix: str = "", suffix: str = "") -> str: 'pre_pipeline_suf.yaml' """ return prefix + self.value + suffix + FILE_EXTENSION + + +PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() +DEFAULTS_YAML = KpopsFileType.DEFAULTS.as_yaml_file() +CONFIG_YAML = KpopsFileType.CONFIG.as_yaml_file() diff --git a/kpops/api/logs.py b/kpops/api/logs.py index e9a833aba..979b5a36c 100644 --- a/kpops/api/logs.py +++ b/kpops/api/logs.py @@ -6,7 +6,7 @@ import typer if TYPE_CHECKING: - from kpops.components import PipelineComponent + from kpops.components.base_components.pipeline_component import PipelineComponent class CustomFormatter(logging.Formatter): diff --git a/kpops/api/options.py b/kpops/api/options.py index dc116bd35..22fda2542 100644 --- a/kpops/api/options.py +++ b/kpops/api/options.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from kpops.components import PipelineComponent + from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.pipeline import ComponentFilterPredicate diff --git a/kpops/api/registry.py b/kpops/api/registry.py index 2df483329..37c8e5f9c 100644 --- a/kpops/api/registry.py +++ b/kpops/api/registry.py @@ -3,22 +3,26 @@ import importlib import inspect import logging +import pkgutil import sys +from collections.abc import Iterable from dataclasses import dataclass, field from pathlib import Path +from types import ModuleType from typing import TYPE_CHECKING, TypeVar -from kpops import __name__ +import typer + from kpops.api.exception import ClassNotFoundError from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.const import KPOPS_MODULE if TYPE_CHECKING: from collections.abc import Iterator -KPOPS_MODULE = __name__ + "." -T = TypeVar("T") -ClassDict = dict[str, type[T]] # type -> class +_PluginT = TypeVar("_PluginT") +ClassDict = dict[str, type[_PluginT]] # type -> class sys.path.append(str(Path.cwd())) log = logging.getLogger("Registry") @@ -30,12 +34,17 @@ class Registry: _classes: ClassDict[PipelineComponent] = field(default_factory=dict, init=False) - def find_components(self, module_name: str) -> None: - """Find all PipelineComponent subclasses in module. 
+ @property + def components(self) -> Iterator[type[PipelineComponent]]: + yield from self._classes.values() + + def discover_components(self) -> None: + """Discover first- and third-party KPOps components. - :param module_name: name of the python module. + That is all classes inheriting from PipelineComponent. """ - for _class in _find_classes(module_name, PipelineComponent): + custom_modules = self.iter_component_modules() + for _class in _find_classes(custom_modules, base=PipelineComponent): self._classes[_class.type] = _class def __getitem__(self, component_type: str) -> type[PipelineComponent]: @@ -45,32 +54,50 @@ def __getitem__(self, component_type: str) -> type[PipelineComponent]: msg = f"Could not find a component of type {component_type}" raise ClassNotFoundError(msg) from ke + @staticmethod + def iter_component_modules() -> Iterator[ModuleType]: + import kpops.components + + yield kpops.components + yield from _iter_namespace(kpops.components) -def find_class(module_name: str, baseclass: type[T]) -> type[T]: + +def find_class(modules: Iterable[ModuleType], base: type[_PluginT]) -> type[_PluginT]: try: - return next(_find_classes(module_name, baseclass)) + return next(_find_classes(modules, base=base)) except StopIteration as e: raise ClassNotFoundError from e -def _find_classes(module_name: str, baseclass: type[T]) -> Iterator[type[T]]: +def import_module(module_name: str) -> ModuleType: module = importlib.import_module(module_name) - if module.__file__ and not module_name.startswith(KPOPS_MODULE): - file_path = Path(module.__file__) - try: - rel_path = file_path.relative_to(Path.cwd()) - log.debug(f"Picked up: {rel_path}") - except ValueError: - log.debug(f"Picked up: {file_path}") - for _, _class in inspect.getmembers(module, inspect.isclass): - if not __filter_internal_kpops_classes( - _class.__module__, module_name - ) and issubclass(_class, baseclass): - yield _class + if module.__file__: + log.debug( + f"Loading {typer.style(module.__name__,bold=True)} ({module.__file__})" + ) + return module + + +def _find_classes( + modules: Iterable[ModuleType], base: type[_PluginT] +) -> Iterator[type[_PluginT]]: + for module in modules: + for _, _class in inspect.getmembers(module, inspect.isclass): + if not __filter_internal_kpops_classes( + _class.__module__, module.__name__ + ) and issubclass(_class, base): + yield _class def __filter_internal_kpops_classes(class_module: str, module_name: str) -> bool: - # filter out internal kpops classes and components unless specifically requested + """Filter out internal kpops classes and components unless specifically requested.""" return class_module.startswith(KPOPS_MODULE) and not module_name.startswith( KPOPS_MODULE ) + + +def _iter_namespace(ns_pkg: ModuleType) -> Iterator[ModuleType]: + for _, module_name, _ in pkgutil.iter_modules( + ns_pkg.__path__, ns_pkg.__name__ + "." 
+ ): + yield import_module(module_name) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 395ab8e53..16d379456 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -5,12 +5,14 @@ import typer -import kpops -from kpops import __version__ -from kpops.api.file_type import KpopsFileType +import kpops.api as kpops +from kpops.api.file_type import CONFIG_YAML, DEFAULTS_YAML, PIPELINE_YAML, KpopsFileType from kpops.api.options import FilterType -from kpops.cli.utils import collect_pipeline_paths -from kpops.config import ENV_PREFIX, KpopsConfig +from kpops.cli.utils import ( + collect_pipeline_paths, +) +from kpops.config import ENV_PREFIX +from kpops.const import KPOPS, __version__ from kpops.utils.gen_schema import ( gen_config_schema, gen_defaults_schema, @@ -129,29 +131,21 @@ def schema( scope: KpopsFileType = typer.Argument( ..., show_default=False, - help=""" + help=f""" Scope of the generated schema \n\n\n - pipeline: Schema of PipelineComponents. Includes the built-in KPOps components by default. To include custom components, provide components module in config. - \n\n\n - config: Schema of KpopsConfig.""", - ), - config: Path = CONFIG_PATH_OPTION, - include_stock_components: bool = typer.Option( - default=True, help="Include the built-in KPOps components." + - {KpopsFileType.PIPELINE.value}: Schema of PipelineComponents for KPOps {PIPELINE_YAML} + \n\n + - {KpopsFileType.DEFAULTS.value}: Schema of PipelineComponents for KPOps {DEFAULTS_YAML} + \n\n + - {KpopsFileType.CONFIG.value}: Schema for KPOps {CONFIG_YAML}""", ), ) -> None: match scope: case KpopsFileType.PIPELINE: - kpops_config = KpopsConfig.create(config) - gen_pipeline_schema( - kpops_config.components_module, include_stock_components - ) + gen_pipeline_schema() case KpopsFileType.DEFAULTS: - kpops_config = KpopsConfig.create(config) - gen_defaults_schema( - kpops_config.components_module, include_stock_components - ) + gen_defaults_schema() case KpopsFileType.CONFIG: gen_config_schema() @@ -316,7 +310,7 @@ def clean( def version_callback(show_version: bool) -> None: if show_version: - typer.echo(f"KPOps {__version__}") + typer.echo(f"{KPOPS} {__version__}") raise typer.Exit diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py index f4a04bcbb..16a8af563 100644 --- a/kpops/cli/utils.py +++ b/kpops/cli/utils.py @@ -3,9 +3,7 @@ from collections.abc import Iterable, Iterator from pathlib import Path -from kpops.api.file_type import KpopsFileType - -PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() +from kpops.api.file_type import PIPELINE_YAML def collect_pipeline_paths(pipeline_paths: Iterable[Path]) -> Iterator[Path]: diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 94b5cd3bf..c23bea4a3 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -9,7 +9,7 @@ from schema_registry.client.schema import AvroSchema from kpops.api.exception import ClassNotFoundError -from kpops.api.registry import find_class +from kpops.api.registry import Registry, find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, @@ -29,18 +29,16 @@ def __init__(self, kpops_config: KpopsConfig) -> None: str(kpops_config.schema_registry.url), timeout=kpops_config.schema_registry.timeout, # pyright: ignore[reportArgumentType] ) - self.components_module = kpops_config.components_module @cached_property def schema_provider(self) -> 
SchemaProvider: try: - if not self.components_module: - msg = f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." - raise ValueError(msg) - schema_provider_class = find_class(self.components_module, SchemaProvider) + schema_provider_class = find_class( + Registry.iter_component_modules(), base=SchemaProvider + ) return schema_provider_class() # pyright: ignore[reportAbstractUsage] except ClassNotFoundError as e: - msg = f"No schema provider found in components module {self.components_module}. Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." + msg = f"No schema provider found. Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." raise ValueError(msg) from e @classmethod diff --git a/kpops/components/__init__.py b/kpops/components/__init__.py deleted file mode 100644 index 3800b16d5..000000000 --- a/kpops/components/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -from kpops.components.base_components.helm_app import HelmApp -from kpops.components.base_components.kafka_app import KafkaApp -from kpops.components.base_components.kafka_connector import ( - KafkaConnector, - KafkaSinkConnector, - KafkaSourceConnector, -) -from kpops.components.base_components.kubernetes_app import KubernetesApp -from kpops.components.base_components.pipeline_component import PipelineComponent -from kpops.components.streams_bootstrap import StreamsBootstrap -from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp -from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp - -__all__ = ( - "HelmApp", - "KafkaApp", - "KafkaConnector", - "KafkaSinkConnector", - "KafkaSourceConnector", - "KubernetesApp", - "StreamsBootstrap", - "ProducerApp", - "StreamsApp", - "PipelineComponent", - "StreamsApp", - "ProducerApp", -) diff --git a/kpops/components/base_components/__init__.py b/kpops/components/base_components/__init__.py index e69de29bb..ff94dde1f 100644 --- a/kpops/components/base_components/__init__.py +++ b/kpops/components/base_components/__init__.py @@ -0,0 +1,19 @@ +from kpops.components.base_components.helm_app import HelmApp +from kpops.components.base_components.kafka_app import KafkaApp +from kpops.components.base_components.kafka_connector import ( + KafkaConnector, + KafkaSinkConnector, + KafkaSourceConnector, +) +from kpops.components.base_components.kubernetes_app import KubernetesApp +from kpops.components.base_components.pipeline_component import PipelineComponent + +__all__ = ( + "HelmApp", + "KafkaApp", + "KafkaConnector", + "KafkaSinkConnector", + "KafkaSourceConnector", + "KubernetesApp", + "PipelineComponent", +) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index f52ca6e78..0ab4806b9 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -13,7 +13,7 @@ from kpops.components.base_components.helm_app import HelmAppValues from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr from kpops.components.base_components.pipeline_component import PipelineComponent -from kpops.components.streams_bootstrap import StreamsBootstrap +from kpops.components.common.streams_bootstrap import StreamsBootstrap from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( 
CamelCaseConfigModel, diff --git a/tests/cli/resources/empty_module/__init__.py b/kpops/components/common/__init__.py similarity index 100% rename from tests/cli/resources/empty_module/__init__.py rename to kpops/components/common/__init__.py diff --git a/kpops/components/common/streams_bootstrap.py b/kpops/components/common/streams_bootstrap.py new file mode 100644 index 000000000..ba45dbd30 --- /dev/null +++ b/kpops/components/common/streams_bootstrap.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import logging +from abc import ABC +from typing import TYPE_CHECKING + +import pydantic +from pydantic import Field + +from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig +from kpops.components.base_components.helm_app import HelmApp, HelmAppValues +from kpops.utils.docstring import describe_attr + +if TYPE_CHECKING: + try: + from typing import Self # pyright: ignore[reportAttributeAccessIssue] + except ImportError: + from typing_extensions import Self + +STREAMS_BOOTSTRAP_HELM_REPO = HelmRepoConfig( + repository_name="bakdata-streams-bootstrap", + url="https://bakdata.github.io/streams-bootstrap/", +) +STREAMS_BOOTSTRAP_VERSION = "2.9.0" + +# Source of the pattern: https://kubernetes.io/docs/concepts/containers/images/#image-names +IMAGE_TAG_PATTERN = r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$" + +log = logging.getLogger("StreamsBootstrap") + + +class StreamsBootstrapValues(HelmAppValues): + """Base value class for all streams bootstrap related components. + + :param image_tag: Docker image tag of the streams-bootstrap app. + """ + + image_tag: str = Field( + default="latest", + pattern=IMAGE_TAG_PATTERN, + description=describe_attr("image_tag", __doc__), + ) + + +class StreamsBootstrap(HelmApp, ABC): + """Base for components with a streams-bootstrap Helm chart. + + :param app: streams-bootstrap app values + :param repo_config: Configuration of the Helm chart repo to be used for + deploying the component, defaults to streams-bootstrap Helm repo + :param version: Helm chart version, defaults to "2.9.0" + """ + + app: StreamsBootstrapValues = Field( + default_factory=StreamsBootstrapValues, + description=describe_attr("app", __doc__), + ) + + repo_config: HelmRepoConfig = Field( + default=STREAMS_BOOTSTRAP_HELM_REPO, + description=describe_attr("repo_config", __doc__), + ) + version: str | None = Field( + default=STREAMS_BOOTSTRAP_VERSION, + description=describe_attr("version", __doc__), + ) + + @pydantic.model_validator(mode="after") + def warning_for_latest_image_tag(self) -> Self: + if self.validate_ and self.app.image_tag == "latest": + log.warning( + f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag." 
+ ) + return self diff --git a/kpops/components/streams_bootstrap/__init__.py b/kpops/components/streams_bootstrap/__init__.py index c6c329b85..b4eb34b2f 100644 --- a/kpops/components/streams_bootstrap/__init__.py +++ b/kpops/components/streams_bootstrap/__init__.py @@ -1,74 +1,9 @@ -from __future__ import annotations - -import logging -from abc import ABC -from typing import TYPE_CHECKING - -import pydantic -from pydantic import Field - -from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig -from kpops.components.base_components.helm_app import HelmApp, HelmAppValues -from kpops.utils.docstring import describe_attr - -if TYPE_CHECKING: - try: - from typing import Self # pyright: ignore[reportAttributeAccessIssue] - except ImportError: - from typing_extensions import Self - -STREAMS_BOOTSTRAP_HELM_REPO = HelmRepoConfig( - repository_name="bakdata-streams-bootstrap", - url="https://bakdata.github.io/streams-bootstrap/", +from kpops.components.common.streams_bootstrap import StreamsBootstrap +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp + +__all__ = ( + "StreamsBootstrap", + "StreamsApp", + "ProducerApp", ) -STREAMS_BOOTSTRAP_VERSION = "2.9.0" - -log = logging.getLogger("StreamsBootstrap") - -# Source of the pattern: https://kubernetes.io/docs/concepts/containers/images/#image-names -IMAGE_TAG_PATTERN = r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$" - - -class StreamsBootstrapValues(HelmAppValues): - """Base value class for all streams bootstrap related components. - - :param image_tag: Docker image tag of the streams-bootstrap app. - """ - - image_tag: str = Field( - default="latest", - pattern=IMAGE_TAG_PATTERN, - description=describe_attr("image_tag", __doc__), - ) - - -class StreamsBootstrap(HelmApp, ABC): - """Base for components with a streams-bootstrap Helm chart. - - :param app: streams-bootstrap app values - :param repo_config: Configuration of the Helm chart repo to be used for - deploying the component, defaults to streams-bootstrap Helm repo - :param version: Helm chart version, defaults to "2.9.0" - """ - - app: StreamsBootstrapValues = Field( - default_factory=StreamsBootstrapValues, - description=describe_attr("app", __doc__), - ) - - repo_config: HelmRepoConfig = Field( - default=STREAMS_BOOTSTRAP_HELM_REPO, - description=describe_attr("repo_config", __doc__), - ) - version: str | None = Field( - default=STREAMS_BOOTSTRAP_VERSION, - description=describe_attr("version", __doc__), - ) - - @pydantic.model_validator(mode="after") - def warning_for_latest_image_tag(self) -> Self: - if self.validate_ and self.app.image_tag == "latest": - log.warning( - f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag." 
- ) - return self diff --git a/kpops/components/streams_bootstrap/producer/model.py b/kpops/components/streams_bootstrap/producer/model.py index 2dc3b5927..1cbdf495c 100644 --- a/kpops/components/streams_bootstrap/producer/model.py +++ b/kpops/components/streams_bootstrap/producer/model.py @@ -4,7 +4,7 @@ KafkaAppValues, KafkaStreamsConfig, ) -from kpops.components.streams_bootstrap import StreamsBootstrapValues +from kpops.components.common.streams_bootstrap import StreamsBootstrapValues from kpops.utils.docstring import describe_attr diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index a9c06f37e..9674dd7da 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -12,7 +12,7 @@ OutputTopicTypes, TopicConfig, ) -from kpops.components.streams_bootstrap import StreamsBootstrap +from kpops.components.common.streams_bootstrap import StreamsBootstrap from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.producer.model import ProducerAppValues from kpops.utils.docstring import describe_attr diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 1bffb84c6..675978396 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -11,7 +11,7 @@ KafkaStreamsConfig, ) from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr -from kpops.components.streams_bootstrap import StreamsBootstrapValues +from kpops.components.common.streams_bootstrap import StreamsBootstrapValues from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 9bd5d87c5..979c3c4c9 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -5,13 +5,10 @@ from typing_extensions import override from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler -from kpops.components import HelmApp -from kpops.components.base_components.kafka_app import ( - KafkaApp, - KafkaAppCleaner, -) +from kpops.components.base_components.helm_app import HelmApp +from kpops.components.base_components.kafka_app import KafkaApp, KafkaAppCleaner from kpops.components.base_components.models.topic import KafkaTopic -from kpops.components.streams_bootstrap import StreamsBootstrap +from kpops.components.common.streams_bootstrap import StreamsBootstrap from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.streams.model import ( StreamsAppValues, diff --git a/kpops/config.py b/kpops/config/__init__.py similarity index 97% rename from kpops/config.py rename to kpops/config/__init__.py index 7efbbf0b7..5a9f25a3e 100644 --- a/kpops/config.py +++ b/kpops/config/__init__.py @@ -74,10 +74,6 @@ class KafkaConnectConfig(BaseSettings): class KpopsConfig(BaseSettings): """Global configuration for KPOps project.""" - components_module: str | None = Field( - default=None, - description="Custom Python module defining project-specific KPOps components", - ) pipeline_base_dir: Path = Field( default=Path(), description="Base directory to the pipelines (default is current working 
directory)", diff --git a/kpops/const/__init__.py b/kpops/const/__init__.py new file mode 100644 index 000000000..c55d7127f --- /dev/null +++ b/kpops/const/__init__.py @@ -0,0 +1,3 @@ +__version__ = "6.0.1" +KPOPS = "KPOps" +KPOPS_MODULE = "kpops." diff --git a/kpops/pipeline.py b/kpops/pipeline/__init__.py similarity index 100% rename from kpops/pipeline.py rename to kpops/pipeline/__init__.py diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index 3b4ce5ad3..539e2cf3a 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -20,8 +20,8 @@ ModelFieldsSchema, ) -from kpops.api.registry import _find_classes -from kpops.components import ( +from kpops.api.registry import Registry +from kpops.components.base_components.pipeline_component import ( PipelineComponent, ) from kpops.config import KpopsConfig @@ -32,6 +32,10 @@ class MultiComponentGenerateJsonSchema(GenerateJsonSchema): ... log = logging.getLogger("") +registry = Registry() +registry.discover_components() +COMPONENTS = tuple(registry.components) + def print_schema(model: type[BaseModel]) -> None: schema = model_json_schema(model, by_alias=True) @@ -39,14 +43,13 @@ def print_schema(model: type[BaseModel]) -> None: def _is_valid_component( - defined_component_types: set[str], component: type[PipelineComponent], allow_abstract: bool, ) -> bool: """Check whether a PipelineComponent subclass has a valid definition for the schema generation. - :param defined_component_types: types defined so far :param component: component type to be validated + :param allow_abstract: whether to include abstract components marked as ABC :return: Whether component is valid for schema generation """ if not allow_abstract and ( @@ -54,85 +57,25 @@ def _is_valid_component( ): log.warning(f"SKIPPED {component.__name__}, component is abstract.") return False - if component.type in defined_component_types: - log.warning(f"SKIPPED {component.__name__}, component type must be unique.") - return False - defined_component_types.add(component.type) return True -def _add_components( - components_module: str, - allow_abstract: bool, - components: tuple[type[PipelineComponent], ...] | None = None, -) -> tuple[type[PipelineComponent], ...]: - """Add components to a components tuple. - - If an empty tuple is provided or it is not provided at all, the components - types from the given module are 'tupled' - - :param components_module: Python module. Only the classes that inherit from - PipelineComponent will be considered. - :param components: Tuple of components to which to add, defaults to () - :return: Extended tuple - """ - if components is None: - components = () - # Set of existing types, against which to check the new ones - defined_component_types = {component.type for component in components} - custom_components = ( - component - for component in _find_classes(components_module, PipelineComponent) - if _is_valid_component(defined_component_types, component, allow_abstract) - ) - components += tuple(custom_components) - return components - - -def find_components( - components_module: str | None, - include_stock_components: bool, - include_abstract: bool = False, -) -> tuple[type[PipelineComponent], ...]: - if not (include_stock_components or components_module): - msg = "No components are provided, no schema is generated." - raise RuntimeError(msg) - # Add stock components if enabled - components: tuple[type[PipelineComponent], ...] 
= () - if include_stock_components: - components = _add_components("kpops.components", include_abstract) - # Add custom components if provided - if components_module: - components = _add_components(components_module, include_abstract, components) - if not components: - msg = "No valid components found." - raise RuntimeError(msg) - return components - - -def gen_pipeline_schema( - components_module: str | None = None, include_stock_components: bool = True -) -> None: - """Generate a json schema from the models of pipeline components. - - :param components_module: Python module. Only the classes that inherit from - PipelineComponent will be considered., defaults to None - :param include_stock_components: Whether to include the stock components, - defaults to True - """ - components = find_components(components_module, include_stock_components) - +def gen_pipeline_schema() -> None: + """Generate a JSON schema from the models of pipeline components.""" + components = [ + component for component in COMPONENTS if _is_valid_component(component, False) + ] # re-assign component type as Literal to work as discriminator for component in components: component.model_fields["type"] = FieldInfo( - annotation=Literal[component.type], # type:ignore[valid-type] + annotation=Literal[component.type], # type: ignore[valid-type] default=component.type, ) - core_schema: DefinitionsSchema = component.__pydantic_core_schema__ # pyright:ignore[reportAssignmentType] + core_schema: DefinitionsSchema = component.__pydantic_core_schema__ # pyright: ignore[reportAssignmentType] schema = core_schema while "schema" in schema: schema = schema["schema"] - model_schema: ModelFieldsSchema = schema # pyright:ignore[reportAssignmentType] + model_schema: ModelFieldsSchema = schema # pyright: ignore[reportAssignmentType] model_schema["fields"]["type"] = ModelField( type="model-field", schema=LiteralSchema( @@ -141,23 +84,23 @@ def gen_pipeline_schema( ), ) - PipelineComponents = Union[components] # type: ignore[valid-type] + PipelineComponents = Union[tuple(components)] # pyright: ignore[reportInvalidTypeArguments,reportGeneralTypeIssues] AnnotatedPipelineComponents = Annotated[ PipelineComponents, Field(discriminator="type") ] class PipelineSchema(RootModel): root: Sequence[ - AnnotatedPipelineComponents # pyright:ignore[reportInvalidTypeForm] + AnnotatedPipelineComponents # pyright: ignore[reportInvalidTypeForm] ] print_schema(PipelineSchema) -def gen_defaults_schema( - components_module: str | None = None, include_stock_components: bool = True -) -> None: - components = find_components(components_module, include_stock_components, True) +def gen_defaults_schema() -> None: + components = [ + component for component in COMPONENTS if _is_valid_component(component, True) + ] components_mapping: dict[str, Any] = { component.type: (component, ...) 
for component in components } diff --git a/pyproject.toml b/pyproject.toml index f84f7a43e..cd831c71b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ mkdocs-exclude-search = "^0.6.5" mike = "^1.1.2" mkdocstrings = { extras = ["python"], version = "^0.25.1" } -[tool.poetry_bumpversion.file."kpops/__init__.py"] +[tool.poetry_bumpversion.file."kpops/const/__init__.py"] [tool.pyright] reportUnknownParameterType = "warning" diff --git a/tests/api/test_handlers.py b/tests/api/test_handlers.py index c3a92784d..beb2b9dcc 100644 --- a/tests/api/test_handlers.py +++ b/tests/api/test_handlers.py @@ -16,10 +16,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): - config = KpopsConfig( - kafka_brokers="broker:9092", - components_module=MODULE, - ) + config = KpopsConfig(kafka_brokers="broker:9092") connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler diff --git a/tests/api/test_registry.py b/tests/api/test_registry.py index 00daee08d..ed496e0c8 100644 --- a/tests/api/test_registry.py +++ b/tests/api/test_registry.py @@ -1,11 +1,26 @@ from __future__ import annotations +import importlib +from pathlib import Path +from types import ModuleType + import pytest from kpops.api.exception import ClassNotFoundError -from kpops.api.registry import Registry, _find_classes, find_class +from kpops.api.registry import Registry, _find_classes, _iter_namespace, find_class from kpops.component_handlers.schema_handler.schema_provider import SchemaProvider +from kpops.components.base_components.helm_app import HelmApp +from kpops.components.base_components.kafka_app import KafkaApp +from kpops.components.base_components.kafka_connector import ( + KafkaConnector, + KafkaSinkConnector, + KafkaSourceConnector, +) +from kpops.components.base_components.kubernetes_app import KubernetesApp from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.common.streams_bootstrap import StreamsBootstrap +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp from tests.cli.resources.custom_module import CustomSchemaProvider @@ -22,51 +37,72 @@ class Unrelated: MODULE = SubComponent.__module__ -def test_find_classes(): - gen = _find_classes(MODULE, PipelineComponent) - assert next(gen) is SubComponent - assert next(gen) is SubSubComponent - with pytest.raises(StopIteration): - next(gen) +def test_namespace(): + """Ensure namespace package according to PEP 420.""" + assert not Path("kpops/__init__.py").exists() + assert not Path("kpops/components/__init__.py").exists() -def test_find_builtin_classes(): - components = [ - class_.__name__ - for class_ in _find_classes("kpops.components", PipelineComponent) +@pytest.mark.usefixtures("custom_components") +def test_iter_namespace(): + components_module = importlib.import_module("kpops.components") + assert [module.__name__ for module in _iter_namespace(components_module)] == [ + "kpops.components.base_components", + "kpops.components.common", + "kpops.components.streams_bootstrap", + "kpops.components.test_components", ] - assert len(components) == 10 - assert components == [ - "HelmApp", - "KafkaApp", - "KafkaConnector", - "KafkaSinkConnector", - "KafkaSourceConnector", - "KubernetesApp", - "PipelineComponent", - "ProducerApp", - 
"StreamsApp", - "StreamsBootstrap", + + +@pytest.mark.usefixtures("custom_components") +def test_iter_component_modules(): + assert [module.__name__ for module in Registry.iter_component_modules()] == [ + "kpops.components", + "kpops.components.base_components", + "kpops.components.common", + "kpops.components.streams_bootstrap", + "kpops.components.test_components", ] -def test_find_class(): - assert find_class(MODULE, SubComponent) is SubComponent - assert find_class(MODULE, PipelineComponent) is SubComponent - assert find_class(MODULE, SchemaProvider) is CustomSchemaProvider +@pytest.fixture() +def module() -> ModuleType: + return importlib.import_module(MODULE) + + +def test_find_classes(module: ModuleType): + gen = _find_classes([module], PipelineComponent) + assert next(gen) is SubComponent + assert next(gen) is SubSubComponent + with pytest.raises(StopIteration): + next(gen) + + +def test_find_class(module: ModuleType): + assert find_class([module], base=SubComponent) is SubComponent + assert find_class([module], base=PipelineComponent) is SubComponent + assert find_class([module], base=SchemaProvider) is CustomSchemaProvider with pytest.raises(ClassNotFoundError): - find_class(MODULE, dict) + find_class([module], base=dict) def test_registry(): registry = Registry() assert registry._classes == {} - registry.find_components(MODULE) + registry.discover_components() assert registry._classes == { - "sub-component": SubComponent, - "sub-sub-component": SubSubComponent, + "helm-app": HelmApp, + "kafka-app": KafkaApp, + "kafka-connector": KafkaConnector, + "kafka-sink-connector": KafkaSinkConnector, + "kafka-source-connector": KafkaSourceConnector, + "kubernetes-app": KubernetesApp, + "pipeline-component": PipelineComponent, + "producer-app": ProducerApp, + "streams-app": StreamsApp, + "streams-bootstrap": StreamsBootstrap, } - assert registry["sub-component"] is SubComponent - assert registry["sub-sub-component"] is SubSubComponent + for _type, _class in registry._classes.items(): + assert registry[_type] is _class with pytest.raises(ClassNotFoundError): registry["doesnt-exist"] diff --git a/tests/cli/resources/config.yaml b/tests/cli/resources/config.yaml index 046c98d2a..79261856b 100644 --- a/tests/cli/resources/config.yaml +++ b/tests/cli/resources/config.yaml @@ -1,2 +1 @@ kafka_brokers: http://127.0.0.1:9092 -components_module: tests.cli.test_schema_generation diff --git a/tests/cli/resources/empty_module/config.yaml b/tests/cli/resources/empty_module/config.yaml deleted file mode 100644 index 735b3904a..000000000 --- a/tests/cli/resources/empty_module/config.yaml +++ /dev/null @@ -1,2 +0,0 @@ -kafka_brokers: http://127.0.0.1:9092 -components_module: tests.cli.resources.empty_module diff --git a/tests/cli/resources/no_module/config.yaml b/tests/cli/resources/no_module/config.yaml deleted file mode 100644 index 79261856b..000000000 --- a/tests/cli/resources/no_module/config.yaml +++ /dev/null @@ -1 +0,0 @@ -kafka_brokers: http://127.0.0.1:9092 diff --git a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml index 5c251a22c..3c86a269a 100644 --- a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml +++ b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml @@ -4,7 +4,6 @@ kafka_brokers: null # Non-required fields -components_module: null create_namespace: false helm_config: api_version: null diff --git a/tests/cli/test_init.py b/tests/cli/test_init.py 
index 8d4790d23..3109d16fc 100644 --- a/tests/cli/test_init.py +++ b/tests/cli/test_init.py @@ -3,7 +3,7 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops from kpops.cli.main import app from kpops.utils.cli_commands import create_config diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py index 0741340e0..81f975b4c 100644 --- a/tests/cli/test_schema_generation.py +++ b/tests/cli/test_schema_generation.py @@ -3,19 +3,15 @@ import json from abc import ABC, abstractmethod from pathlib import Path -from typing import TYPE_CHECKING import pytest from pydantic import ConfigDict, Field from typer.testing import CliRunner -from kpops.api.registry import Registry from kpops.cli.main import app -from kpops.components import PipelineComponent +from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.utils.docstring import describe_attr - -if TYPE_CHECKING: - from pytest_snapshot.plugin import Snapshot +from kpops.utils.gen_schema import COMPONENTS RESOURCE_PATH = Path(__file__).parent / "resources" @@ -80,92 +76,6 @@ class SubPipelineComponentCorrectDocstr(SubPipelineComponent): "ignore:handlers", "ignore:config", "ignore:enrich", "ignore:validate" ) class TestGenSchema: - @pytest.fixture - def stock_components(self) -> list[type[PipelineComponent]]: - registry = Registry() - registry.find_components("kpops.components") - return list(registry._classes.values()) - - def test_gen_pipeline_schema_no_modules(self): - with pytest.raises( - RuntimeError, match="^No components are provided, no schema is generated.$" - ): - runner.invoke( - app, - [ - "schema", - "pipeline", - "--no-include-stock-components", - "--config", - str(RESOURCE_PATH / "no_module"), - ], - catch_exceptions=False, - ) - - def test_gen_pipeline_schema_no_components(self): - with pytest.raises(RuntimeError, match="^No valid components found.$"): - runner.invoke( - app, - [ - "schema", - "pipeline", - "--no-include-stock-components", - "--config", - str(RESOURCE_PATH / "empty_module"), - ], - catch_exceptions=False, - ) - - def test_gen_pipeline_schema_only_stock_module(self): - result = runner.invoke( - app, - [ - "schema", - "pipeline", - ], - catch_exceptions=False, - ) - - assert result.exit_code == 0, result.stdout - assert result.stdout - - result = runner.invoke( - app, - [ - "schema", - "pipeline", - "--include-stock-components", - ], - catch_exceptions=False, - ) - - assert result.exit_code == 0, result.stdout - assert result.stdout - - def test_gen_pipeline_schema_only_custom_module( - self, snapshot: Snapshot, stock_components: list[type[PipelineComponent]] - ): - result = runner.invoke( - app, - [ - "schema", - "pipeline", - "--no-include-stock-components", - "--config", - str(RESOURCE_PATH), - ], - catch_exceptions=False, - ) - - assert result.exit_code == 0, result.stdout - - snapshot.assert_match(result.stdout, "schema.json") - schema = json.loads(result.stdout) - assert schema["title"] == "PipelineSchema" - assert set(schema["items"]["discriminator"]["mapping"].keys()).isdisjoint( - {component.type for component in stock_components} - ) - def test_gen_pipeline_schema_stock_and_custom_module(self): result = runner.invoke( app, @@ -179,14 +89,12 @@ def test_gen_pipeline_schema_stock_and_custom_module(self): assert result.exit_code == 0, result.stdout assert result.stdout - def test_gen_defaults_schema(self, stock_components: list[type[PipelineComponent]]): + def 
test_gen_defaults_schema(self): result = runner.invoke( app, [ "schema", "defaults", - "--config", - str(RESOURCE_PATH / "no_module"), ], catch_exceptions=False, ) @@ -195,7 +103,7 @@ def test_gen_defaults_schema(self, stock_components: list[type[PipelineComponent assert result.stdout schema = json.loads(result.stdout) assert schema["title"] == "DefaultsSchema" - assert schema["required"] == [component.type for component in stock_components] + assert schema["required"] == [component.type for component in COMPONENTS] def test_gen_config_schema(self): result = runner.invoke( diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py index 8d4052e54..716f2f482 100644 --- a/tests/component_handlers/schema_handler/test_schema_handler.py +++ b/tests/component_handlers/schema_handler/test_schema_handler.py @@ -4,7 +4,7 @@ from unittest.mock import AsyncMock, MagicMock import pytest -from pydantic import AnyHttpUrl, BaseModel, TypeAdapter +from pydantic import AnyHttpUrl, TypeAdapter from pytest_mock import MockerFixture from schema_registry.client.schema import AvroSchema from schema_registry.client.utils import SchemaVersion @@ -20,10 +20,6 @@ from kpops.utils.colorify import greenify, magentaify, yellowify from tests.pipeline.test_components import TestSchemaProvider -NON_EXISTING_PROVIDER_MODULE = BaseModel.__module__ -TEST_SCHEMA_PROVIDER_MODULE = TestSchemaProvider.__module__ - - log = logging.getLogger("SchemaHandler") @@ -48,13 +44,6 @@ def log_warning_mock(mocker: MockerFixture) -> MagicMock: ) -@pytest.fixture(autouse=False) -def find_class_mock(mocker: MockerFixture) -> MagicMock: - return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.find_class" - ) - - @pytest.fixture(autouse=True) def schema_registry_mock(mocker: MockerFixture) -> AsyncMock: schema_registry_mock_constructor = mocker.patch( @@ -87,15 +76,11 @@ def kpops_config() -> KpopsConfig: enabled=True, url=TypeAdapter(AnyHttpUrl).validate_python("http://mock:8081"), # pyright: ignore[reportCallIssue,reportArgumentType] ), - components_module=TEST_SCHEMA_PROVIDER_MODULE, ) def test_load_schema_handler(kpops_config: KpopsConfig): - assert isinstance( - SchemaHandler.load_schema_handler(kpops_config), - SchemaHandler, - ) + assert isinstance(SchemaHandler.load_schema_handler(kpops_config), SchemaHandler) config_disable = kpops_config.model_copy() config_disable.schema_registry = SchemaRegistryConfig(enabled=False) @@ -103,9 +88,8 @@ def test_load_schema_handler(kpops_config: KpopsConfig): assert SchemaHandler.load_schema_handler(config_disable) is None -def test_should_lazy_load_schema_provider( - find_class_mock: MagicMock, kpops_config: KpopsConfig -): +@pytest.mark.usefixtures("custom_components") +def test_should_lazy_load_schema_provider(kpops_config: KpopsConfig): schema_handler = SchemaHandler.load_schema_handler(kpops_config) assert schema_handler is not None @@ -117,18 +101,17 @@ def test_should_lazy_load_schema_provider( "com.bakdata.kpops.test.SomeOtherSchemaClass", {} ) - find_class_mock.assert_called_once_with(TEST_SCHEMA_PROVIDER_MODULE, SchemaProvider) + assert isinstance(schema_handler.schema_provider, TestSchemaProvider) def test_should_raise_value_error_if_schema_provider_class_not_found( kpops_config: KpopsConfig, ): - kpops_config.components_module = NON_EXISTING_PROVIDER_MODULE schema_handler = SchemaHandler(kpops_config) with pytest.raises( ValueError, - match="No schema provider found in components 
module pydantic.main. " + match="No schema provider found. " "Please implement the abstract method in " f"{SchemaProvider.__module__}.{SchemaProvider.__name__}.", ): @@ -137,35 +120,8 @@ def test_should_raise_value_error_if_schema_provider_class_not_found( ) -@pytest.mark.parametrize( - ("components_module"), - [ - pytest.param( - None, - id="components_module = None", - ), - pytest.param( - "", - id="components_module = ''", - ), - ], -) -def test_should_raise_value_error_when_schema_provider_is_called_and_components_module_is_empty( - kpops_config: KpopsConfig, components_module: str | None -): - kpops_config.components_module = components_module - schema_handler = SchemaHandler.load_schema_handler(kpops_config) - assert schema_handler is not None - with pytest.raises( - ValueError, - match="The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your SchemaProvider implementation exists.", - ): - schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", {} - ) - - @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( to_section: ToSection, log_info_mock: MagicMock, @@ -185,6 +141,7 @@ async def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_t @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( topic_config: TopicConfig, to_section: ToSection, @@ -207,6 +164,7 @@ async def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatible_and_dry_run_true( topic_config: TopicConfig, to_section: ToSection, @@ -244,6 +202,7 @@ async def test_should_raise_exception_when_submit_schema_that_exists_and_not_com @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_version_and_dry_run_true( topic_config: TopicConfig, to_section: ToSection, @@ -279,6 +238,7 @@ async def test_should_log_debug_when_submit_schema_that_exists_and_registered_un @pytest.mark.asyncio() +@pytest.mark.usefixtures("custom_components") async def test_should_submit_non_existing_schema_when_not_dry( topic_config: TopicConfig, to_section: ToSection, diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index ab21ef68e..f8fe12eec 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -6,7 +6,7 @@ import pydantic import pytest -from kpops.api.file_type import KpopsFileType +from kpops.api.file_type import DEFAULTS_YAML, PIPELINE_YAML, KpopsFileType from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.base_defaults_component import ( BaseDefaultsComponent, @@ -17,10 +17,6 @@ from kpops.utils.environment import ENV from tests.components import PIPELINE_BASE_DIR, RESOURCES_PATH -PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() - -DEFAULTS_YAML = KpopsFileType.DEFAULTS.as_yaml_file() - class Parent(BaseDefaultsComponent): __test__ = False diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 1b98e4ac4..bf5cbb6b9 100644 --- 
a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -12,8 +12,10 @@ KafkaConnectorConfig, KafkaConnectorType, ) -from kpops.components import KafkaSinkConnector -from kpops.components.base_components.kafka_connector import KafkaConnectorResetter +from kpops.components.base_components.kafka_connector import ( + KafkaConnectorResetter, + KafkaSinkConnector, +) from kpops.components.base_components.models.from_section import ( FromSection, FromTopic, diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 4f7184ead..346913e57 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -7,13 +7,15 @@ from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components import ProducerApp from kpops.components.base_components.models.topic import ( KafkaTopic, OutputTopicTypes, TopicConfig, ) -from kpops.components.streams_bootstrap.producer.producer_app import ProducerAppCleaner +from kpops.components.streams_bootstrap.producer.producer_app import ( + ProducerApp, + ProducerAppCleaner, +) from kpops.config import KpopsConfig, TopicNameConfig from tests.components import PIPELINE_BASE_DIR diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index cb340174a..a72328d5e 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -12,7 +12,6 @@ HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components import StreamsApp from kpops.components.base_components.models import TopicName from kpops.components.base_components.models.to_section import ( ToSection, @@ -27,6 +26,7 @@ StreamsAppAutoScaling, ) from kpops.components.streams_bootstrap.streams.streams_app import ( + StreamsApp, StreamsAppCleaner, ) from kpops.config import KpopsConfig, TopicNameConfig diff --git a/tests/components/test_streams_bootstrap.py b/tests/components/test_streams_bootstrap.py index 3ece4612d..6084c3ea3 100644 --- a/tests/components/test_streams_bootstrap.py +++ b/tests/components/test_streams_bootstrap.py @@ -12,7 +12,10 @@ HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components.streams_bootstrap import StreamsBootstrap, StreamsBootstrapValues +from kpops.components.common.streams_bootstrap import ( + StreamsBootstrap, + StreamsBootstrapValues, +) from kpops.config import KpopsConfig from tests.components import PIPELINE_BASE_DIR diff --git a/tests/conftest.py b/tests/conftest.py index 5fb77a415..0e23be87c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,8 @@ import logging import os +import shutil from collections.abc import Iterator +from pathlib import Path from unittest import mock import pytest @@ -38,3 +40,14 @@ def mock_env() -> Iterator[Environment]: def load_yaml_file_clear_cache() -> Iterator[None]: yield load_yaml_file.cache.clear() # pyright: ignore[reportFunctionMemberAccess] + + +@pytest.fixture() +def custom_components(): + src = Path("tests/pipeline/test_components") + dst = Path("kpops/components/test_components") + try: + shutil.copytree(src, dst) + yield + finally: + shutil.rmtree(dst) diff --git a/tests/pipeline/test_components/components.py 
b/tests/pipeline/test_components/components.py index 36f24f938..4502ea69e 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -5,17 +5,15 @@ Schema, SchemaProvider, ) -from kpops.components import ( - KafkaSinkConnector, - PipelineComponent, - ProducerApp, - StreamsApp, -) +from kpops.components.base_components.kafka_connector import KafkaSinkConnector from kpops.components.base_components.models import ModelName, ModelVersion, TopicName from kpops.components.base_components.models.to_section import ( ToSection, ) from kpops.components.base_components.models.topic import OutputTopicTypes, TopicConfig +from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp class ScheduledProducer(ProducerApp): ... diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index 1d54a9f7a..9646e569b 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -1,13 +1,11 @@ from typing_extensions import override from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig -from kpops.components import ( - KafkaSinkConnector, - PipelineComponent, - ProducerApp, - StreamsApp, -) +from kpops.components.base_components.kafka_connector import KafkaSinkConnector from kpops.components.base_components.models.topic import OutputTopicTypes +from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp class ScheduledProducer(ProducerApp): ... 
diff --git a/tests/pipeline/test_example.py b/tests/pipeline/test_example.py index 1101b5795..7a03ae2ee 100644 --- a/tests/pipeline/test_example.py +++ b/tests/pipeline/test_example.py @@ -5,7 +5,7 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops runner = CliRunner() diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index ef68aadf9..1d2fce41f 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -9,20 +9,19 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops from kpops.api.exception import ParsingException, ValidationError -from kpops.api.file_type import KpopsFileType +from kpops.api.file_type import PIPELINE_YAML, KpopsFileType from kpops.cli.main import FilterType, app -from kpops.components import KafkaSinkConnector, PipelineComponent - -PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() +from kpops.components.base_components.kafka_connector import KafkaSinkConnector +from kpops.components.base_components.pipeline_component import PipelineComponent runner = CliRunner() RESOURCE_PATH = Path(__file__).parent / "resources" -@pytest.mark.usefixtures("mock_env", "load_yaml_file_clear_cache") +@pytest.mark.usefixtures("mock_env", "load_yaml_file_clear_cache", "custom_components") class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: diff --git a/tests/pipeline/test_manifest.py b/tests/pipeline/test_manifest.py index 445e528cc..45f3e3a94 100644 --- a/tests/pipeline/test_manifest.py +++ b/tests/pipeline/test_manifest.py @@ -7,7 +7,7 @@ from pytest_snapshot.plugin import Snapshot from typer.testing import CliRunner -import kpops +import kpops.api as kpops from kpops.cli.main import app from kpops.component_handlers.helm_wrapper.helm import Helm from kpops.component_handlers.helm_wrapper.model import HelmConfig, Version diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index b5c741ad0..61d338dcf 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -7,9 +7,9 @@ from kpops.component_handlers import ( ComponentHandlers, ) -from kpops.components import PipelineComponent from kpops.components.base_components.models.from_section import FromSection from kpops.components.base_components.models.to_section import ToSection +from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.pipeline import Pipeline PREFIX = "example-prefix-" From bcef0e5099e66e272952df4f310a9ef6eeceb784 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 15 Jul 2024 11:52:44 +0200 Subject: [PATCH 2/2] Call destroy from inside of reset or clean (#501) --- docs/docs/user/migration-guide/v6-v7.md | 37 +++++ kpops/api/__init__.py | 2 - .../base_components/kafka_connector.py | 8 ++ .../base_components/pipeline_component.py | 2 + .../producer/producer_app.py | 6 + .../streams_bootstrap/streams/streams_app.py | 14 +- tests/components/test_helm_app.py | 36 +++++ tests/components/test_kafka_sink_connector.py | 11 ++ .../components/test_kafka_source_connector.py | 22 ++- tests/components/test_producer_app.py | 32 ++++- tests/components/test_streams_app.py | 83 +++++++++-- tests/components/test_streams_bootstrap.py | 129 ------------------ tests/pipeline/test_clean.py | 52 ++++--- tests/pipeline/test_reset.py | 50 ++++--- 14 files changed, 299 insertions(+), 185 deletions(-) delete mode 
100644 tests/components/test_streams_bootstrap.py diff --git a/docs/docs/user/migration-guide/v6-v7.md b/docs/docs/user/migration-guide/v6-v7.md index 53c83c796..c08be2af8 100644 --- a/docs/docs/user/migration-guide/v6-v7.md +++ b/docs/docs/user/migration-guide/v6-v7.md @@ -60,3 +60,40 @@ As a result of the restructure, some imports need to be adjusted: -components/__init__.py +kpops/components/custom/__init__.py ``` + +## [Call destroy from inside of reset or clean](https://github.com/bakdata/kpops/pull/501) + +Before v7, the KPOps CLI executed `destroy` before running `reset` or `clean` to ensure the component was destroyed. + +This logic has changed: the `destroy` method is now called from within the `PipelineComponent`'s `reset` and `clean` methods. + +When migrating to v7, check whether your custom components override the `reset`/`clean` methods. If so, call the parent class's `reset`/`clean` to trigger `destroy` in the parent class. Alternatively, if you subclass `PipelineComponent` directly, call the `destroy` method at the beginning of your overridden method. + +#### components.py + +For example, when creating a custom `StreamsApp` or `ProducerApp` (or any other custom component), you **must** call the parent's `reset`/`clean` so that `destroy` is executed in the parent class. **Otherwise, the destroy logic will not be executed!** + +```diff +class MyStreamsApp(StreamsApp): + + @override + async def clean(self, dry_run: bool) -> None: ++ await super().clean(dry_run) + # Some custom clean logic + # ... +``` + +```diff +class MyCustomComponent(PipelineComponent): + + @override + async def destroy(self, dry_run: bool) -> None: + # Some custom destroy logic + # ... + + @override + async def clean(self, dry_run: bool) -> None: ++ await super().clean(dry_run) + # Some custom clean logic + # ... +``` diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index 783fb3d57..f376af704 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -237,7 +237,6 @@ def reset( ) async def reset_runner(component: PipelineComponent): - await component.destroy(dry_run) log_action("Reset", component) await component.reset(dry_run) @@ -286,7 +285,6 @@ def clean( ) async def clean_runner(component: PipelineComponent): - await component.destroy(dry_run) log_action("Clean", component) await component.clean(dry_run) diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 1d9a296b3..3932fa3ef 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -166,6 +166,7 @@ def _resetter(self) -> KafkaConnectorResetter: @override async def deploy(self, dry_run: bool) -> None: + """Deploy Kafka Connector (Source/Sink). Create output topics and register schemas if configured.""" if self.to: for topic in self.to.kafka_topics: await self.handlers.topic_handler.create_topic(topic, dry_run=dry_run) @@ -181,12 +182,15 @@ async def deploy(self, dry_run: bool) -> None: @override async def destroy(self, dry_run: bool) -> None: + """Delete Kafka Connector (Source/Sink) from the Kafka connect cluster.""" await self.handlers.connector_handler.destroy_connector( self.full_name, dry_run=dry_run ) @override async def clean(self, dry_run: bool) -> None: + """Delete Kafka Connector. If schema handler is enabled, then remove schemas. 
Delete all the output topics.""" + await super().clean(dry_run) if self.to: if self.handlers.schema_handler: await self.handlers.schema_handler.delete_schemas( @@ -229,10 +233,12 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @override async def reset(self, dry_run: bool) -> None: + """Reset state. Keep connector.""" await self._resetter.reset(dry_run) @override async def clean(self, dry_run: bool) -> None: + """Delete connector and reset state.""" await super().clean(dry_run) await self._resetter.clean(dry_run) @@ -266,11 +272,13 @@ def set_error_topic(self, topic: KafkaTopic) -> None: @override async def reset(self, dry_run: bool) -> None: + """Reset state. Keep consumer group and connector.""" self._resetter.app.config.delete_consumer_group = False await self._resetter.reset(dry_run) @override async def clean(self, dry_run: bool) -> None: + """Delete connector and consumer group.""" await super().clean(dry_run) self._resetter.app.config.delete_consumer_group = True await self._resetter.clean(dry_run) diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index 7ce114899..ec49bce01 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -250,9 +250,11 @@ async def reset(self, dry_run: bool) -> None: :param dry_run: Whether to do a dry run of the command """ + await self.destroy(dry_run) async def clean(self, dry_run: bool) -> None: """Destroy component including related states. :param dry_run: Whether to do a dry run of the command """ + await self.destroy(dry_run) diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 9674dd7da..b5ab0136e 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -93,6 +93,12 @@ def add_extra_output_topic(self, topic: KafkaTopic, role: str) -> None: def helm_chart(self) -> str: return f"{self.repo_config.repository_name}/{AppType.PRODUCER_APP.value}" + async def reset(self, dry_run: bool) -> None: + """Reset not necessary, since producer app has no consumer group offsets.""" + await super().reset(dry_run) + @override async def clean(self, dry_run: bool) -> None: + """Destroy and clean.""" + await super().clean(dry_run) await self._cleaner.clean(dry_run) diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 979c3c4c9..1c949a528 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -28,8 +28,14 @@ class StreamsAppCleaner(KafkaAppCleaner): def helm_chart(self) -> str: return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" + @override + async def reset(self, dry_run: bool) -> None: + self.app.streams.delete_output = False + await super().clean(dry_run) + @override async def clean(self, dry_run: bool) -> None: + self.app.streams.delete_output = True await super().clean(dry_run) if self.app.stateful_set and self.app.persistence.enabled: await self.clean_pvcs(dry_run) @@ -120,10 +126,12 @@ def helm_chart(self) -> str: @override async def reset(self, dry_run: bool) -> None: - self._cleaner.app.streams.delete_output = False - await self._cleaner.clean(dry_run) + """Destroy and reset.""" + await super().reset(dry_run) + await 
self._cleaner.reset(dry_run) @override async def clean(self, dry_run: bool) -> None: - self._cleaner.app.streams.delete_output = True + """Destroy and clean.""" + await super().clean(dry_run) await self._cleaner.clean(dry_run) diff --git a/tests/components/test_helm_app.py b/tests/components/test_helm_app.py index cd9a78b66..13c55dc78 100644 --- a/tests/components/test_helm_app.py +++ b/tests/components/test_helm_app.py @@ -224,6 +224,42 @@ async def test_should_call_helm_uninstall_when_destroying_helm_app( log_info_mock.assert_called_once_with(magentaify(stdout)) + @pytest.mark.asyncio() + async def test_should_call_helm_uninstall_when_resetting_helm_app( + self, + helm_app: HelmApp, + helm_mock: MagicMock, + log_info_mock: MagicMock, + ): + stdout = 'HelmApp - release "test-helm-app" uninstalled' + helm_mock.uninstall.return_value = stdout + + await helm_app.reset(True) + + helm_mock.uninstall.assert_called_once_with( + "test-namespace", "${pipeline.name}-test-helm-app", True + ) + + log_info_mock.assert_called_once_with(magentaify(stdout)) + + @pytest.mark.asyncio() + async def test_should_call_helm_uninstall_when_cleaning_helm_app( + self, + helm_app: HelmApp, + helm_mock: MagicMock, + log_info_mock: MagicMock, + ): + stdout = 'HelmApp - release "test-helm-app" uninstalled' + helm_mock.uninstall.return_value = stdout + + await helm_app.clean(True) + + helm_mock.uninstall.assert_called_once_with( + "test-namespace", "${pipeline.name}-test-helm-app", True + ) + + log_info_mock.assert_called_once_with(magentaify(stdout)) + def test_helm_name_override( self, config: KpopsConfig, diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index bf5cbb6b9..5594f44a2 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -245,6 +245,7 @@ async def test_reset_when_dry_run_is_false( helm_mock: MagicMock, mocker: MockerFixture, ): + mock_destroy = mocker.patch.object(connector, "destroy") mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -254,12 +255,14 @@ async def test_reset_when_dry_run_is_false( mock_resetter_reset = mocker.spy(connector._resetter, "reset") mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") dry_run = False await connector.reset(dry_run=dry_run) + mock_destroy.assert_not_called() mock_resetter_reset.assert_called_once_with(dry_run) mock.assert_has_calls( @@ -329,6 +332,8 @@ async def test_clean_when_dry_run_is_false( dry_run_handler_mock: MagicMock, mocker: MockerFixture, ): + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -337,6 +342,7 @@ async def test_clean_when_dry_run_is_false( ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -360,6 +366,7 @@ async def test_clean_when_dry_run_is_false( assert connector.to assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), *( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics @@ -445,6 +452,8 @@ async def test_clean_without_to_when_dry_run_is_false( resetter_namespace=RESETTER_NAMESPACE, ) + mock_destroy = 
mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -452,6 +461,7 @@ async def test_clean_without_to_when_dry_run_is_false( connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -460,6 +470,7 @@ async def test_clean_without_to_when_dry_run_is_false( await connector.clean(dry_run) assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 46aea7645..d6127e9fc 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false( mocker: MockerFixture, ): assert connector.handlers.connector_handler + + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -174,10 +177,13 @@ async def test_reset_when_dry_run_is_false( ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") - await connector.reset(dry_run=False) + dry_run = False + await connector.reset(dry_run) + mock_destroy.assert_not_called() assert mock.mock_calls == [ mocker.call.helm.add_repo( @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false( mocker.call.helm.uninstall( RESETTER_NAMESPACE, CONNECTOR_CLEAN_RELEASE_NAME, - False, + dry_run, ), ANY, # __bool__ ANY, # __str__ mocker.call.helm.upgrade_install( CONNECTOR_CLEAN_RELEASE_NAME, "bakdata-kafka-connect-resetter/kafka-connect-resetter", - False, + dry_run, RESETTER_NAMESPACE, { "connectorType": CONNECTOR_TYPE, @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false( mocker.call.helm.uninstall( RESETTER_NAMESPACE, CONNECTOR_CLEAN_RELEASE_NAME, - False, + dry_run, ), ANY, # __bool__ ANY, # __str__ @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false( ): assert connector.handlers.connector_handler + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false( ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false( assert connector.to assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), *( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false( assert connector.handlers.connector_handler + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false( ) mock = mocker.MagicMock() + 
mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false( await connector.clean(dry_run) assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 346913e57..a6b70daf5 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -200,11 +200,17 @@ async def test_destroy( ) @pytest.mark.asyncio() - async def test_should_not_reset_producer_app( + async def test_should_clean_producer_app( self, producer_app: ProducerApp, mocker: MockerFixture, ): + # actual component + mock_helm_uninstall_producer_app = mocker.patch.object( + producer_app.helm, "uninstall" + ) + + # cleaner mock_helm_upgrade_install = mocker.patch.object( producer_app._cleaner.helm, "upgrade_install" ) @@ -216,14 +222,22 @@ async def test_should_not_reset_producer_app( ) mock = mocker.MagicMock() - mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") + mock.attach_mock( + mock_helm_uninstall_producer_app, "helm_uninstall_producer_app" + ) mock.attach_mock(mock_helm_uninstall, "helm_uninstall") + mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_print_helm_diff, "print_helm_diff") await producer_app.clean(dry_run=True) mock.assert_has_calls( [ + mocker.call.helm_uninstall_producer_app( + "test-namespace", PRODUCER_APP_RELEASE_NAME, True + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", PRODUCER_APP_CLEAN_RELEASE_NAME, @@ -266,6 +280,12 @@ async def test_should_not_reset_producer_app( async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false( self, mocker: MockerFixture, producer_app: ProducerApp ): + # actual component + mock_helm_uninstall_producer_app = mocker.patch.object( + producer_app.helm, "uninstall" + ) + + # cleaner mock_helm_upgrade_install = mocker.patch.object( producer_app._cleaner.helm, "upgrade_install" ) @@ -274,6 +294,9 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea ) mock = mocker.MagicMock() + mock.attach_mock( + mock_helm_uninstall_producer_app, "helm_uninstall_producer_app" + ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -281,6 +304,11 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea mock.assert_has_calls( [ + mocker.call.helm_uninstall_producer_app( + "test-namespace", PRODUCER_APP_RELEASE_NAME, False + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", PRODUCER_APP_CLEAN_RELEASE_NAME, diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index a72328d5e..a789412e9 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -77,7 +77,9 @@ def config(self) -> KpopsConfig: @pytest.fixture() def streams_app( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ) -> StreamsApp: return StreamsApp( name=STREAMS_APP_NAME, @@ -100,7 +102,9 @@ def streams_app( @pytest.fixture() 
def stateful_streams_app( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ) -> StreamsApp: return StreamsApp( name=STREAMS_APP_NAME, @@ -196,7 +200,11 @@ def test_cleaner_helm_name_override(self, streams_app: StreamsApp): == STREAMS_APP_CLEAN_HELM_NAME_OVERRIDE ) - def test_set_topics(self, config: KpopsConfig, handlers: ComponentHandlers): + def test_set_topics( + self, + config: KpopsConfig, + handlers: ComponentHandlers, + ): streams_app = StreamsApp( name=STREAMS_APP_NAME, config=config, @@ -245,7 +253,9 @@ def test_set_topics(self, config: KpopsConfig, handlers: ComponentHandlers): assert "extraInputPatterns" in streams_config def test_no_empty_input_topic( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name=STREAMS_APP_NAME, @@ -275,7 +285,11 @@ def test_no_empty_input_topic( assert "inputPattern" in streams_config assert "extraInputPatterns" not in streams_config - def test_should_validate(self, config: KpopsConfig, handlers: ComponentHandlers): + def test_should_validate( + self, + config: KpopsConfig, + handlers: ComponentHandlers, + ): # An exception should be raised when both role and type are defined and type is input with pytest.raises( ValueError, match="Define role only if `type` is `pattern` or `None`" @@ -325,7 +339,9 @@ def test_should_validate(self, config: KpopsConfig, handlers: ComponentHandlers) ) def test_set_streams_output_from_to( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name=STREAMS_APP_NAME, @@ -368,7 +384,9 @@ def test_set_streams_output_from_to( ) def test_weave_inputs_from_prev_component( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name=STREAMS_APP_NAME, @@ -452,7 +470,7 @@ async def test_deploy_order_when_dry_run_is_false( mock = mocker.AsyncMock() mock.attach_mock(mock_create_topic, "mock_create_topic") - mock.attach_mock(mock_helm_upgrade_install, "mock_helm_upgrade_install") + mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") dry_run = False await streams_app.deploy(dry_run=dry_run) @@ -493,7 +511,7 @@ async def test_deploy_order_when_dry_run_is_false( mocker.call.mock_create_topic(topic, dry_run=dry_run) for topic in streams_app.to.kafka_topics ), - mocker.call.mock_helm_upgrade_install( + mocker.call.helm_upgrade_install( STREAMS_APP_RELEASE_NAME, "bakdata-streams-bootstrap/streams-app", dry_run, @@ -526,7 +544,11 @@ async def test_deploy_order_when_dry_run_is_false( ] @pytest.mark.asyncio() - async def test_destroy(self, streams_app: StreamsApp, mocker: MockerFixture): + async def test_destroy( + self, + streams_app: StreamsApp, + mocker: MockerFixture, + ): mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall") await streams_app.destroy(dry_run=True) @@ -539,6 +561,11 @@ async def test_destroy(self, streams_app: StreamsApp, mocker: MockerFixture): async def test_reset_when_dry_run_is_false( self, streams_app: StreamsApp, mocker: MockerFixture ): + # actual component + mock_helm_uninstall_streams_app = mocker.patch.object( + streams_app.helm, "uninstall" + ) + cleaner = streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner) @@ -546,6 +573,9 @@ async def test_reset_when_dry_run_is_false( mock_helm_uninstall = 
mocker.patch.object(cleaner.helm, "uninstall") mock = mocker.MagicMock() + mock.attach_mock( + mock_helm_uninstall_streams_app, "mock_helm_uninstall_streams_app" + ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -554,6 +584,11 @@ async def test_reset_when_dry_run_is_false( mock.assert_has_calls( [ + mocker.call.mock_helm_uninstall_streams_app( + "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -592,6 +627,11 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean streams_app: StreamsApp, mocker: MockerFixture, ): + # actual component + mock_helm_uninstall_streams_app = mocker.patch.object( + streams_app.helm, "uninstall" + ) + mock_helm_upgrade_install = mocker.patch.object( streams_app._cleaner.helm, "upgrade_install" ) @@ -600,6 +640,7 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean ) mock = mocker.MagicMock() + mock.attach_mock(mock_helm_uninstall_streams_app, "helm_uninstall_streams_app") mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -608,6 +649,11 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean mock.assert_has_calls( [ + mocker.call.helm_uninstall_streams_app( + "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -642,7 +688,9 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean @pytest.mark.asyncio() async def test_get_input_output_topics( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name="my-app", @@ -720,6 +768,10 @@ def test_raise_validation_error_when_persistence_enabled_and_size_not_set( async def test_stateful_clean_with_dry_run_false( self, stateful_streams_app: StreamsApp, mocker: MockerFixture ): + # actual component + mock_helm_uninstall_streams_app = mocker.patch.object( + stateful_streams_app.helm, "uninstall" + ) cleaner = stateful_streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner) @@ -736,6 +788,7 @@ async def test_stateful_clean_with_dry_run_false( ) mock = MagicMock() + mock.attach_mock(mock_helm_uninstall_streams_app, "helm_uninstall_streams_app") mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") mock.attach_mock(mock_delete_pvcs, "delete_pvcs") @@ -745,6 +798,11 @@ async def test_stateful_clean_with_dry_run_false( mock.assert_has_calls( [ + mocker.call.helm_uninstall_streams_app( + "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -790,6 +848,9 @@ async def test_stateful_clean_with_dry_run_true( caplog: pytest.LogCaptureFixture, ): caplog.set_level(logging.INFO) + # actual component + mocker.patch.object(stateful_streams_app, "destroy") + cleaner = stateful_streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner) diff --git a/tests/components/test_streams_bootstrap.py b/tests/components/test_streams_bootstrap.py deleted file mode 100644 index 6084c3ea3..000000000 --- a/tests/components/test_streams_bootstrap.py +++ 
/dev/null @@ -1,129 +0,0 @@ -import re -from unittest.mock import MagicMock - -import pytest -from pydantic import ValidationError -from pytest_mock import MockerFixture - -from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.helm_wrapper.model import ( - HelmDiffConfig, - HelmRepoConfig, - HelmUpgradeInstallFlags, -) -from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components.common.streams_bootstrap import ( - StreamsBootstrap, - StreamsBootstrapValues, -) -from kpops.config import KpopsConfig -from tests.components import PIPELINE_BASE_DIR - - -@pytest.mark.usefixtures("mock_env") -class TestStreamsBootstrap: - @pytest.fixture() - def config(self) -> KpopsConfig: - return KpopsConfig( - helm_diff_config=HelmDiffConfig(), - pipeline_base_dir=PIPELINE_BASE_DIR, - ) - - @pytest.fixture() - def handlers(self) -> ComponentHandlers: - return ComponentHandlers( - schema_handler=MagicMock(), - connector_handler=MagicMock(), - topic_handler=MagicMock(), - ) - - def test_default_configs(self, config: KpopsConfig, handlers: ComponentHandlers): - streams_bootstrap = StreamsBootstrap( - name="example-name", - config=config, - handlers=handlers, - **{ - "namespace": "test-namespace", - "app": {}, - }, - ) - assert streams_bootstrap.repo_config == HelmRepoConfig( - repository_name="bakdata-streams-bootstrap", - url="https://bakdata.github.io/streams-bootstrap/", - ) - assert streams_bootstrap.version == "2.9.0" - assert streams_bootstrap.namespace == "test-namespace" - assert streams_bootstrap.app.image_tag == "latest" - - @pytest.mark.asyncio() - async def test_should_deploy_streams_bootstrap_app( - self, - config: KpopsConfig, - handlers: ComponentHandlers, - mocker: MockerFixture, - ): - streams_bootstrap = StreamsBootstrap( - name="example-name", - config=config, - handlers=handlers, - **{ - "namespace": "test-namespace", - "app": { - "imageTag": "1.0.0", - "streams": { - "outputTopic": "test", - "brokers": "fake-broker:9092", - }, - }, - "version": "1.2.3", - }, - ) - helm_upgrade_install = mocker.patch.object( - streams_bootstrap.helm, "upgrade_install" - ) - print_helm_diff = mocker.patch.object( - streams_bootstrap.dry_run_handler, "print_helm_diff" - ) - mocker.patch.object( - StreamsBootstrap, - "helm_chart", - return_value="test/test-chart", - new_callable=mocker.PropertyMock, - ) - - await streams_bootstrap.deploy(dry_run=True) - - print_helm_diff.assert_called_once() - helm_upgrade_install.assert_called_once_with( - create_helm_release_name("${pipeline.name}-example-name"), - "test/test-chart", - True, - "test-namespace", - { - "nameOverride": "${pipeline.name}-example-name", - "imageTag": "1.0.0", - "streams": { - "brokers": "fake-broker:9092", - "outputTopic": "test", - }, - }, - HelmUpgradeInstallFlags(version="1.2.3"), - ) - - @pytest.mark.asyncio() - async def test_should_raise_validation_error_for_invalid_image_tag( - self, - config: KpopsConfig, - handlers: ComponentHandlers, - ): - with pytest.raises( - ValidationError, - match=re.escape( - "1 validation error for StreamsBootstrapValues\nimageTag\n String should match pattern '^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$'" - ), - ): - StreamsBootstrapValues( - **{ - "imageTag": "invalid image tag!", - } - ) diff --git a/tests/pipeline/test_clean.py b/tests/pipeline/test_clean.py index 1fbc7cab8..ffce3d61d 100644 --- a/tests/pipeline/test_clean.py +++ b/tests/pipeline/test_clean.py @@ -6,6 +6,15 @@ from typer.testing import CliRunner from kpops.cli.main import app 
+from kpops.components.base_components import HelmApp +from kpops.components.streams_bootstrap.producer.producer_app import ( + ProducerApp, + ProducerAppCleaner, +) +from kpops.components.streams_bootstrap.streams.streams_app import ( + StreamsApp, + StreamsAppCleaner, +) runner = CliRunner() @@ -52,19 +61,22 @@ def helm_mock(self, mocker: MockerFixture) -> MagicMock: # return pipeline def test_order(self, mocker: MockerFixture): - producer_app_mock_clean = mocker.patch( - "kpops.components.streams_bootstrap.producer.producer_app.ProducerApp.clean", - ) - streams_app_mock_clean = mocker.patch( - "kpops.components.streams_bootstrap.streams.streams_app.StreamsApp.clean", - ) - helm_app_mock_clean = mocker.patch( - "kpops.components.base_components.helm_app.HelmApp.clean", - ) - mock_clean = mocker.AsyncMock() - mock_clean.attach_mock(producer_app_mock_clean, "producer_app_mock_clean") - mock_clean.attach_mock(streams_app_mock_clean, "streams_app_mock_clean") - mock_clean.attach_mock(helm_app_mock_clean, "helm_app_mock_clean") + # destroy + producer_app_mock_destroy = mocker.patch.object(ProducerApp, "destroy") + streams_app_mock_destroy = mocker.patch.object(StreamsApp, "destroy") + helm_app_mock_destroy = mocker.patch.object(HelmApp, "destroy") + + # clean + streams_app_mock_clean = mocker.patch.object(StreamsAppCleaner, "clean") + producer_app_mock_clean = mocker.patch.object(ProducerAppCleaner, "clean") + + async_mocker = mocker.AsyncMock() + async_mocker.attach_mock(producer_app_mock_destroy, "producer_app_mock_destroy") + async_mocker.attach_mock(streams_app_mock_destroy, "streams_app_mock_destroy") + async_mocker.attach_mock(helm_app_mock_destroy, "helm_app_mock_destroy") + + async_mocker.attach_mock(producer_app_mock_clean, "producer_app_mock_clean") + async_mocker.attach_mock(streams_app_mock_clean, "streams_app_mock_clean") result = runner.invoke( app, @@ -78,13 +90,21 @@ def test_order(self, mocker: MockerFixture): assert result.exit_code == 0, result.stdout # check called + producer_app_mock_destroy.assert_called_once_with(True) + streams_app_mock_destroy.assert_called_once_with(True) + helm_app_mock_destroy.assert_called_once_with(True) + producer_app_mock_clean.assert_called_once_with(True) streams_app_mock_clean.assert_called_once_with(True) - helm_app_mock_clean.assert_called_once_with(True) # check reverse order - assert mock_clean.mock_calls == [ - mocker.call.helm_app_mock_clean(True), + assert async_mocker.mock_calls == [ + # HelmApp + mocker.call.helm_app_mock_destroy(True), + # StreamsApp + mocker.call.streams_app_mock_destroy(True), mocker.call.streams_app_mock_clean(True), + # ProducerApp + mocker.call.producer_app_mock_destroy(True), mocker.call.producer_app_mock_clean(True), ] diff --git a/tests/pipeline/test_reset.py b/tests/pipeline/test_reset.py index 07c7e1868..5eb6b7277 100644 --- a/tests/pipeline/test_reset.py +++ b/tests/pipeline/test_reset.py @@ -6,6 +6,10 @@ from typer.testing import CliRunner from kpops.cli.main import app +from kpops.components.base_components import HelmApp +from kpops.components.streams_bootstrap import ProducerApp, StreamsApp +from kpops.components.streams_bootstrap.producer.producer_app import ProducerAppCleaner +from kpops.components.streams_bootstrap.streams.streams_app import StreamsAppCleaner runner = CliRunner() @@ -22,19 +26,22 @@ def mock_helm(self, mocker: MockerFixture) -> MagicMock: ).return_value def test_order(self, mocker: MockerFixture): - producer_app_mock_reset = mocker.patch( - 
"kpops.components.streams_bootstrap.producer.producer_app.ProducerApp.reset", - ) - streams_app_mock_reset = mocker.patch( - "kpops.components.streams_bootstrap.streams.streams_app.StreamsApp.reset", - ) - helm_app_mock_reset = mocker.patch( - "kpops.components.base_components.helm_app.HelmApp.reset", - ) - mock_reset = mocker.AsyncMock() - mock_reset.attach_mock(producer_app_mock_reset, "producer_app_mock_reset") - mock_reset.attach_mock(streams_app_mock_reset, "streams_app_mock_reset") - mock_reset.attach_mock(helm_app_mock_reset, "helm_app_mock_reset") + # destroy + producer_app_mock_destroy = mocker.patch.object(ProducerApp, "destroy") + streams_app_mock_destroy = mocker.patch.object(StreamsApp, "destroy") + helm_app_mock_destroy = mocker.patch.object(HelmApp, "destroy") + + # reset + streams_app_mock_reset = mocker.patch.object(StreamsAppCleaner, "reset") + producer_app_mock_reset = mocker.patch.object(ProducerAppCleaner, "reset") + + async_mocker = mocker.AsyncMock() + async_mocker.attach_mock(producer_app_mock_destroy, "producer_app_mock_destroy") + async_mocker.attach_mock(streams_app_mock_destroy, "streams_app_mock_destroy") + async_mocker.attach_mock(helm_app_mock_destroy, "helm_app_mock_destroy") + + async_mocker.attach_mock(producer_app_mock_reset, "producer_app_mock_reset") + async_mocker.attach_mock(streams_app_mock_reset, "streams_app_mock_reset") result = runner.invoke( app, @@ -48,13 +55,20 @@ def test_order(self, mocker: MockerFixture): assert result.exit_code == 0, result.stdout # check called - producer_app_mock_reset.assert_called_once_with(True) + producer_app_mock_destroy.assert_called_once_with(True) + streams_app_mock_destroy.assert_called_once_with(True) + helm_app_mock_destroy.assert_called_once_with(True) + + producer_app_mock_reset.assert_not_called() streams_app_mock_reset.assert_called_once_with(True) - helm_app_mock_reset.assert_called_once_with(True) # check reverse order - assert mock_reset.mock_calls == [ - mocker.call.helm_app_mock_reset(True), + assert async_mocker.mock_calls == [ + # HelmApp + mocker.call.helm_app_mock_destroy(True), + # StreamsApp + mocker.call.streams_app_mock_destroy(True), mocker.call.streams_app_mock_reset(True), - mocker.call.producer_app_mock_reset(True), + # ProducerApp + mocker.call.producer_app_mock_destroy(True), ]