diff --git a/docs/docs/resources/pipeline-config/config.yaml b/docs/docs/resources/pipeline-config/config.yaml index ead12b785..862a49ac0 100644 --- a/docs/docs/resources/pipeline-config/config.yaml +++ b/docs/docs/resources/pipeline-config/config.yaml @@ -7,8 +7,6 @@ pipeline_base_dir: . # The Kafka brokers address. # REQUIRED kafka_brokers: "http://broker1:9092,http://broker2:9092" -# The name of the defaults file and the prefix of the defaults environment file. -defaults_filename_prefix: defaults # Configure the topic name variables you can use in the pipeline definition. topic_name_config: # Configures the value for the variable ${output_topic_name} diff --git a/docs/docs/resources/variables/cli_env_vars.env b/docs/docs/resources/variables/cli_env_vars.env index c51e2ee85..21436ded7 100644 --- a/docs/docs/resources/variables/cli_env_vars.env +++ b/docs/docs/resources/variables/cli_env_vars.env @@ -14,7 +14,8 @@ KPOPS_DOTENV_PATH # No default value, not required # Suffix your environment files with this value (e.g. # defaults_development.yaml for environment=development). KPOPS_ENVIRONMENT # No default value, not required -# Path to YAML with pipeline definition -KPOPS_PIPELINE_PATH # No default value, required +# Paths to dir containing 'pipeline.yaml' or files named +# 'pipeline.yaml'. +KPOPS_PIPELINE_PATHS # No default value, required # Comma separated list of steps to apply the command on KPOPS_PIPELINE_STEPS # No default value, not required diff --git a/docs/docs/resources/variables/cli_env_vars.md b/docs/docs/resources/variables/cli_env_vars.md index 87f8d2b2c..da6a2d994 100644 --- a/docs/docs/resources/variables/cli_env_vars.md +++ b/docs/docs/resources/variables/cli_env_vars.md @@ -5,5 +5,5 @@ These variables take precedence over the commands' flags. If a variable is set, |KPOPS_CONFIG_PATH |. |False |Path to the dir containing config.yaml files | |KPOPS_DOTENV_PATH | |False |Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. | |KPOPS_ENVIRONMENT | |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).| -|KPOPS_PIPELINE_PATH | |True |Path to YAML with pipeline definition | +|KPOPS_PIPELINE_PATHS| |True |Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. | |KPOPS_PIPELINE_STEPS| |False |Comma separated list of steps to apply the command on | diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index 12195fe9d..c4b4050e8 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -14,10 +14,6 @@ KPOPS_PIPELINE_BASE_DIR=. # kafka_brokers # The comma separated Kafka brokers address. KPOPS_KAFKA_BROKERS # No default value, required -# defaults_filename_prefix -# The name of the defaults file and the prefix of the defaults -# environment file. 
-KPOPS_DEFAULTS_FILENAME_PREFIX=defaults # topic_name_config.default_output_topic_name # Configures the value for the variable ${output_topic_name} KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME=${pipeline.name}-${component.name} diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md index 62871c466..171715ba5 100644 --- a/docs/docs/resources/variables/config_env_vars.md +++ b/docs/docs/resources/variables/config_env_vars.md @@ -5,7 +5,6 @@ These variables take precedence over the settings in `config.yaml`. Variables ma |KPOPS_COMPONENTS_MODULE | |False |Custom Python module defining project-specific KPOps components |components_module | |KPOPS_PIPELINE_BASE_DIR |. |False |Base directory to the pipelines (default is current working directory) |pipeline_base_dir | |KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers | -|KPOPS_DEFAULTS_FILENAME_PREFIX |defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix | |KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline.name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name| |KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME |${pipeline.name}-${component.name}-error|False |Configures the value for the variable ${error_topic_name} |topic_name_config.default_error_topic_name | |KPOPS_SCHEMA_REGISTRY__ENABLED |False |False |Whether the Schema Registry handler should be initialized. |schema_registry.enabled | diff --git a/docs/docs/resources/variables/variable_substitution.yaml b/docs/docs/resources/variables/variable_substitution.yaml index 4bdf51685..8a4cf60ea 100644 --- a/docs/docs/resources/variables/variable_substitution.yaml +++ b/docs/docs/resources/variables/variable_substitution.yaml @@ -4,8 +4,6 @@ app_type: "${component.type}" app_name: "${component.name}" app_schedule: "${component.app.schedule}" - helm_release_name: ${component.helm_release_name} - helm_name_override: ${component.helm_name_override} commandLine: FAKE_ARG: "fake-arg-value" schedule: "30 3/8 * * *" diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index f2a070842..47aab93b1 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -196,12 +196,6 @@ "title": "Create Namespace", "type": "boolean" }, - "defaults_filename_prefix": { - "default": "defaults", - "description": "The name of the defaults file and the prefix of the defaults environment file.", - "title": "Defaults Filename Prefix", - "type": "string" - }, "helm_config": { "allOf": [ { diff --git a/docs/docs/user/core-concepts/defaults.md b/docs/docs/user/core-concepts/defaults.md index 077ea4225..a3aad1957 100644 --- a/docs/docs/user/core-concepts/defaults.md +++ b/docs/docs/user/core-concepts/defaults.md @@ -44,13 +44,6 @@ It is important to note that `defaults_{environment}.yaml` overrides only the se - - -!!! tip - `defaults` is the default value of `defaults_filename_prefix`. - - - ## Components diff --git a/docs/docs/user/migration-guide/v5-v6.md b/docs/docs/user/migration-guide/v5-v6.md new file mode 100644 index 000000000..c6998cf35 --- /dev/null +++ b/docs/docs/user/migration-guide/v5-v6.md @@ -0,0 +1,16 @@ +# Migrate from V5 to V6 + +## [Deploy multiple pipelines](https://github.com/bakdata/kpops/pull/487) + +KPOps can now deploy multiple pipelines in a single command. 
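+
+For example, you could deploy two pipelines at once, or a whole folder of them (hypothetical paths):
+
+```console
+$ kpops deploy ./pipeline-1/pipeline.yaml ./pipeline-2/pipeline.yaml
+$ kpops deploy ./pipelines/
+```
+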
+You can pass one or more `pipeline.yaml` files, or a directory containing multiple `pipeline.yaml` files.
+
+The environment variable `KPOPS_PIPELINE_PATH` has been renamed to `KPOPS_PIPELINE_PATHS`.
+
+Read more:
+
+- [CLI Usage](https://bakdata.github.io/kpops/6.0/user/references/cli-commands/)
+- [Environment variables](https://bakdata.github.io/kpops/6.0/user/core-concepts/variables/environment_variables/)
+
+## [Separate KPOps API from the CLI](https://github.com/bakdata/kpops/pull/489)
+
+The KPOps Python API is now stable and separate from the CLI! 🎉
diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md
index 3c46cb11b..570563069 100644
--- a/docs/docs/user/references/cli-commands.md
+++ b/docs/docs/user/references/cli-commands.md
@@ -31,12 +31,12 @@ Clean pipeline steps
 **Usage**:

 ```console
-$ kpops clean [OPTIONS] PIPELINE_PATH
+$ kpops clean [OPTIONS] PIPELINE_PATHS...
 ```

 **Arguments**:

-* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
+* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

 **Options**:
@@ -57,12 +57,12 @@ Deploy pipeline steps
 **Usage**:

 ```console
-$ kpops deploy [OPTIONS] PIPELINE_PATH
+$ kpops deploy [OPTIONS] PIPELINE_PATHS...
 ```

 **Arguments**:

-* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
+* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

 **Options**:
@@ -83,12 +83,12 @@ Destroy pipeline steps
 **Usage**:

 ```console
-$ kpops destroy [OPTIONS] PIPELINE_PATH
+$ kpops destroy [OPTIONS] PIPELINE_PATHS...
 ```

 **Arguments**:

-* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
+* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

 **Options**:
@@ -109,18 +109,17 @@ Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps
 **Usage**:

 ```console
-$ kpops generate [OPTIONS] PIPELINE_PATH
+$ kpops generate [OPTIONS] PIPELINE_PATHS...
 ```

 **Arguments**:

-* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
+* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

 **Options**:

 * `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH]
 * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
-* `--output / --no-output`: Enable output printing [default: output]
 * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
 * `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
 * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
@@ -153,18 +152,17 @@ In addition to generate, render final resource representation for each pipeline
 **Usage**:

 ```console
-$ kpops manifest [OPTIONS] PIPELINE_PATH
+$ kpops manifest [OPTIONS] PIPELINE_PATHS...
``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: * `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH] * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] -* `--output / --no-output`: Enable output printing [default: output] * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS] * `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE] * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] @@ -178,12 +176,12 @@ Reset pipeline steps **Usage**: ```console -$ kpops reset [OPTIONS] PIPELINE_PATH +$ kpops reset [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 4940ede82..f4e3a8ba6 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -113,6 +113,7 @@ nav: - Migrate from v2 to v3: user/migration-guide/v2-v3.md - Migrate from v3 to v4: user/migration-guide/v3-v4.md - Migrate from v4 to v5: user/migration-guide/v4-v5.md + - Migrate from v5 to v6: user/migration-guide/v5-v6.md - CLI usage: user/references/cli-commands.md - Editor integration: user/references/editor-integration.md - CI integration: diff --git a/hooks/gen_docs/gen_docs_components.py b/hooks/gen_docs/gen_docs_components.py index 2e0a4f1a0..10bb40af7 100644 --- a/hooks/gen_docs/gen_docs_components.py +++ b/hooks/gen_docs/gen_docs_components.py @@ -8,7 +8,7 @@ import yaml from hooks import ROOT -from kpops.cli.registry import _find_classes +from kpops.api.registry import _find_classes from kpops.components import KafkaConnector, PipelineComponent from kpops.utils.colorify import redify, yellowify from kpops.utils.pydantic import issubclass_patched diff --git a/hooks/gen_schema.py b/hooks/gen_schema.py index 8b9f40db2..62e154b29 100644 --- a/hooks/gen_schema.py +++ b/hooks/gen_schema.py @@ -5,8 +5,8 @@ from pathlib import Path from hooks import ROOT +from kpops.api.file_type import KpopsFileType from kpops.utils.gen_schema import ( - SchemaScope, gen_config_schema, gen_defaults_schema, gen_pipeline_schema, @@ -15,7 +15,7 @@ PATH_TO_SCHEMA = ROOT / "docs/docs/schema" -def gen_schema(scope: SchemaScope): +def gen_schema(scope: KpopsFileType): """Generate the specified schema and save it to a file. 
The file is located in docs/docs/schema and is named ``.json`` @@ -24,16 +24,16 @@ def gen_schema(scope: SchemaScope): """ with redirect_stdout(StringIO()) as f: match scope: - case SchemaScope.PIPELINE: + case KpopsFileType.PIPELINE: gen_pipeline_schema() - case SchemaScope.DEFAULTS: + case KpopsFileType.DEFAULTS: gen_defaults_schema() - case SchemaScope.CONFIG: + case KpopsFileType.CONFIG: gen_config_schema() Path(PATH_TO_SCHEMA / f"{scope.value}.json").write_text(f.getvalue()) if __name__ == "__main__": - gen_schema(SchemaScope.PIPELINE) - gen_schema(SchemaScope.DEFAULTS) - gen_schema(SchemaScope.CONFIG) + gen_schema(KpopsFileType.PIPELINE) + gen_schema(KpopsFileType.DEFAULTS) + gen_schema(KpopsFileType.CONFIG) diff --git a/kpops/__init__.py b/kpops/__init__.py index 5326c93bf..cb9b90273 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,7 +1,7 @@ __version__ = "5.1.1" # export public API functions -from kpops.cli.main import clean, deploy, destroy, generate, init, manifest, reset +from kpops.api import clean, deploy, destroy, generate, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py new file mode 100644 index 000000000..826307308 --- /dev/null +++ b/kpops/api/__init__.py @@ -0,0 +1,268 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import TYPE_CHECKING + +import kpops +from kpops.api.logs import log, log_action +from kpops.api.options import FilterType +from kpops.api.registry import Registry +from kpops.component_handlers import ComponentHandlers +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) +from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler +from kpops.component_handlers.topic.handler import TopicHandler +from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper +from kpops.config import KpopsConfig +from kpops.pipeline import ( + Pipeline, + PipelineGenerator, +) +from kpops.utils.cli_commands import init_project + +if TYPE_CHECKING: + from kpops.components import PipelineComponent + from kpops.components.base_components.models.resource import Resource + from kpops.config import KpopsConfig + + +def generate( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: set[str] | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> Pipeline: + kpops_config = KpopsConfig.create( + config, + dotenv, + environment, + verbose, + ) + pipeline = create_pipeline(pipeline_path, kpops_config, environment) + log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") + if steps: + component_names = steps + log.debug( + f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" + ) + + predicate = filter_type.create_default_step_names_filter_predicate( + component_names + ) + pipeline.filter(predicate) + log.info(f"Filtered pipeline:\n{pipeline.step_names}") + return pipeline + + +def manifest( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: set[str] | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> list[Resource]: + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + resources: 
list[Resource] = [] + for component in pipeline.components: + resource = component.manifest() + resources.append(resource) + return resources + + +def deploy( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: set[str] | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def deploy_runner(component: PipelineComponent): + log_action("Deploy", component) + await component.deploy(dry_run) + + async def async_deploy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(deploy_runner) + await pipeline_tasks + else: + for component in pipeline.components: + await deploy_runner(component) + + asyncio.run(async_deploy()) + + +def destroy( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: set[str] | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def destroy_runner(component: PipelineComponent): + log_action("Destroy", component) + await component.destroy(dry_run) + + async def async_destroy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph( + destroy_runner, reverse=True + ) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await destroy_runner(component) + + asyncio.run(async_destroy()) + + +def reset( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: set[str] | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def reset_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Reset", component) + await component.reset(dry_run) + + async def async_reset(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await reset_runner(component) + + asyncio.run(async_reset()) + + +def clean( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: set[str] | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def clean_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Clean", component) + await component.clean(dry_run) + + async def async_clean(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) + 
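+            # build_execution_graph returns an awaitable that runs clean_runner over the pipeline's dependency graph in reverse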
await pipeline_tasks + else: + for component in reversed(pipeline.components): + await clean_runner(component) + + asyncio.run(async_clean()) + + +def init( + path: Path, + config_include_opt: bool = False, +): + if not path.exists(): + path.mkdir(parents=False) + elif next(path.iterdir(), False): + log.warning("Please provide a path to an empty directory.") + return + init_project(path, config_include_opt) + + +def create_pipeline( + pipeline_path: Path, + kpops_config: KpopsConfig, + environment: str | None, +) -> Pipeline: + registry = Registry() + if kpops_config.components_module: + registry.find_components(kpops_config.components_module) + registry.find_components("kpops.components") + + handlers = setup_handlers(kpops_config) + parser = PipelineGenerator(kpops_config, registry, handlers) + return parser.load_yaml(pipeline_path, environment) + + +def setup_handlers(config: KpopsConfig) -> ComponentHandlers: + schema_handler = SchemaHandler.load_schema_handler(config) + connector_handler = KafkaConnectHandler.from_kpops_config(config) + proxy_wrapper = ProxyWrapper(config.kafka_rest) + topic_handler = TopicHandler(proxy_wrapper) + + return ComponentHandlers(schema_handler, connector_handler, topic_handler) diff --git a/kpops/api/exception.py b/kpops/api/exception.py new file mode 100644 index 000000000..65094fd29 --- /dev/null +++ b/kpops/api/exception.py @@ -0,0 +1,13 @@ +from __future__ import annotations + + +class ValidationError(Exception): + pass + + +class ParsingException(Exception): + pass + + +class ClassNotFoundError(Exception): + """Similar to builtin `ModuleNotFoundError`; class doesn't exist inside module.""" diff --git a/kpops/api/file_type.py b/kpops/api/file_type.py new file mode 100644 index 000000000..c08fce987 --- /dev/null +++ b/kpops/api/file_type.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from enum import Enum + +FILE_EXTENSION = ".yaml" + + +class KpopsFileType(str, Enum): + """Enum representing different types of KPOps file naming conventions. + + Attributes: + PIPELINE (str): Represents a pipeline YAML file type. + DEFAULTS (str): Represents a defaults YAML file type. + CONFIG (str): Represents a config YAML file type. + """ + + PIPELINE = "pipeline" + DEFAULTS = "defaults" + CONFIG = "config" + + def as_yaml_file(self, prefix: str = "", suffix: str = "") -> str: + """Generate a YAML file name based on the enum value with optional prefix and suffix. + + Args: + prefix (str): An optional prefix for the file name. Default is an empty string. + suffix (str): An optional suffix for the file name. Default is an empty string. + + Returns: + str: The constructed file name in the format ''. 
+ + Example: + >>> KpopsFileType.PIPELINE.as_yaml_file(prefix="pre_", suffix="_suf") + 'pre_pipeline_suf.yaml' + """ + return prefix + self.value + suffix + FILE_EXTENSION diff --git a/kpops/cli/custom_formatter.py b/kpops/api/logs.py similarity index 55% rename from kpops/cli/custom_formatter.py rename to kpops/api/logs.py index 69fc1c73d..e9a833aba 100644 --- a/kpops/cli/custom_formatter.py +++ b/kpops/api/logs.py @@ -1,7 +1,13 @@ +from __future__ import annotations + import logging +from typing import TYPE_CHECKING import typer +if TYPE_CHECKING: + from kpops.components import PipelineComponent + class CustomFormatter(logging.Formatter): def format(self, record): @@ -23,3 +29,21 @@ def format(self, record): log_fmt = formats.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) + + +logger = logging.getLogger() +logging.getLogger("httpx").setLevel(logging.WARNING) +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(CustomFormatter()) +logger.addHandler(stream_handler) + +log = logging.getLogger("") +LOG_DIVIDER = "#" * 100 + + +def log_action(action: str, pipeline_component: PipelineComponent): + log.info("\n") + log.info(LOG_DIVIDER) + log.info(f"{action} {pipeline_component.name}") + log.info(LOG_DIVIDER) + log.info("\n") diff --git a/kpops/api/options.py b/kpops/api/options.py new file mode 100644 index 000000000..dc116bd35 --- /dev/null +++ b/kpops/api/options.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from kpops.components import PipelineComponent + from kpops.pipeline import ComponentFilterPredicate + + +class FilterType(str, Enum): + INCLUDE = "include" + EXCLUDE = "exclude" + + @staticmethod + def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool: + return component.name in component_names + + def create_default_step_names_filter_predicate( + self, component_names: set[str] + ) -> ComponentFilterPredicate: + def predicate(component: PipelineComponent) -> bool: + match self, FilterType.is_in_steps(component, component_names): + case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True): + return False + case _: + return True + + return predicate diff --git a/kpops/cli/registry.py b/kpops/api/registry.py similarity index 97% rename from kpops/cli/registry.py rename to kpops/api/registry.py index 2b3b51631..2df483329 100644 --- a/kpops/cli/registry.py +++ b/kpops/api/registry.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, TypeVar from kpops import __name__ -from kpops.cli.exception import ClassNotFoundError +from kpops.api.exception import ClassNotFoundError from kpops.components.base_components.pipeline_component import PipelineComponent if TYPE_CHECKING: diff --git a/kpops/cli/exception.py b/kpops/cli/exception.py deleted file mode 100644 index e9b0a65de..000000000 --- a/kpops/cli/exception.py +++ /dev/null @@ -1,2 +0,0 @@ -class ClassNotFoundError(Exception): - """Similar to builtin `ModuleNotFoundError`; class doesn't exist inside module.""" diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 6bdcd8dc4..395ab8e53 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -1,44 +1,24 @@ from __future__ import annotations -import asyncio -import logging from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import Optional -import dtyper import typer +import kpops from kpops import __version__ -from kpops.cli.custom_formatter import CustomFormatter -from 
kpops.cli.options import FilterType -from kpops.cli.registry import Registry -from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) -from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler -from kpops.component_handlers.topic.handler import TopicHandler -from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper -from kpops.components.base_components.models.resource import Resource +from kpops.api.file_type import KpopsFileType +from kpops.api.options import FilterType +from kpops.cli.utils import collect_pipeline_paths from kpops.config import ENV_PREFIX, KpopsConfig -from kpops.pipeline import ComponentFilterPredicate, Pipeline, PipelineGenerator -from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( - SchemaScope, gen_config_schema, gen_defaults_schema, gen_pipeline_schema, ) -from kpops.utils.pydantic import YamlConfigSettingsSource from kpops.utils.yaml import print_yaml -if TYPE_CHECKING: - from kpops.components import PipelineComponent - - -LOG_DIVIDER = "#" * 100 - -app = dtyper.Typer(pretty_exceptions_enable=False) +app = typer.Typer(pretty_exceptions_enable=False) DOTENV_PATH_OPTION: Optional[list[Path]] = typer.Option( default=None, @@ -62,14 +42,14 @@ help="Path to the dir containing config.yaml files", ) -PIPELINE_PATH_ARG: Path = typer.Argument( +PIPELINE_PATHS_ARG: list[Path] = typer.Argument( default=..., exists=True, file_okay=True, - dir_okay=False, + dir_okay=True, readable=True, - envvar=f"{ENV_PREFIX}PIPELINE_PATH", - help="Path to YAML with pipeline definition", + envvar=f"{ENV_PREFIX}PIPELINE_PATHS", + help="Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'.", ) PROJECT_PATH: Path = typer.Argument( @@ -126,102 +106,19 @@ ) -logger = logging.getLogger() -logging.getLogger("httpx").setLevel(logging.WARNING) -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(CustomFormatter()) -logger.addHandler(stream_handler) - -log = logging.getLogger("") - - -def setup_pipeline( - pipeline_path: Path, - kpops_config: KpopsConfig, - environment: str | None, -) -> Pipeline: - registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") - - handlers = setup_handlers(kpops_config) - parser = PipelineGenerator(kpops_config, registry, handlers) - return parser.load_yaml(pipeline_path, environment) - - -def setup_handlers(config: KpopsConfig) -> ComponentHandlers: - schema_handler = SchemaHandler.load_schema_handler(config) - connector_handler = KafkaConnectHandler.from_kpops_config(config) - proxy_wrapper = ProxyWrapper(config.kafka_rest) - topic_handler = TopicHandler(proxy_wrapper) - - return ComponentHandlers(schema_handler, connector_handler, topic_handler) - - -def setup_logging_level(verbose: bool): - logging.getLogger().setLevel(logging.DEBUG if verbose else logging.INFO) - - -def parse_steps(steps: str) -> set[str]: - return set(steps.split(",")) - - -def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool: - return component.name in component_names - - -def create_default_step_names_filter_predicate( - component_names: set[str], filter_type: FilterType -) -> ComponentFilterPredicate: - def predicate(component: PipelineComponent) -> bool: - match filter_type, is_in_steps(component, component_names): - case (FilterType.INCLUDE, False) | 
(FilterType.EXCLUDE, True): - return False - case _: - return True - - return predicate - +def parse_steps(steps: str | None) -> set[str] | None: + return set(steps.split(",")) if steps else None -def log_action(action: str, pipeline_component: PipelineComponent): - log.info("\n") - log.info(LOG_DIVIDER) - log.info(f"{action} {pipeline_component.name}") - log.info(LOG_DIVIDER) - log.info("\n") - -def create_kpops_config( - config: Path, - dotenv: list[Path] | None = None, - environment: str | None = None, - verbose: bool = False, -) -> KpopsConfig: - setup_logging_level(verbose) - YamlConfigSettingsSource.config_dir = config - YamlConfigSettingsSource.environment = environment - return KpopsConfig( - _env_file=dotenv # pyright: ignore[reportCallIssue] - ) - - -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 - help="Initialize a new KPOps project." -) +@app.command(help="Initialize a new KPOps project.") def init( path: Path = PROJECT_PATH, config_include_opt: bool = CONFIG_INCLUDE_OPTIONAL, ): - if not path.exists(): - path.mkdir(parents=False) - elif next(path.iterdir(), False): - log.warning("Please provide a path to an empty directory.") - return - init_project(path, config_include_opt) + kpops.init(path, config_include_opt=config_include_opt) -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command( help=""" Generate JSON schema. @@ -229,7 +126,7 @@ def init( """ ) def schema( - scope: SchemaScope = typer.Argument( + scope: KpopsFileType = typer.Argument( ..., show_default=False, help=""" @@ -245,100 +142,77 @@ def schema( ), ) -> None: match scope: - case SchemaScope.PIPELINE: - kpops_config = create_kpops_config(config) + case KpopsFileType.PIPELINE: + kpops_config = KpopsConfig.create(config) gen_pipeline_schema( kpops_config.components_module, include_stock_components ) - case SchemaScope.DEFAULTS: - kpops_config = create_kpops_config(config) + case KpopsFileType.DEFAULTS: + kpops_config = KpopsConfig.create(config) gen_defaults_schema( kpops_config.components_module, include_stock_components ) - case SchemaScope.CONFIG: + case KpopsFileType.CONFIG: gen_config_schema() -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command( short_help="Generate enriched pipeline representation", help="Enrich pipeline steps with defaults. 
The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).", ) def generate( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, - output: bool = OUTPUT_OPTION, steps: Optional[str] = PIPELINE_STEPS, filter_type: FilterType = FILTER_TYPE, environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, -) -> Pipeline: - kpops_config = create_kpops_config( - config, - dotenv, - environment, - verbose, - ) - - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - - if steps: - component_names = parse_steps(steps) - log.debug( - f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" - ) - - predicate = create_default_step_names_filter_predicate( - component_names, filter_type +): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + pipeline = kpops.generate( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + verbose=verbose, ) - pipeline.filter(predicate) - - def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]: - return [step.name for step in steps_to_apply] - - log.info(f"Filtered pipeline:\n{get_step_names(pipeline.components)}") - if output: print_yaml(pipeline.to_yaml()) - return pipeline -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command( short_help="Render final resource representation", help="In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests.", ) def manifest( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, - output: bool = OUTPUT_OPTION, steps: Optional[str] = PIPELINE_STEPS, filter_type: FilterType = FILTER_TYPE, environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, -) -> list[Resource]: - pipeline = generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - output=False, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - resources: list[Resource] = [] - for component in pipeline.components: - resource = component.manifest() - resources.append(resource) - if output: - for manifest in resource: - print_yaml(manifest) - return resources - - -@app.command(help="Deploy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + resources = kpops.manifest( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + for resource in resources: + for rendered_manifest in resource: + print_yaml(rendered_manifest) + + +@app.command(help="Deploy pipeline steps") def deploy( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -348,35 +222,23 @@ def deploy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - output=False, - steps=steps, 
- filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def deploy_runner(component: PipelineComponent): - log_action("Deploy", component) - await component.deploy(dry_run) - - async def async_deploy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(deploy_runner) - await pipeline_tasks - else: - for component in pipeline.components: - await deploy_runner(component) - - asyncio.run(async_deploy()) - - -@app.command(help="Destroy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.deploy( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) + + +@app.command(help="Destroy pipeline steps") def destroy( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -386,37 +248,23 @@ def destroy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - output=False, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def destroy_runner(component: PipelineComponent): - log_action("Destroy", component) - await component.destroy(dry_run) - - async def async_destroy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph( - destroy_runner, reverse=True - ) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await destroy_runner(component) - - asyncio.run(async_destroy()) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.destroy( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) -@app.command(help="Reset pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command(help="Reset pipeline steps") def reset( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -426,36 +274,23 @@ def reset( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - output=False, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def reset_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Reset", component) - await component.reset(dry_run) - - async def async_reset(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await reset_runner(component) - - asyncio.run(async_reset()) - - -@app.command(help="Clean pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.reset( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + 
steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) + + +@app.command(help="Clean pipeline steps") def clean( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -465,31 +300,18 @@ def clean( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - output=False, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def clean_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Clean", component) - await component.clean(dry_run) - - async def async_clean(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await clean_runner(component) - - asyncio.run(async_clean()) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.clean( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) def version_callback(show_version: bool) -> None: diff --git a/kpops/cli/options.py b/kpops/cli/options.py deleted file mode 100644 index ac176d986..000000000 --- a/kpops/cli/options.py +++ /dev/null @@ -1,6 +0,0 @@ -from enum import Enum - - -class FilterType(str, Enum): - INCLUDE = "include" - EXCLUDE = "exclude" diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py new file mode 100644 index 000000000..f4a04bcbb --- /dev/null +++ b/kpops/cli/utils.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from collections.abc import Iterable, Iterator +from pathlib import Path + +from kpops.api.file_type import KpopsFileType + +PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() + + +def collect_pipeline_paths(pipeline_paths: Iterable[Path]) -> Iterator[Path]: + """Generate paths to pipeline files. + + :param pipeline_paths: The list of paths to the pipeline files or directories. + + :yields: Path: Paths to pipeline files. If `pipeline_path` file yields the given path. + For a directory it yields all the pipeline.yaml paths. + + :raises: ValueError: If `pipeline_path` is neither a file nor a directory. + """ + for pipeline_path in pipeline_paths: + if pipeline_path.is_file(): + yield pipeline_path + elif pipeline_path.is_dir(): + yield from sorted(pipeline_path.glob(f"**/{PIPELINE_YAML}")) + else: + msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file." 
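+            # neither an existing file nor a directory, e.g. the path does not exist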
+ raise ValueError(msg) diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 936ba0223..94b5cd3bf 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -8,8 +8,8 @@ from schema_registry.client import AsyncSchemaRegistryClient from schema_registry.client.schema import AvroSchema -from kpops.cli.exception import ClassNotFoundError -from kpops.cli.registry import find_class +from kpops.api.exception import ClassNotFoundError +from kpops.api.registry import find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index 45711359b..f63eaf5c2 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -19,6 +19,7 @@ ) from pydantic.json_schema import SkipJsonSchema +from kpops.api.file_type import KpopsFileType from kpops.component_handlers import ComponentHandlers from kpops.config import KpopsConfig from kpops.utils import cached_classproperty @@ -256,8 +257,7 @@ def get_defaults_file_paths( associated with the pipeline. :param pipeline_path: The path to the pipeline.yaml file. - :param config: The KPOps configuration object containing settings such as pipeline_base_dir - and defaults_filename_prefix. + :param config: The KPOps configuration object containing settings such as pipeline_base_dir. :param environment: Optional. The environment for which default configuration files are sought. :returns: A list of Path objects representing the default configuration file paths. 
""" @@ -274,12 +274,12 @@ def get_defaults_file_paths( raise RuntimeError(message) while pipeline_base_dir != path: environment_default_file_path = ( - path.parent / f"{config.defaults_filename_prefix}_{environment}.yaml" + path.parent / KpopsFileType.DEFAULTS.as_yaml_file(suffix=f"_{environment}") ) if environment_default_file_path.is_file(): default_paths.append(environment_default_file_path) - defaults_yaml_path = path.parent / f"{config.defaults_filename_prefix}.yaml" + defaults_yaml_path = path.parent / KpopsFileType.DEFAULTS.as_yaml_file() if defaults_yaml_path.is_file(): default_paths.append(defaults_yaml_path) diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 39c96b79e..94fa3e8cd 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -5,7 +5,7 @@ from typing import Annotated, Any import pydantic -from pydantic import Field, SerializationInfo, computed_field, model_serializer +from pydantic import Field, SerializationInfo, model_serializer from typing_extensions import override from kpops.component_handlers.helm_wrapper.dry_run_handler import DryRunHandler @@ -103,13 +103,11 @@ def dry_run_handler(self) -> DryRunHandler: helm_diff = HelmDiff(self.config.helm_diff_config) return DryRunHandler(self.helm, helm_diff, self.namespace) - @computed_field @property def helm_release_name(self) -> str: """The name for the Helm release.""" return create_helm_release_name(self.full_name) - @computed_field @property def helm_name_override(self) -> str: """Helm chart name override.""" diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index df2348773..04f95b54b 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -5,12 +5,12 @@ import pydantic from pydantic import BaseModel, ConfigDict, Field, model_validator +from kpops.api.exception import ValidationError from kpops.components.base_components.kafka_app import ( KafkaAppValues, KafkaStreamsConfig, ) from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr -from kpops.pipeline import ValidationError from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/config.py b/kpops/config.py index f82068c96..7efbbf0b7 100644 --- a/kpops/config.py +++ b/kpops/config.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from pathlib import Path from pydantic import AnyHttpUrl, Field, TypeAdapter @@ -88,10 +89,6 @@ class KpopsConfig(BaseSettings): ], description="The comma separated Kafka brokers address.", ) - defaults_filename_prefix: str = Field( - default="defaults", - description="The name of the defaults file and the prefix of the defaults environment file.", - ) topic_name_config: TopicNameConfig = Field( default=TopicNameConfig(), description=describe_object(TopicNameConfig.__doc__), @@ -127,6 +124,25 @@ class KpopsConfig(BaseSettings): model_config = SettingsConfigDict(env_prefix=ENV_PREFIX, env_nested_delimiter="__") + @classmethod + def create( + cls, + config: Path, + dotenv: list[Path] | None = None, + environment: str | None = None, + verbose: bool = False, + ) -> KpopsConfig: + cls.setup_logging_level(verbose) + YamlConfigSettingsSource.config_dir = config + YamlConfigSettingsSource.environment = environment + return KpopsConfig( + _env_file=dotenv # pyright: ignore[reportCallIssue] 
+ ) + + @staticmethod + def setup_logging_level(verbose: bool): + logging.getLogger().setLevel(logging.DEBUG if verbose else logging.INFO) + @override @classmethod def settings_customise_sources( diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 3f4f698b9..f0b1aa6b8 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -15,6 +15,9 @@ computed_field, ) +from kpops.api.exception import ParsingException, ValidationError +from kpops.api.registry import Registry +from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.utils.dict_ops import update_nested_pair from kpops.utils.environment import ENV, PIPELINE_PATH @@ -24,21 +27,10 @@ from collections.abc import Awaitable, Coroutine, Iterator from pathlib import Path - from kpops.cli.registry import Registry - from kpops.component_handlers import ComponentHandlers from kpops.config import KpopsConfig log = logging.getLogger("PipelineGenerator") - -class ParsingException(Exception): - pass - - -class ValidationError(Exception): - pass - - ComponentFilterPredicate: TypeAlias = Callable[[PipelineComponent], bool] @@ -50,6 +42,10 @@ class Pipeline(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) + @property + def step_names(self) -> list[str]: + return [step.name for step in self.components] + @computed_field(title="Components") @property def components(self) -> list[SerializeAsAny[PipelineComponent]]: @@ -72,7 +68,7 @@ def remove(self, component_id: str) -> None: self._component_index.pop(component_id) def get(self, component_id: str) -> PipelineComponent | None: - self._component_index.get(component_id) + return self._component_index.get(component_id) def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]: """Find pipeline components matching a custom predicate. 
diff --git a/kpops/utils/cli_commands.py b/kpops/utils/cli_commands.py index c2c994341..614257b22 100644 --- a/kpops/utils/cli_commands.py +++ b/kpops/utils/cli_commands.py @@ -6,6 +6,7 @@ from pydantic.fields import FieldInfo from pydantic_core import PydanticUndefined +from kpops.api.file_type import KpopsFileType from kpops.config import KpopsConfig from kpops.utils.docstring import describe_object from kpops.utils.json import is_jsonable @@ -72,6 +73,6 @@ def init_project(path: Path, conf_incl_opt: bool): :param conf_incl_opt: Whether to include non-required settings in the generated config file """ - create_config("config", path, conf_incl_opt) - Path(path / "pipeline.yaml").touch(exist_ok=False) - Path(path / "defaults.yaml").touch(exist_ok=False) + create_config(KpopsFileType.CONFIG.value, path, conf_incl_opt) + Path(path / KpopsFileType.PIPELINE.as_yaml_file()).touch(exist_ok=False) + Path(path / KpopsFileType.DEFAULTS.as_yaml_file()).touch(exist_ok=False) diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index b07749ec6..2b7d61862 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -3,7 +3,6 @@ import logging from abc import ABC from collections.abc import Sequence -from enum import Enum from typing import Annotated, Any, Literal, Union from pydantic import ( @@ -21,19 +20,13 @@ ModelFieldsSchema, ) -from kpops.cli.registry import _find_classes +from kpops.api.registry import _find_classes from kpops.components import ( PipelineComponent, ) from kpops.config import KpopsConfig -class SchemaScope(str, Enum): - PIPELINE = "pipeline" - DEFAULTS = "defaults" - CONFIG = "config" - - class MultiComponentGenerateJsonSchema(GenerateJsonSchema): ... diff --git a/poetry.lock b/poetry.lock index 1089dfff0..e6f270466 100644 --- a/poetry.lock +++ b/poetry.lock @@ -358,20 +358,6 @@ files = [ {file = "distlib-0.3.6.tar.gz", hash = "sha256:14bad2d9b04d3a36127ac97f30b12a19268f211063d8f8ee4f47108896e11b46"}, ] -[[package]] -name = "dtyper" -version = "2.1.0" -description = "🗝 Make `typer` commands callable, or dataclasses 🗝" -optional = false -python-versions = ">=3.7" -files = [ - {file = "dtyper-2.1.0-py3-none-any.whl", hash = "sha256:331f513b33ccd43c1a803a2a06cdb879ed3925381aed9c04bd65470d58107ac6"}, - {file = "dtyper-2.1.0.tar.gz", hash = "sha256:c18a7198c4d9f9194f862307dc326f2594acaffb8e2a6f81335ebc8bf6ed9e40"}, -] - -[package.dependencies] -typer = "*" - [[package]] name = "exceptiongroup" version = "1.0.4" @@ -2303,4 +2289,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = ">=3.10, <3.13" -content-hash = "dc29f4a291f406a5d651b0a82645fd8457387a4a821d71a4da1a7a3bffa18d7e" +content-hash = "80ac595bb43560de5e123076830390054b2c9c15b865f3665e1e1b114a31faad" diff --git a/pyproject.toml b/pyproject.toml index a1136a462..3075c1567 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,6 @@ pydantic-settings = "^2.0.3" rich = "^12.4.4" PyYAML = "^6.0" typer = { extras = ["all"], version = "^0.6.1" } -dtyper = "^2.1.0" pyhumps = "^3.7.3" cachetools = "^5.2.0" dictdiffer = "^0.9.0" diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/cli/test_handlers.py b/tests/api/test_handlers.py similarity index 83% rename from tests/cli/test_handlers.py rename to tests/api/test_handlers.py index 7ba0651a5..18d35131b 100644 --- a/tests/cli/test_handlers.py +++ b/tests/api/test_handlers.py @@ -1,6 +1,6 @@ from pytest_mock import MockerFixture -from kpops.cli.main import 
setup_handlers +from kpops.api import setup_handlers from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, @@ -10,6 +10,8 @@ from kpops.config import KpopsConfig, SchemaRegistryConfig from tests.cli.resources.custom_module import CustomSchemaProvider +HANDLER_MODULE = "kpops.api" + MODULE = CustomSchemaProvider.__module__ @@ -18,12 +20,12 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): kafka_brokers="broker:9092", components_module=MODULE, ) - connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") - wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") + topic_handler_mock = mocker.patch(f"{HANDLER_MODULE}.TopicHandler") + wrapper = mocker.patch(f"{HANDLER_MODULE}.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -51,16 +53,16 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): schema_registry=SchemaRegistryConfig(enabled=True), kafka_brokers="broker:9092", ) - schema_handler_mock = mocker.patch("kpops.cli.main.SchemaHandler") + schema_handler_mock = mocker.patch(f"{HANDLER_MODULE}.SchemaHandler") schema_handler = SchemaHandler.load_schema_handler(config) schema_handler_mock.load_schema_handler.return_value = schema_handler - connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") - wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") + topic_handler_mock = mocker.patch(f"{HANDLER_MODULE}.TopicHandler") + wrapper = mocker.patch(f"{HANDLER_MODULE}.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler diff --git a/tests/cli/test_registry.py b/tests/api/test_registry.py similarity index 94% rename from tests/cli/test_registry.py rename to tests/api/test_registry.py index f305c24a3..00daee08d 100644 --- a/tests/cli/test_registry.py +++ b/tests/api/test_registry.py @@ -2,7 +2,8 @@ import pytest -from kpops.cli.registry import ClassNotFoundError, Registry, _find_classes, find_class +from kpops.api.exception import ClassNotFoundError +from kpops.api.registry import Registry, _find_classes, find_class from kpops.component_handlers.schema_handler.schema_provider import SchemaProvider from kpops.components.base_components.pipeline_component import PipelineComponent from tests.cli.resources.custom_module import CustomSchemaProvider diff --git a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml index ae708fef2..5c251a22c 100644 --- a/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml +++ b/tests/cli/snapshots/test_init/test_init_project/config_include_opt.yaml @@ -6,7 +6,6 @@ kafka_brokers: null # Non-required fields components_module: null create_namespace: false -defaults_filename_prefix: defaults helm_config: api_version: null context: 
null diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py index 4e1b4aa09..0741340e0 100644 --- a/tests/cli/test_schema_generation.py +++ b/tests/cli/test_schema_generation.py @@ -9,8 +9,8 @@ from pydantic import ConfigDict, Field from typer.testing import CliRunner +from kpops.api.registry import Registry from kpops.cli.main import app -from kpops.cli.registry import Registry from kpops.components import PipelineComponent from kpops.utils.docstring import describe_attr diff --git a/tests/compiler/test_pipeline_name.py b/tests/compiler/test_pipeline_name.py index 1d70b4d13..24777dd1f 100644 --- a/tests/compiler/test_pipeline_name.py +++ b/tests/compiler/test_pipeline_name.py @@ -2,10 +2,14 @@ import pytest +from kpops.api.file_type import KpopsFileType from kpops.pipeline import PipelineGenerator from kpops.utils.environment import ENV -PIPELINE_PATH = Path("./some/random/path/for/testing/pipeline.yaml") +PIPELINE_PATH = ( + Path("./some/random/path/for/testing") / KpopsFileType.PIPELINE.as_yaml_file() +) + PIPELINE_BASE_DIR = Path() diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index 000fc6756..ab21ef68e 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -6,6 +6,7 @@ import pydantic import pytest +from kpops.api.file_type import KpopsFileType from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.base_defaults_component import ( BaseDefaultsComponent, @@ -16,6 +17,10 @@ from kpops.utils.environment import ENV from tests.components import PIPELINE_BASE_DIR, RESOURCES_PATH +PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() + +DEFAULTS_YAML = KpopsFileType.DEFAULTS.as_yaml_file() + class Parent(BaseDefaultsComponent): __test__ = False @@ -86,9 +91,7 @@ class TestBaseDefaultsComponent: def test_load_defaults( self, component_class: type[BaseDefaultsComponent], defaults: dict ): - assert ( - component_class.load_defaults(RESOURCES_PATH / "defaults.yaml") == defaults - ) + assert component_class.load_defaults(RESOURCES_PATH / DEFAULTS_YAML) == defaults @pytest.mark.parametrize( ("component_class", "defaults"), @@ -117,8 +120,9 @@ def test_load_defaults_with_environment( ): assert ( component_class.load_defaults( - RESOURCES_PATH / "defaults_development.yaml", - RESOURCES_PATH / "defaults.yaml", + RESOURCES_PATH + / KpopsFileType.DEFAULTS.as_yaml_file(suffix="_development"), + RESOURCES_PATH / DEFAULTS_YAML, ) == defaults ) @@ -212,16 +216,17 @@ def test_merge_defaults(self, config: KpopsConfig, handlers: ComponentHandlers): [ ( RESOURCES_PATH - / "pipelines/test-distributed-defaults/pipeline-deep/pipeline.yaml", + / "pipelines/test-distributed-defaults/pipeline-deep" + / PIPELINE_YAML, None, [ Path( - f"{RESOURCES_PATH}/pipelines/test-distributed-defaults/pipeline-deep/defaults.yaml" - ), - Path( - f"{RESOURCES_PATH}/pipelines/test-distributed-defaults/defaults.yaml" - ), - Path(f"{RESOURCES_PATH}/defaults.yaml"), + f"{RESOURCES_PATH}/pipelines/test-distributed-defaults/pipeline-deep" + ) + / DEFAULTS_YAML, + Path(f"{RESOURCES_PATH}/pipelines/test-distributed-defaults") + / DEFAULTS_YAML, + Path(RESOURCES_PATH) / DEFAULTS_YAML, ], ), ( diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 7af0634d7..cb340174a 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -5,6 +5,7 @@ import pytest from pytest_mock 
import MockerFixture +from kpops.api.exception import ValidationError from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmDiffConfig, @@ -29,7 +30,6 @@ StreamsAppCleaner, ) from kpops.config import KpopsConfig, TopicNameConfig -from kpops.pipeline import ValidationError from tests.components import PIPELINE_BASE_DIR RESOURCES_PATH = Path(__file__).parent / "resources" diff --git a/tests/pipeline/resources/component-type-substitution/pipeline.yaml b/tests/pipeline/resources/component-type-substitution/pipeline.yaml index 4bdf51685..8a4cf60ea 100644 --- a/tests/pipeline/resources/component-type-substitution/pipeline.yaml +++ b/tests/pipeline/resources/component-type-substitution/pipeline.yaml @@ -4,8 +4,6 @@ app_type: "${component.type}" app_name: "${component.name}" app_schedule: "${component.app.schedule}" - helm_release_name: ${component.helm_release_name} - helm_name_override: ${component.helm_name_override} commandLine: FAKE_ARG: "fake-arg-value" schedule: "30 3/8 * * *" diff --git a/tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml new file mode 100644 index 000000000..503665e9a --- /dev/null +++ b/tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml @@ -0,0 +1,5 @@ +- type: scheduled-producer + app: + commandLine: + FAKE_ARG: "fake-arg-value" + schedule: "30 3/8 * * *" diff --git a/tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml new file mode 100644 index 000000000..0dfc1da57 --- /dev/null +++ b/tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml @@ -0,0 +1,9 @@ +- type: converter + app: + commandLine: + CONVERT_XML: true + resources: + limits: + memory: 2G + requests: + memory: 2G diff --git a/tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml new file mode 100644 index 000000000..99571e5e6 --- /dev/null +++ b/tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml @@ -0,0 +1,12 @@ +- type: filter + name: "a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name" + app: + commandLine: + TYPE: "nothing" + resources: + requests: + memory: 3G + replicaCount: 4 + autoscaling: + minReplicas: 4 + maxReplicas: 4 diff --git a/tests/pipeline/snapshots/test_example/test_generate/atm-fraud/pipeline.yaml b/tests/pipeline/snapshots/test_example/test_generate/atm-fraud/pipeline.yaml index b37b2d3dc..528da26bb 100644 --- a/tests/pipeline/snapshots/test_example/test_generate/atm-fraud/pipeline.yaml +++ b/tests/pipeline/snapshots/test_example/test_generate/atm-fraud/pipeline.yaml @@ -16,8 +16,6 @@ schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ suspend: true debug: true - helm_name_override: atm-fraud-account-producer-clean - helm_release_name: atm-fraud-account-producer-clean name: account-producer namespace: ${NAMESPACE} prefix: atm-fraud- @@ -46,8 +44,6 @@ schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ suspend: true debug: true - helm_name_override: atm-fraud-account-producer - helm_release_name: atm-fraud-account-producer name: account-producer namespace: ${NAMESPACE} prefix: atm-fraud- @@ -85,8 +81,6 @@ schemaRegistryUrl: 
http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ suspend: true debug: true - helm_name_override: atm-fraud-transaction-avro-producer-clean - helm_release_name: atm-fraud-transaction-avro-producer-clean name: transaction-avro-producer namespace: ${NAMESPACE} prefix: atm-fraud- @@ -118,8 +112,6 @@ schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ suspend: true debug: true - helm_name_override: atm-fraud-transaction-avro-producer - helm_release_name: atm-fraud-transaction-avro-producer name: transaction-avro-producer namespace: ${NAMESPACE} prefix: atm-fraud- @@ -162,8 +154,6 @@ outputTopic: atm-fraud-transaction-joiner-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: atm-fraud-transaction-joiner-clean - helm_release_name: atm-fraud-transaction-joiner-clean name: transaction-joiner namespace: ${NAMESPACE} prefix: atm-fraud- @@ -200,8 +190,6 @@ outputTopic: atm-fraud-transaction-joiner-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: atm-fraud-transaction-joiner - helm_release_name: atm-fraud-transaction-joiner name: transaction-joiner namespace: ${NAMESPACE} prefix: atm-fraud- @@ -248,8 +236,6 @@ outputTopic: atm-fraud-fraud-detector-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: atm-fraud-fraud-detector-clean - helm_release_name: atm-fraud-fraud-detector-clean name: fraud-detector namespace: ${NAMESPACE} prefix: atm-fraud- @@ -286,8 +272,6 @@ outputTopic: atm-fraud-fraud-detector-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: atm-fraud-fraud-detector - helm_release_name: atm-fraud-fraud-detector name: fraud-detector namespace: ${NAMESPACE} prefix: atm-fraud- @@ -337,8 +321,6 @@ outputTopic: atm-fraud-account-linker-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: atm-fraud-account-linker-clean - helm_release_name: atm-fraud-account-linker-clean name: account-linker namespace: ${NAMESPACE} prefix: atm-fraud- @@ -385,8 +367,6 @@ fraud-detector: type: input topics: {} - helm_name_override: atm-fraud-account-linker - helm_release_name: atm-fraud-account-linker name: account-linker namespace: ${NAMESPACE} prefix: atm-fraud- @@ -413,8 +393,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 connector: atm-fraud-postgresql-connector connectorType: sink - helm_name_override: atm-fraud-postgresql-connector-clean - helm_release_name: atm-fraud-postgresql-connector-clean name: postgresql-connector namespace: ${NAMESPACE} prefix: atm-fraud- diff --git a/tests/pipeline/snapshots/test_example/test_generate/word-count/pipeline.yaml b/tests/pipeline/snapshots/test_example/test_generate/word-count/pipeline.yaml index f757ae815..7e7d16fe8 100644 --- a/tests/pipeline/snapshots/test_example/test_generate/word-count/pipeline.yaml +++ b/tests/pipeline/snapshots/test_example/test_generate/word-count/pipeline.yaml @@ -14,8 +14,6 @@ outputTopic: word-count-data-producer-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: word-count-data-producer-clean - helm_release_name: word-count-data-producer-clean name: data-producer namespace: ${NAMESPACE} prefix: word-count- @@ -42,8 +40,6 @@ outputTopic: 
word-count-data-producer-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: word-count-data-producer - helm_release_name: word-count-data-producer name: data-producer namespace: ${NAMESPACE} prefix: word-count- @@ -84,8 +80,6 @@ outputTopic: word-count-word-counter-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: word-count-word-counter-clean - helm_release_name: word-count-word-counter-clean name: word-counter namespace: ${NAMESPACE} prefix: word-count- @@ -120,8 +114,6 @@ outputTopic: word-count-word-counter-topic schemaRegistryUrl: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/ debug: true - helm_name_override: word-count-word-counter - helm_release_name: word-count-word-counter name: word-counter namespace: ${NAMESPACE} prefix: word-count- @@ -150,8 +142,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 connector: word-count-redis-sink-connector connectorType: sink - helm_name_override: word-count-redis-sink-connector-clean - helm_release_name: word-count-redis-sink-connector-clean name: redis-sink-connector namespace: ${NAMESPACE} prefix: word-count- diff --git a/tests/pipeline/snapshots/test_generate/test_default_config/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_default_config/pipeline.yaml index c101b25b4..cbd0e251e 100644 --- a/tests/pipeline/snapshots/test_generate/test_default_config/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_default_config/pipeline.yaml @@ -9,8 +9,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-custom-config-app1 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app1-clean - helm_release_name: resources-custom-config-app1-clean name: app1 namespace: development-namespace prefix: resources-custom-config- @@ -32,8 +30,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-custom-config-app1 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app1 - helm_release_name: resources-custom-config-app1 name: app1 namespace: development-namespace prefix: resources-custom-config- @@ -68,8 +64,6 @@ - resources-custom-config-app1 outputTopic: resources-custom-config-app2 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app2-clean - helm_release_name: resources-custom-config-app2-clean name: app2 namespace: development-namespace prefix: resources-custom-config- @@ -97,8 +91,6 @@ - resources-custom-config-app1 outputTopic: resources-custom-config-app2 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app2 - helm_release_name: resources-custom-config-app2 name: app2 namespace: development-namespace prefix: resources-custom-config- diff --git a/tests/pipeline/snapshots/test_generate/test_inflate_pipeline/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_inflate_pipeline/pipeline.yaml index 8b7b83d9b..e5e003376 100644 --- a/tests/pipeline/snapshots/test_generate/test_inflate_pipeline/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_inflate_pipeline/pipeline.yaml @@ -9,8 +9,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-pipeline-with-inflate-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: 
resources-pipeline-with-inflate-scheduled-producer-clean - helm_release_name: resources-pipeline-with-inflate-scheduled-18411-clean name: scheduled-producer namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -32,8 +30,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-pipeline-with-inflate-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-inflate-scheduled-producer - helm_release_name: resources-pipeline-with-inflate-scheduled-producer name: scheduled-producer namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -85,8 +81,6 @@ - resources-pipeline-with-inflate-scheduled-producer outputTopic: resources-pipeline-with-inflate-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-inflate-converter-clean - helm_release_name: resources-pipeline-with-inflate-converter-clean name: converter namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -128,8 +122,6 @@ - resources-pipeline-with-inflate-scheduled-producer outputTopic: resources-pipeline-with-inflate-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-inflate-converter - helm_release_name: resources-pipeline-with-inflate-converter name: converter namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -188,8 +180,6 @@ - resources-pipeline-with-inflate-converter outputTopic: resources-pipeline-with-inflate-should-inflate schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-inflate-should-inflate-clean - helm_release_name: resources-pipeline-with-inflate-should-inflate-clean name: should-inflate namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -233,8 +223,6 @@ - resources-pipeline-with-inflate-converter outputTopic: resources-pipeline-with-inflate-should-inflate schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-inflate-should-inflate - helm_release_name: resources-pipeline-with-inflate-should-inflate name: should-inflate namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -265,8 +253,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 connector: resources-pipeline-with-inflate-should-inflate-inflated-sink-connector connectorType: sink - helm_name_override: resources-pipeline-with-inflate-should-inflate-infl-cfdd0-clean - helm_release_name: resources-pipeline-with-inflate-should-in-cfdd0-clean name: should-inflate-inflated-sink-connector namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -319,8 +305,6 @@ - kafka-sink-connector outputTopic: resources-pipeline-with-inflate-should-inflate-should-inflate-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-inflate-should-inflate-infl-c7b41-clean - helm_release_name: resources-pipeline-with-inflate-should-in-c7b41-clean name: should-inflate-inflated-streams-app namespace: example-namespace prefix: resources-pipeline-with-inflate- @@ -345,8 +329,6 @@ - kafka-sink-connector outputTopic: resources-pipeline-with-inflate-should-inflate-should-inflate-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-inflate-should-inflate-inflated-s-2ea20 - helm_release_name: resources-pipeline-with-inflate-should-inflate--2ea20 name: should-inflate-inflated-streams-app 
namespace: example-namespace prefix: resources-pipeline-with-inflate- diff --git a/tests/pipeline/snapshots/test_generate/test_kafka_connect_sink_weave_from_topics/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_kafka_connect_sink_weave_from_topics/pipeline.yaml index b7928f4fd..bb569e772 100644 --- a/tests/pipeline/snapshots/test_generate/test_kafka_connect_sink_weave_from_topics/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_kafka_connect_sink_weave_from_topics/pipeline.yaml @@ -13,8 +13,6 @@ - example-topic outputTopic: example-output schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-kafka-connect-sink-streams-app-clean - helm_release_name: resources-kafka-connect-sink-streams-app-clean name: streams-app namespace: example-namespace prefix: resources-kafka-connect-sink- @@ -45,8 +43,6 @@ topics: example-topic: type: input - helm_name_override: resources-kafka-connect-sink-streams-app - helm_release_name: resources-kafka-connect-sink-streams-app name: streams-app namespace: example-namespace prefix: resources-kafka-connect-sink- @@ -75,8 +71,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 connector: resources-kafka-connect-sink-es-sink-connector connectorType: sink - helm_name_override: resources-kafka-connect-sink-es-sink-connector-clean - helm_release_name: resources-kafka-connect-sink-es-sink-connector-clean name: es-sink-connector namespace: example-namespace prefix: resources-kafka-connect-sink- diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline/pipeline.yaml index fa1199b23..87a88601c 100644 --- a/tests/pipeline/snapshots/test_generate/test_load_pipeline/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline/pipeline.yaml @@ -9,8 +9,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-first-pipeline-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-first-pipeline-scheduled-producer-clean - helm_release_name: resources-first-pipeline-scheduled-producer-clean name: scheduled-producer namespace: example-namespace prefix: resources-first-pipeline- @@ -32,8 +30,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-first-pipeline-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-first-pipeline-scheduled-producer - helm_release_name: resources-first-pipeline-scheduled-producer name: scheduled-producer namespace: example-namespace prefix: resources-first-pipeline- @@ -85,8 +81,6 @@ - resources-first-pipeline-scheduled-producer outputTopic: resources-first-pipeline-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-first-pipeline-converter-clean - helm_release_name: resources-first-pipeline-converter-clean name: converter namespace: example-namespace prefix: resources-first-pipeline- @@ -128,8 +122,6 @@ - resources-first-pipeline-scheduled-producer outputTopic: resources-first-pipeline-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-first-pipeline-converter - helm_release_name: resources-first-pipeline-converter name: converter namespace: example-namespace prefix: resources-first-pipeline- @@ -188,8 +180,6 @@ - resources-first-pipeline-converter outputTopic: 
resources-first-pipeline-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-first-pipeline-a-long-name-a-long-name-a--48eb9-clean - helm_release_name: resources-first-pipeline-a-long-name-a-lo-48eb9-clean name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name namespace: example-namespace prefix: resources-first-pipeline- @@ -233,8 +223,6 @@ - resources-first-pipeline-converter outputTopic: resources-first-pipeline-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-first-pipeline-a-long-name-a-long-name-a-long-n-68327 - helm_release_name: resources-first-pipeline-a-long-name-a-long-nam-68327 name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name namespace: example-namespace prefix: resources-first-pipeline- diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml new file mode 100644 index 000000000..2787d7444 --- /dev/null +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml @@ -0,0 +1,244 @@ +- _cleaner: + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: producer-app-cleaner + version: 2.4.2 + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: + com/bakdata/kafka/fake: 1.0.0 + topics: + resources-pipeline-folders-pipeline-1-scheduled-producer: + configs: + cleanup.policy: compact,delete + partitions_count: 12 + type: output + value_schema: com.bakdata.fake.Produced + type: scheduled-producer + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + 
requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + resources-pipeline-folders-pipeline-2-converter: + configs: + cleanup.policy: compact,delete + retention.ms: '-1' + partitions_count: 50 + type: output + resources-pipeline-folders-pipeline-2-converter-error: + configs: + cleanup.policy: compact,delete + partitions_count: 10 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: converter + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + name: 
a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + : configs: + retention.ms: '-1' + partitions_count: 50 + type: output + ? 
resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + : configs: + cleanup.policy: compact,delete + partitions_count: 1 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: filter + version: 2.4.2 + diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml new file mode 100644 index 000000000..2787d7444 --- /dev/null +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml @@ -0,0 +1,244 @@ +- _cleaner: + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: producer-app-cleaner + version: 2.4.2 + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: + com/bakdata/kafka/fake: 1.0.0 + topics: + resources-pipeline-folders-pipeline-1-scheduled-producer: + configs: + cleanup.policy: compact,delete + partitions_count: 12 + type: output + value_schema: com.bakdata.fake.Produced + type: scheduled-producer + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 
10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + resources-pipeline-folders-pipeline-2-converter: + configs: + cleanup.policy: compact,delete + retention.ms: '-1' + partitions_count: 50 + type: output + resources-pipeline-folders-pipeline-2-converter-error: + configs: + cleanup.policy: compact,delete + partitions_count: 10 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: converter + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - 
resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + : configs: + retention.ms: '-1' + partitions_count: 50 + type: output + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + : configs: + cleanup.policy: compact,delete + partitions_count: 1 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: filter + version: 2.4.2 + diff --git a/tests/pipeline/snapshots/test_generate/test_model_serialization/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_model_serialization/pipeline.yaml index 004d157b0..02e06ff34 100644 --- a/tests/pipeline/snapshots/test_generate/test_model_serialization/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_model_serialization/pipeline.yaml @@ -4,8 +4,6 @@ brokers: test outputTopic: out schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-paths-account-producer-clean - helm_release_name: resources-pipeline-with-paths-account-producer-clean name: account-producer namespace: test prefix: resources-pipeline-with-paths- @@ -25,8 +23,6 @@ brokers: test outputTopic: out schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-paths-account-producer - helm_release_name: resources-pipeline-with-paths-account-producer name: account-producer namespace: test prefix: resources-pipeline-with-paths- diff --git a/tests/pipeline/snapshots/test_generate/test_no_input_topic/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_no_input_topic/pipeline.yaml index ff490db8b..a77aea9be 100644 --- a/tests/pipeline/snapshots/test_generate/test_no_input_topic/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_no_input_topic/pipeline.yaml @@ -18,8 +18,6 @@ inputPattern: .* outputTopic: example-output schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-no-input-topic-pipeline-app1-clean - helm_release_name: 
resources-no-input-topic-pipeline-app1-clean name: app1 namespace: example-namespace prefix: resources-no-input-topic-pipeline- @@ -55,8 +53,6 @@ topics: .*: type: pattern - helm_name_override: resources-no-input-topic-pipeline-app1 - helm_release_name: resources-no-input-topic-pipeline-app1 name: app1 namespace: example-namespace prefix: resources-no-input-topic-pipeline- @@ -95,8 +91,6 @@ inputTopics: - example-output schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-no-input-topic-pipeline-app2-clean - helm_release_name: resources-no-input-topic-pipeline-app2-clean name: app2 namespace: example-namespace prefix: resources-no-input-topic-pipeline- @@ -123,8 +117,6 @@ inputTopics: - example-output schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-no-input-topic-pipeline-app2 - helm_release_name: resources-no-input-topic-pipeline-app2 name: app2 namespace: example-namespace prefix: resources-no-input-topic-pipeline- diff --git a/tests/pipeline/snapshots/test_generate/test_no_user_defined_components/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_no_user_defined_components/pipeline.yaml index 2d220a5f0..d8850383e 100644 --- a/tests/pipeline/snapshots/test_generate/test_no_user_defined_components/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_no_user_defined_components/pipeline.yaml @@ -13,8 +13,6 @@ - example-topic outputTopic: example-output schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-no-user-defined-components-streams-app-clean - helm_release_name: resources-no-user-defined-components-stre-2d151-clean name: streams-app namespace: example-namespace prefix: resources-no-user-defined-components- @@ -45,8 +43,6 @@ topics: example-topic: type: input - helm_name_override: resources-no-user-defined-components-streams-app - helm_release_name: resources-no-user-defined-components-streams-app name: streams-app namespace: example-namespace prefix: resources-no-user-defined-components- diff --git a/tests/pipeline/snapshots/test_generate/test_pipelines_with_envs/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_pipelines_with_envs/pipeline.yaml index dec025bbf..344e9c5b1 100644 --- a/tests/pipeline/snapshots/test_generate/test_pipelines_with_envs/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_pipelines_with_envs/pipeline.yaml @@ -9,8 +9,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-pipeline-with-envs-input-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-envs-input-producer-clean - helm_release_name: resources-pipeline-with-envs-input-producer-clean name: input-producer namespace: example-namespace prefix: resources-pipeline-with-envs- @@ -32,8 +30,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-pipeline-with-envs-input-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-envs-input-producer - helm_release_name: resources-pipeline-with-envs-input-producer name: input-producer namespace: example-namespace prefix: resources-pipeline-with-envs- @@ -85,8 +81,6 @@ - resources-pipeline-with-envs-input-producer outputTopic: resources-pipeline-with-envs-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-envs-converter-clean - helm_release_name: resources-pipeline-with-envs-converter-clean name: converter namespace: example-namespace 
prefix: resources-pipeline-with-envs- @@ -128,8 +122,6 @@ - resources-pipeline-with-envs-input-producer outputTopic: resources-pipeline-with-envs-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-envs-converter - helm_release_name: resources-pipeline-with-envs-converter name: converter namespace: example-namespace prefix: resources-pipeline-with-envs- @@ -188,8 +180,6 @@ - resources-pipeline-with-envs-converter outputTopic: resources-pipeline-with-envs-filter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-envs-filter-clean - helm_release_name: resources-pipeline-with-envs-filter-clean name: filter namespace: example-namespace prefix: resources-pipeline-with-envs- @@ -233,8 +223,6 @@ - resources-pipeline-with-envs-converter outputTopic: resources-pipeline-with-envs-filter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-envs-filter - helm_release_name: resources-pipeline-with-envs-filter name: filter namespace: example-namespace prefix: resources-pipeline-with-envs- diff --git a/tests/pipeline/snapshots/test_generate/test_prefix_pipeline_component/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_prefix_pipeline_component/pipeline.yaml index ca1709bcd..096b846d1 100644 --- a/tests/pipeline/snapshots/test_generate/test_prefix_pipeline_component/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_prefix_pipeline_component/pipeline.yaml @@ -12,8 +12,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 schemaRegistryUrl: http://localhost:8081/ suspend: true - helm_name_override: from-pipeline-component-account-producer-clean - helm_release_name: from-pipeline-component-account-producer-clean name: account-producer namespace: ${NAMESPACE} prefix: from-pipeline-component- @@ -38,8 +36,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 schemaRegistryUrl: http://localhost:8081/ suspend: true - helm_name_override: from-pipeline-component-account-producer - helm_release_name: from-pipeline-component-account-producer name: account-producer namespace: ${NAMESPACE} prefix: from-pipeline-component- diff --git a/tests/pipeline/snapshots/test_generate/test_read_from_component/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_read_from_component/pipeline.yaml index 319418e8b..761f21e63 100644 --- a/tests/pipeline/snapshots/test_generate/test_read_from_component/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_read_from_component/pipeline.yaml @@ -4,8 +4,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-read-from-component-producer1 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-producer1-clean - helm_release_name: resources-read-from-component-producer1-clean name: producer1 namespace: example-namespace prefix: resources-read-from-component- @@ -22,8 +20,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-read-from-component-producer1 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-producer1 - helm_release_name: resources-read-from-component-producer1 name: producer1 namespace: example-namespace prefix: resources-read-from-component- @@ -46,8 +42,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-read-from-component-producer2 schemaRegistryUrl: 
http://localhost:8081/ - helm_name_override: producer2-clean - helm_release_name: producer2-clean name: producer2 namespace: example-namespace prefix: '' @@ -64,8 +58,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-read-from-component-producer2 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: producer2 - helm_release_name: producer2 name: producer2 namespace: example-namespace prefix: '' @@ -109,8 +101,6 @@ - resources-read-from-component-producer2 outputTopic: resources-read-from-component-inflate-step schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-inflate-step-clean - helm_release_name: resources-read-from-component-inflate-step-clean name: inflate-step namespace: example-namespace prefix: resources-read-from-component- @@ -148,8 +138,6 @@ - resources-read-from-component-producer2 outputTopic: resources-read-from-component-inflate-step schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-inflate-step - helm_release_name: resources-read-from-component-inflate-step name: inflate-step namespace: example-namespace prefix: resources-read-from-component- @@ -180,8 +168,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 connector: resources-read-from-component-inflate-step-inflated-sink-connector connectorType: sink - helm_name_override: resources-read-from-component-inflate-step-inflated-f12dd-clean - helm_release_name: resources-read-from-component-inflate-ste-f12dd-clean name: inflate-step-inflated-sink-connector namespace: example-namespace prefix: resources-read-from-component- @@ -234,8 +220,6 @@ - kafka-sink-connector outputTopic: resources-read-from-component-inflate-step-inflate-step-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-inflate-step-inflated-51f5c-clean - helm_release_name: resources-read-from-component-inflate-ste-51f5c-clean name: inflate-step-inflated-streams-app namespace: example-namespace prefix: resources-read-from-component- @@ -260,8 +244,6 @@ - kafka-sink-connector outputTopic: resources-read-from-component-inflate-step-inflate-step-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-inflate-step-inflated-streams-app - helm_release_name: resources-read-from-component-inflate-step-infl-5def2 name: inflate-step-inflated-streams-app namespace: example-namespace prefix: resources-read-from-component- @@ -311,8 +293,6 @@ - resources-read-from-component-inflate-step-inflate-step-inflated-streams-app outputTopic: resources-read-from-component-inflate-step-without-prefix schemaRegistryUrl: http://localhost:8081/ - helm_name_override: inflate-step-without-prefix-clean - helm_release_name: inflate-step-without-prefix-clean name: inflate-step-without-prefix namespace: example-namespace prefix: '' @@ -350,8 +330,6 @@ - resources-read-from-component-inflate-step-inflate-step-inflated-streams-app outputTopic: resources-read-from-component-inflate-step-without-prefix schemaRegistryUrl: http://localhost:8081/ - helm_name_override: inflate-step-without-prefix - helm_release_name: inflate-step-without-prefix name: inflate-step-without-prefix namespace: example-namespace prefix: '' @@ -382,8 +360,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 connector: resources-read-from-component-inflate-step-without-prefix-inflated-sink-connector connectorType: 
sink - helm_name_override: resources-read-from-component-inflate-step-without--08c44-clean - helm_release_name: resources-read-from-component-inflate-ste-08c44-clean name: inflate-step-without-prefix-inflated-sink-connector namespace: example-namespace prefix: resources-read-from-component- @@ -436,8 +412,6 @@ - kafka-sink-connector outputTopic: inflate-step-without-prefix-inflate-step-without-prefix-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-inflate-step-without--53a68-clean - helm_release_name: resources-read-from-component-inflate-ste-53a68-clean name: inflate-step-without-prefix-inflated-streams-app namespace: example-namespace prefix: resources-read-from-component- @@ -462,8 +436,6 @@ - kafka-sink-connector outputTopic: inflate-step-without-prefix-inflate-step-without-prefix-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-inflate-step-without-prefix-000dc - helm_release_name: resources-read-from-component-inflate-step-with-000dc name: inflate-step-without-prefix-inflated-streams-app namespace: example-namespace prefix: resources-read-from-component- @@ -500,8 +472,6 @@ - resources-read-from-component-producer1 outputTopic: resources-read-from-component-consumer1 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-consumer1-clean - helm_release_name: resources-read-from-component-consumer1-clean name: consumer1 namespace: example-namespace prefix: resources-read-from-component- @@ -531,8 +501,6 @@ producer1: type: input topics: {} - helm_name_override: resources-read-from-component-consumer1 - helm_release_name: resources-read-from-component-consumer1 name: consumer1 namespace: example-namespace prefix: resources-read-from-component- @@ -569,8 +537,6 @@ - resources-read-from-component-producer1 - resources-read-from-component-consumer1 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-consumer2-clean - helm_release_name: resources-read-from-component-consumer2-clean name: consumer2 namespace: example-namespace prefix: resources-read-from-component- @@ -602,8 +568,6 @@ producer1: type: input topics: {} - helm_name_override: resources-read-from-component-consumer2 - helm_release_name: resources-read-from-component-consumer2 name: consumer2 namespace: example-namespace prefix: resources-read-from-component- @@ -637,8 +601,6 @@ - resources-read-from-component-producer1 - resources-read-from-component-producer2 schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-consumer3-clean - helm_release_name: resources-read-from-component-consumer3-clean name: consumer3 namespace: example-namespace prefix: resources-read-from-component- @@ -670,8 +632,6 @@ topics: resources-read-from-component-producer1: type: input - helm_name_override: resources-read-from-component-consumer3 - helm_release_name: resources-read-from-component-consumer3 name: consumer3 namespace: example-namespace prefix: resources-read-from-component- @@ -704,8 +664,6 @@ inputTopics: - resources-read-from-component-inflate-step-inflate-step-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-consumer4-clean - helm_release_name: resources-read-from-component-consumer4-clean name: consumer4 namespace: example-namespace prefix: resources-read-from-component- @@ -734,8 +692,6 @@ inflate-step: type: input topics: 
{} - helm_name_override: resources-read-from-component-consumer4 - helm_release_name: resources-read-from-component-consumer4 name: consumer4 namespace: example-namespace prefix: resources-read-from-component- @@ -768,8 +724,6 @@ inputTopics: - inflate-step-without-prefix-inflate-step-without-prefix-inflated-streams-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-read-from-component-consumer5-clean - helm_release_name: resources-read-from-component-consumer5-clean name: consumer5 namespace: example-namespace prefix: resources-read-from-component- @@ -798,8 +752,6 @@ inflate-step-without-prefix: type: input topics: {} - helm_name_override: resources-read-from-component-consumer5 - helm_release_name: resources-read-from-component-consumer5 name: consumer5 namespace: example-namespace prefix: resources-read-from-component- diff --git a/tests/pipeline/snapshots/test_generate/test_substitute_in_component/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_substitute_in_component/pipeline.yaml index aeee057b0..8ca686f30 100644 --- a/tests/pipeline/snapshots/test_generate/test_substitute_in_component/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_substitute_in_component/pipeline.yaml @@ -8,15 +8,11 @@ app_name: scheduled-producer app_schedule: 30 3/8 * * * app_type: scheduled-producer - helm_name_override: resources-component-type-substitution-scheduled-producer - helm_release_name: resources-component-type-substitution-scheduled-producer schedule: 30 3/8 * * * streams: brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-component-type-substitution-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-component-type-substitution-scheduled-producer-clean - helm_release_name: resources-component-type-substitution-sch-01b02-clean name: scheduled-producer namespace: example-namespace prefix: resources-component-type-substitution- @@ -37,15 +33,11 @@ app_name: scheduled-producer app_schedule: 30 3/8 * * * app_type: scheduled-producer - helm_name_override: resources-component-type-substitution-scheduled-producer - helm_release_name: resources-component-type-substitution-scheduled-producer schedule: 30 3/8 * * * streams: brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: resources-component-type-substitution-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-component-type-substitution-scheduled-producer - helm_release_name: resources-component-type-substitution-scheduled-b0010 name: scheduled-producer namespace: example-namespace prefix: resources-component-type-substitution- @@ -97,8 +89,6 @@ - resources-component-type-substitution-scheduled-producer outputTopic: resources-component-type-substitution-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-component-type-substitution-converter-clean - helm_release_name: resources-component-type-substitution-converter-clean name: converter namespace: example-namespace prefix: resources-component-type-substitution- @@ -140,8 +130,6 @@ - resources-component-type-substitution-scheduled-producer outputTopic: resources-component-type-substitution-converter schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-component-type-substitution-converter - helm_release_name: resources-component-type-substitution-converter name: converter namespace: example-namespace prefix: resources-component-type-substitution- 
@@ -206,8 +194,6 @@ - resources-component-type-substitution-converter outputTopic: resources-component-type-substitution-filter-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-component-type-substitution-filter-app-clean - helm_release_name: resources-component-type-substitution-fil-d68ac-clean name: filter-app namespace: example-namespace prefix: resources-component-type-substitution- @@ -257,8 +243,6 @@ - resources-component-type-substitution-converter outputTopic: resources-component-type-substitution-filter-app schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-component-type-substitution-filter-app - helm_release_name: resources-component-type-substitution-filter-app name: filter-app namespace: example-namespace prefix: resources-component-type-substitution- diff --git a/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_absolute_defaults_path/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_absolute_defaults_path/pipeline.yaml index bd29f273a..f78e8f0d1 100644 --- a/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_absolute_defaults_path/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_absolute_defaults_path/pipeline.yaml @@ -9,8 +9,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: app1-test-topic schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app1-clean - helm_release_name: resources-custom-config-app1-clean name: app1 namespace: development-namespace prefix: resources-custom-config- @@ -32,8 +30,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: app1-test-topic schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app1 - helm_release_name: resources-custom-config-app1 name: app1 namespace: development-namespace prefix: resources-custom-config- @@ -68,8 +64,6 @@ - app1-test-topic outputTopic: app2-test-topic schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app2-clean - helm_release_name: resources-custom-config-app2-clean name: app2 namespace: development-namespace prefix: resources-custom-config- @@ -97,8 +91,6 @@ - app1-test-topic outputTopic: app2-test-topic schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app2 - helm_release_name: resources-custom-config-app2 name: app2 namespace: development-namespace prefix: resources-custom-config- diff --git a/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_relative_defaults_path/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_relative_defaults_path/pipeline.yaml index bd29f273a..f78e8f0d1 100644 --- a/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_relative_defaults_path/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_with_custom_config_with_relative_defaults_path/pipeline.yaml @@ -9,8 +9,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: app1-test-topic schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app1-clean - helm_release_name: resources-custom-config-app1-clean name: app1 namespace: development-namespace prefix: resources-custom-config- @@ -32,8 +30,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 outputTopic: app1-test-topic schemaRegistryUrl: 
http://localhost:8081/ - helm_name_override: resources-custom-config-app1 - helm_release_name: resources-custom-config-app1 name: app1 namespace: development-namespace prefix: resources-custom-config- @@ -68,8 +64,6 @@ - app1-test-topic outputTopic: app2-test-topic schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app2-clean - helm_release_name: resources-custom-config-app2-clean name: app2 namespace: development-namespace prefix: resources-custom-config- @@ -97,8 +91,6 @@ - app1-test-topic outputTopic: app2-test-topic schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-custom-config-app2 - helm_release_name: resources-custom-config-app2 name: app2 namespace: development-namespace prefix: resources-custom-config- diff --git a/tests/pipeline/snapshots/test_generate/test_with_env_defaults/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_with_env_defaults/pipeline.yaml index 59f556c41..cea0b2660 100644 --- a/tests/pipeline/snapshots/test_generate/test_with_env_defaults/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_with_env_defaults/pipeline.yaml @@ -13,8 +13,6 @@ - example-topic outputTopic: example-output schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-with-env-defaults-streams-app-de-5b336-clean - helm_release_name: resources-pipeline-with-env-defaults-stre-5b336-clean name: streams-app-development namespace: development-namespace prefix: resources-pipeline-with-env-defaults- @@ -45,8 +43,6 @@ topics: example-topic: type: input - helm_name_override: resources-pipeline-with-env-defaults-streams-app-development - helm_release_name: resources-pipeline-with-env-defaults-streams-ap-49439 name: streams-app-development namespace: development-namespace prefix: resources-pipeline-with-env-defaults- @@ -75,8 +71,6 @@ brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 connector: resources-pipeline-with-env-defaults-es-sink-connector connectorType: sink - helm_name_override: resources-pipeline-with-env-defaults-es-sink-connector-clean - helm_release_name: resources-pipeline-with-env-defaults-es-s-d5f87-clean name: es-sink-connector namespace: development-namespace prefix: resources-pipeline-with-env-defaults- diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 8b237f705..915ae17d7 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -10,9 +10,12 @@ from typer.testing import CliRunner import kpops +from kpops.api.exception import ParsingException, ValidationError +from kpops.api.file_type import KpopsFileType from kpops.cli.main import FilterType, app from kpops.components import KafkaSinkConnector, PipelineComponent -from kpops.pipeline import ParsingException, ValidationError + +PIPELINE_YAML = KpopsFileType.PIPELINE.as_yaml_file() runner = CliRunner() @@ -23,12 +26,11 @@ class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: - return mocker.patch("kpops.cli.main.log.info") + return mocker.patch("kpops.api.log.info") def test_python_api(self): pipeline = kpops.generate( - RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, + RESOURCE_PATH / "first-pipeline" / PIPELINE_YAML, ) assert len(pipeline) == 3 assert [component.type for component in pipeline.components] == [ @@ -39,26 +41,26 @@ def test_python_api(self): def test_python_api_filter_include(self, log_info: MagicMock): pipeline = kpops.generate( - RESOURCE_PATH / 
"first-pipeline" / "pipeline.yaml", - output=False, - steps="converter", + RESOURCE_PATH / "first-pipeline" / PIPELINE_YAML, + steps={"converter"}, filter_type=FilterType.INCLUDE, ) assert len(pipeline) == 1 assert pipeline.components[0].type == "converter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call("Filtered pipeline:\n['converter']") def test_python_api_filter_exclude(self, log_info: MagicMock): pipeline = kpops.generate( - RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, - steps="converter,scheduled-producer", + RESOURCE_PATH / "first-pipeline" / PIPELINE_YAML, + steps={"converter", "scheduled-producer"}, filter_type=FilterType.EXCLUDE, ) assert len(pipeline) == 1 assert pipeline.components[0].type == "filter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call( "Filtered pipeline:\n['a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name']" ) @@ -68,7 +70,21 @@ def test_load_pipeline(self, snapshot: Snapshot): app, [ "generate", - str(RESOURCE_PATH / "first-pipeline/pipeline.yaml"), + str(RESOURCE_PATH / "first-pipeline" / PIPELINE_YAML), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.stdout + + snapshot.assert_match(result.stdout, PIPELINE_YAML) + + def test_load_pipeline_with_folder_path(self, snapshot: Snapshot): + result = runner.invoke( + app, + [ + "generate", + str(RESOURCE_PATH / "pipeline-folders"), ], catch_exceptions=False, ) @@ -77,12 +93,26 @@ def test_load_pipeline(self, snapshot: Snapshot): snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_load_pipeline_with_multiple_pipeline_paths(self, snapshot: Snapshot): + path_1 = RESOURCE_PATH / "pipeline-folders/pipeline-1/pipeline.yaml" + path_2 = RESOURCE_PATH / "pipeline-folders/pipeline-2/pipeline.yaml" + path_3 = RESOURCE_PATH / "pipeline-folders/pipeline-3/pipeline.yaml" + result = runner.invoke( + app, + ["generate", str(path_1), str(path_2), str(path_3)], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.stdout + + snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_name_equal_prefix_name_concatenation(self): result = runner.invoke( app, [ "generate", - str(RESOURCE_PATH / "name_prefix_concatenation/pipeline.yaml"), + str(RESOURCE_PATH / "name_prefix_concatenation" / PIPELINE_YAML), ], catch_exceptions=False, ) @@ -99,7 +129,7 @@ def test_pipelines_with_envs(self, snapshot: Snapshot): app, [ "generate", - str(RESOURCE_PATH / "pipeline-with-envs/pipeline.yaml"), + str(RESOURCE_PATH / "pipeline-with-envs" / PIPELINE_YAML), "--environment", "development", ], @@ -108,28 +138,28 @@ def test_pipelines_with_envs(self, snapshot: Snapshot): assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_inflate_pipeline(self, snapshot: Snapshot): result = runner.invoke( app, [ "generate", - str(RESOURCE_PATH / "pipeline-with-inflate/pipeline.yaml"), + str(RESOURCE_PATH / "pipeline-with-inflate" / PIPELINE_YAML), ], catch_exceptions=False, ) assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_substitute_in_component(self, 
snapshot: Snapshot): result = runner.invoke( app, [ "generate", - str(RESOURCE_PATH / "component-type-substitution/pipeline.yaml"), + str(RESOURCE_PATH / "component-type-substitution" / PIPELINE_YAML), ], catch_exceptions=False, ) @@ -163,7 +193,7 @@ def test_substitute_in_component(self, snapshot: Snapshot): == "filter-app-filter" ) - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) @pytest.mark.timeout(2) def test_substitute_in_component_infinite_loop(self): @@ -174,7 +204,8 @@ def test_substitute_in_component_infinite_loop(self): "generate", str( RESOURCE_PATH - / "component-type-substitution/infinite_pipeline.yaml", + / "component-type-substitution" + / KpopsFileType.PIPELINE.as_yaml_file(prefix="infinite_"), ), ], catch_exceptions=False, @@ -185,7 +216,7 @@ def test_kafka_connector_config_parsing(self): app, [ "generate", - str(RESOURCE_PATH / "kafka-connect-sink-config/pipeline.yaml"), + str(RESOURCE_PATH / "kafka-connect-sink-config" / PIPELINE_YAML), "--config", str(RESOURCE_PATH / "kafka-connect-sink-config"), ], @@ -203,28 +234,28 @@ def test_no_input_topic(self, snapshot: Snapshot): app, [ "generate", - str(RESOURCE_PATH / "no-input-topic-pipeline/pipeline.yaml"), + str(RESOURCE_PATH / "no-input-topic-pipeline" / PIPELINE_YAML), ], catch_exceptions=False, ) assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_no_user_defined_components(self, snapshot: Snapshot): result = runner.invoke( app, [ "generate", - str(RESOURCE_PATH / "no-user-defined-components/pipeline.yaml"), + str(RESOURCE_PATH / "no-user-defined-components" / PIPELINE_YAML), ], catch_exceptions=False, ) assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_kafka_connect_sink_weave_from_topics(self, snapshot: Snapshot): """Parse Connector topics from previous component to section.""" @@ -232,35 +263,35 @@ def test_kafka_connect_sink_weave_from_topics(self, snapshot: Snapshot): app, [ "generate", - str(RESOURCE_PATH / "kafka-connect-sink/pipeline.yaml"), + str(RESOURCE_PATH / "kafka-connect-sink" / PIPELINE_YAML), ], catch_exceptions=False, ) assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_read_from_component(self, snapshot: Snapshot): result = runner.invoke( app, [ "generate", - str(RESOURCE_PATH / "read-from-component/pipeline.yaml"), + str(RESOURCE_PATH / "read-from-component" / PIPELINE_YAML), ], catch_exceptions=False, ) assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_with_env_defaults(self, snapshot: Snapshot): result = runner.invoke( app, [ "generate", - str(RESOURCE_PATH / "pipeline-with-env-defaults/pipeline.yaml"), + str(RESOURCE_PATH / "pipeline-with-env-defaults" / PIPELINE_YAML), "--environment", "development", ], @@ -269,7 +300,7 @@ def test_with_env_defaults(self, snapshot: Snapshot): assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_prefix_pipeline_component(self, snapshot: Snapshot): result = runner.invoke( @@ -278,7 +309,8 @@ def test_prefix_pipeline_component(self, snapshot: Snapshot): 
"generate", str( RESOURCE_PATH - / "pipeline-component-should-have-prefix/pipeline.yaml", + / "pipeline-component-should-have-prefix" + / PIPELINE_YAML, ), ], catch_exceptions=False, @@ -286,7 +318,7 @@ def test_prefix_pipeline_component(self, snapshot: Snapshot): assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_with_custom_config_with_relative_defaults_path( self, @@ -296,7 +328,7 @@ def test_with_custom_config_with_relative_defaults_path( app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", str(RESOURCE_PATH / "custom-config"), "--environment", @@ -318,7 +350,7 @@ def test_with_custom_config_with_relative_defaults_path( error_topic = streams_app_details["app"]["streams"]["errorTopic"] assert error_topic == "app2-dead-letter-topic" - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_with_custom_config_with_absolute_defaults_path( self, @@ -339,7 +371,7 @@ def test_with_custom_config_with_absolute_defaults_path( app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", str(temp_config_path.parent), "--environment", @@ -361,7 +393,7 @@ def test_with_custom_config_with_absolute_defaults_path( error_topic = streams_app_details["app"]["streams"]["errorTopic"] assert error_topic == "app2-dead-letter-topic" - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) finally: temp_config_path.unlink() @@ -370,7 +402,7 @@ def test_default_config(self, snapshot: Snapshot): app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--environment", "development", ], @@ -390,7 +422,7 @@ def test_default_config(self, snapshot: Snapshot): error_topic = streams_app_details["app"]["streams"]["errorTopic"] assert error_topic == "resources-custom-config-app2-error" - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_env_vars_precedence_over_config(self, monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv(name="KPOPS_KAFKA_BROKERS", value="env_broker") @@ -399,7 +431,7 @@ def test_env_vars_precedence_over_config(self, monkeypatch: pytest.MonkeyPatch): app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", str(RESOURCE_PATH / "custom-config"), "--environment", @@ -420,7 +452,7 @@ def test_nested_config_env_vars(self, monkeypatch: pytest.MonkeyPatch): app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", str(RESOURCE_PATH / "custom-config"), "--environment", @@ -444,7 +476,7 @@ def test_env_specific_config_env_def_in_env_var( app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", config_path, ], @@ -476,7 +508,7 @@ def test_env_specific_config_env_def_in_cli( app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", config_path, "--environment", @@ -495,7 +527,7 @@ def test_config_dir_doesnt_exist(self): app, [ "generate", - str(RESOURCE_PATH / 
"custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", "./non-existent-dir", "--environment", @@ -511,21 +543,21 @@ def test_model_serialization(self, snapshot: Snapshot): app, [ "generate", - str(RESOURCE_PATH / "pipeline-with-paths/pipeline.yaml"), + str(RESOURCE_PATH / "pipeline-with-paths" / PIPELINE_YAML), ], catch_exceptions=False, ) assert result.exit_code == 0, result.stdout - snapshot.assert_match(result.stdout, "pipeline.yaml") + snapshot.assert_match(result.stdout, PIPELINE_YAML) def test_dotenv_support(self): result = runner.invoke( app, [ "generate", - str(RESOURCE_PATH / "custom-config/pipeline.yaml"), + str(RESOURCE_PATH / "custom-config" / PIPELINE_YAML), "--config", str(RESOURCE_PATH / "dotenv"), "--dotenv", @@ -548,7 +580,7 @@ def test_short_topic_definition(self): app, [ "generate", - str(RESOURCE_PATH / "pipeline-with-short-topics/pipeline.yaml"), + str(RESOURCE_PATH / "pipeline-with-short-topics" / PIPELINE_YAML), ], catch_exceptions=False, ) @@ -602,7 +634,8 @@ def test_kubernetes_app_name_validation(self): "generate", str( RESOURCE_PATH - / "pipeline-with-illegal-kubernetes-name/pipeline.yaml", + / "pipeline-with-illegal-kubernetes-name" + / PIPELINE_YAML, ), ], catch_exceptions=False, @@ -623,7 +656,9 @@ def test_validate_unique_step_names(self): app, [ "generate", - str(RESOURCE_PATH / "pipeline-duplicate-step-names/pipeline.yaml"), + str( + RESOURCE_PATH / "pipeline-duplicate-step-names" / PIPELINE_YAML + ), ], catch_exceptions=False, ) @@ -634,14 +669,14 @@ def test_validate_loops_on_pipeline(self): app, [ "generate", - str(RESOURCE_PATH / "pipeline-with-loop/pipeline.yaml"), + str(RESOURCE_PATH / "pipeline-with-loop" / PIPELINE_YAML), ], catch_exceptions=False, ) def test_validate_simple_graph(self): pipeline = kpops.generate( - RESOURCE_PATH / "pipelines-with-graphs/simple-pipeline/pipeline.yaml", + RESOURCE_PATH / "pipelines-with-graphs" / "simple-pipeline" / PIPELINE_YAML, ) assert len(pipeline.components) == 2 assert len(pipeline._graph.nodes) == 3 @@ -655,7 +690,9 @@ def test_validate_simple_graph(self): def test_validate_topic_and_component_same_name(self): pipeline = kpops.generate( RESOURCE_PATH - / "pipelines-with-graphs/same-topic-and-component-name/pipeline.yaml", + / "pipelines-with-graphs" + / "same-topic-and-component-name" + / PIPELINE_YAML, ) component, topic = list(pipeline._graph.nodes) edges = list(pipeline._graph.edges) @@ -665,7 +702,7 @@ def test_validate_topic_and_component_same_name(self): @pytest.mark.asyncio() async def test_parallel_execution_graph(self): pipeline = kpops.generate( - RESOURCE_PATH / "parallel-pipeline/pipeline.yaml", + RESOURCE_PATH / "parallel-pipeline" / PIPELINE_YAML, config=RESOURCE_PATH / "parallel-pipeline", ) @@ -706,7 +743,7 @@ async def name_runner(component: PipelineComponent): @pytest.mark.asyncio() async def test_subgraph_execution(self): pipeline = kpops.generate( - RESOURCE_PATH / "parallel-pipeline/pipeline.yaml", + RESOURCE_PATH / "parallel-pipeline" / PIPELINE_YAML, config=RESOURCE_PATH / "parallel-pipeline", ) @@ -734,7 +771,7 @@ async def name_runner(component: PipelineComponent): @pytest.mark.asyncio() async def test_parallel_execution_graph_reverse(self): pipeline = kpops.generate( - RESOURCE_PATH / "parallel-pipeline/pipeline.yaml", + RESOURCE_PATH / "parallel-pipeline" / PIPELINE_YAML, config=RESOURCE_PATH / "parallel-pipeline", ) @@ -777,7 +814,7 @@ def test_temp_trim_release_name(self): app, [ "generate", - str(RESOURCE_PATH / 
"temp-trim-release-name/pipeline.yaml"), + str(RESOURCE_PATH / "temp-trim-release-name" / PIPELINE_YAML), ], catch_exceptions=False, ) @@ -793,7 +830,7 @@ def test_substitution_in_inflated_component(self): app, [ "generate", - str(RESOURCE_PATH / "resetter_values/pipeline.yaml"), + str(RESOURCE_PATH / "resetter_values" / PIPELINE_YAML), ], catch_exceptions=False, ) @@ -806,7 +843,9 @@ def test_substitution_in_inflated_component(self): def test_substitution_in_resetter(self): pipeline = kpops.generate( - RESOURCE_PATH / "resetter_values/pipeline_connector_only.yaml", + RESOURCE_PATH + / "resetter_values" + / KpopsFileType.PIPELINE.as_yaml_file(suffix="_connector_only"), ) assert isinstance(pipeline.components[0], KafkaSinkConnector) assert pipeline.components[0].name == "es-sink-connector" diff --git a/tests/pipeline/test_manifest.py b/tests/pipeline/test_manifest.py index c144a32c9..445e528cc 100644 --- a/tests/pipeline/test_manifest.py +++ b/tests/pipeline/test_manifest.py @@ -115,7 +115,6 @@ def test_manifest_command(self, snapshot: Snapshot): def test_python_api(self, snapshot: Snapshot): resources = kpops.manifest( RESOURCE_PATH / "custom-config/pipeline.yaml", - output=False, environment="development", ) assert isinstance(resources, list) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index d0c53667b..b5c741ad0 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -3,8 +3,7 @@ import pytest from polyfactory.factories.pydantic_factory import ModelFactory -from kpops.cli.main import create_default_step_names_filter_predicate -from kpops.cli.options import FilterType +from kpops.api.options import FilterType from kpops.component_handlers import ( ComponentHandlers, ) @@ -44,8 +43,8 @@ def pipeline(self) -> Pipeline: return pipeline def test_filter_include(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - {"example2", "example3"}, FilterType.INCLUDE + predicate = FilterType.INCLUDE.create_default_step_names_filter_predicate( + {"example2", "example3"} ) pipeline.filter(predicate) assert len(pipeline.components) == 2 @@ -53,23 +52,19 @@ def test_filter_include(self, pipeline: Pipeline): assert test_component_3 in pipeline.components def test_filter_include_empty(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - set(), FilterType.INCLUDE - ) + predicate = FilterType.INCLUDE.create_default_step_names_filter_predicate(set()) pipeline.filter(predicate) assert len(pipeline.components) == 0 def test_filter_exclude(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - {"example2", "example3"}, FilterType.EXCLUDE + predicate = FilterType.EXCLUDE.create_default_step_names_filter_predicate( + {"example2", "example3"} ) pipeline.filter(predicate) assert len(pipeline.components) == 1 assert test_component_1 in pipeline.components def test_filter_exclude_empty(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - set(), FilterType.EXCLUDE - ) + predicate = FilterType.EXCLUDE.create_default_step_names_filter_predicate(set()) pipeline.filter(predicate) assert len(pipeline.components) == 3 diff --git a/tests/cli/test_kpops_config.py b/tests/test_kpops_config.py similarity index 97% rename from tests/cli/test_kpops_config.py rename to tests/test_kpops_config.py index 1af05c626..dd38ed345 100644 --- a/tests/cli/test_kpops_config.py +++ b/tests/test_kpops_config.py @@ -12,7 +12,6 @@ def 
test_kpops_config_with_default_values(): default_config = KpopsConfig(kafka_brokers="http://broker:9092") - assert default_config.defaults_filename_prefix == "defaults" assert ( default_config.topic_name_config.default_output_topic_name == "${pipeline.name}-${component.name}"