diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index 27ea03649..5c74244b5 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -20,7 +20,7 @@ from pydantic.json_schema import SkipJsonSchema from kpops.component_handlers import ComponentHandlers -from kpops.config import KpopsConfig +from kpops.config import KpopsConfig, get_config from kpops.const.file_type import KpopsFileType from kpops.utils import cached_classproperty from kpops.utils.dict_ops import ( @@ -129,12 +129,13 @@ def substitute_in_component(cls, **component_data: Any) -> dict[str, Any]: :param component_as_dict: Component represented as dict :return: Updated component """ + config = get_config() # Leftover variables that were previously introduced in the component by the substitution # functions, still hardcoded, because of their names. # TODO(Ivan Yordanov): Get rid of them substitution_hardcoded: dict[str, JsonType] = { - "error_topic_name": KpopsConfig.topic_name_config.default_error_topic_name, - "output_topic_name": KpopsConfig.topic_name_config.default_output_topic_name, + "error_topic_name": config.topic_name_config.default_error_topic_name, + "output_topic_name": config.topic_name_config.default_output_topic_name, } component_substitution = generate_substitution( component_data, @@ -143,7 +144,7 @@ def substitute_in_component(cls, **component_data: Any) -> dict[str, Any]: separator=".", ) substitution = generate_substitution( - KpopsConfig.model_dump(mode="json"), + config.model_dump(mode="json"), "config", existing_substitution=component_substitution, separator=".", @@ -164,6 +165,7 @@ def extend_with_defaults(cls, **kwargs: Any) -> dict[str, Any]: :param kwargs: The init kwargs for pydantic :returns: Enriched kwargs with inherited defaults """ + config = get_config() pipeline_path_str = ENV.get(PIPELINE_PATH) if not pipeline_path_str: return kwargs @@ -175,7 +177,7 @@ def extend_with_defaults(cls, **kwargs: Any) -> dict[str, Any]: kwargs[k] = asdict(v) defaults_file_paths_ = get_defaults_file_paths( - pipeline_path, ENV.get("environment") + pipeline_path, config, ENV.get("environment") ) defaults = cls.load_defaults(*defaults_file_paths_) log.debug( @@ -236,7 +238,9 @@ def defaults_from_yaml(path: Path, key: str) -> dict: return value -def get_defaults_file_paths(pipeline_path: Path, environment: str | None) -> list[Path]: +def get_defaults_file_paths( + pipeline_path: Path, config: KpopsConfig, environment: str | None +) -> list[Path]: """Return a list of default file paths related to the given pipeline. This function traverses the directory hierarchy upwards till the `pipeline_base_dir`, @@ -256,7 +260,7 @@ def get_defaults_file_paths(pipeline_path: Path, environment: str | None) -> lis raise FileNotFoundError(message) path = pipeline_path.resolve() - pipeline_base_dir = KpopsConfig.pipeline_base_dir.resolve() + pipeline_base_dir = config.pipeline_base_dir.resolve() if pipeline_base_dir not in path.parents: message = f"The given pipeline base path {pipeline_base_dir} is not part of the pipeline path {path}" raise RuntimeError(message) diff --git a/kpops/components/base_components/cleaner.py b/kpops/components/base_components/cleaner.py index 99ded5b59..76a75247f 100644 --- a/kpops/components/base_components/cleaner.py +++ b/kpops/components/base_components/cleaner.py @@ -8,7 +8,7 @@ create_helm_release_name, ) from kpops.components.base_components.helm_app import HelmApp -from kpops.config import KpopsConfig +from kpops.config import get_config class Cleaner(HelmApp, ABC): @@ -35,7 +35,7 @@ def helm_name_override(self) -> str: @override def helm_flags(self) -> HelmFlags: return HelmFlags( - create_namespace=KpopsConfig.create_namespace, + create_namespace=get_config().create_namespace, version=self.version, wait=True, wait_for_jobs=True, diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 92936020f..6959c578c 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -27,7 +27,7 @@ KubernetesAppValues, ) from kpops.components.base_components.models.resource import Resource -from kpops.config import KpopsConfig +from kpops.config import get_config from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import exclude_by_name @@ -85,7 +85,7 @@ class HelmApp(KubernetesApp): @cached_property def helm(self) -> Helm: """Helm object that contains component-specific config such as repo.""" - helm = Helm(KpopsConfig.helm_config) + helm = Helm(get_config().helm_config) if self.repo_config is not None: helm.add_repo( self.repo_config.repository_name, @@ -97,11 +97,11 @@ def helm(self) -> Helm: @cached_property def helm_diff(self) -> HelmDiff: """Helm diff object of last and current release of this component.""" - return HelmDiff(KpopsConfig.helm_diff_config) + return HelmDiff(get_config().helm_diff_config) @cached_property def dry_run_handler(self) -> DryRunHandler: - helm_diff = HelmDiff(KpopsConfig.helm_diff_config) + helm_diff = HelmDiff(get_config().helm_diff_config) return DryRunHandler(self.helm, helm_diff, self.namespace) @property @@ -131,7 +131,7 @@ def helm_flags(self) -> HelmFlags: return HelmFlags( **auth_flags, version=self.version, - create_namespace=KpopsConfig.create_namespace, + create_namespace=get_config().create_namespace, ) @property @@ -139,7 +139,7 @@ def template_flags(self) -> HelmTemplateFlags: """Return flags for Helm template command.""" return HelmTemplateFlags( **self.helm_flags.model_dump(), - api_version=KpopsConfig.helm_config.api_version, + api_version=get_config().helm_config.api_version, ) @override diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index ada71a5c8..a3521b79e 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -14,7 +14,7 @@ from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.components.common.streams_bootstrap import StreamsBootstrap -from kpops.config import KpopsConfig +from kpops.config import get_config from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, @@ -114,7 +114,7 @@ async def clean(self, dry_run: bool) -> None: log.info(f"Init cleanup job for {self.helm_release_name}") await self.deploy(dry_run) - if not KpopsConfig.retain_clean_jobs: + if not get_config().retain_clean_jobs: log.info(f"Uninstall cleanup job for {self.helm_release_name}") await self.destroy(dry_run) diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 9b424c220..1e75ac056 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -23,7 +23,7 @@ from kpops.components.base_components.models.from_section import FromTopic from kpops.components.base_components.models.topic import KafkaTopic from kpops.components.base_components.pipeline_component import PipelineComponent -from kpops.config import KpopsConfig +from kpops.config import get_config from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr @@ -85,7 +85,7 @@ async def reset(self, dry_run: bool) -> None: ) await self.deploy(dry_run) - if not KpopsConfig.retain_clean_jobs: + if not get_config().retain_clean_jobs: log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) await self.destroy(dry_run) @@ -158,7 +158,7 @@ def _resetter(self) -> KafkaConnectorResetter: connector_type=self._connector_type.value, config=KafkaConnectorResetterConfig( connector=self.full_name, - brokers=KpopsConfig.kafka_brokers, + brokers=get_config().kafka_brokers, ), **self.resetter_values.model_dump(), ), diff --git a/kpops/config/__init__.py b/kpops/config/__init__.py index f6880ddf9..5e0915183 100644 --- a/kpops/config/__init__.py +++ b/kpops/config/__init__.py @@ -1,9 +1,8 @@ from __future__ import annotations import logging -from functools import lru_cache from pathlib import Path -from typing import ClassVar, TypeVar +from typing import ClassVar from pydantic import AnyHttpUrl, Field, PrivateAttr, TypeAdapter from pydantic_settings import ( @@ -17,11 +16,6 @@ from kpops.utils.docstring import describe_object from kpops.utils.pydantic import YamlConfigSettingsSource -try: - from typing import Self # pyright: ignore[reportAttributeAccessIssue] -except ImportError: - from typing_extensions import Self - ENV_PREFIX = "KPOPS_" @@ -78,15 +72,11 @@ class KafkaConnectConfig(BaseSettings): ) -@lru_cache(maxsize=1) -def singleton(cls: type[KpopsConfig]) -> KpopsConfig: - return cls() - - -@singleton class KpopsConfig(BaseSettings): """Global configuration for KPOps project.""" + _config: ClassVar[KpopsConfig] = PrivateAttr() + pipeline_base_dir: Path = Field( default=Path(), description="Base directory to the pipelines (default is current working directory)", @@ -140,13 +130,14 @@ def create( dotenv: list[Path] | None = None, environment: str | None = None, verbose: bool = False, - ) -> Self: + ) -> KpopsConfig: cls.setup_logging_level(verbose) YamlConfigSettingsSource.config_dir = config YamlConfigSettingsSource.environment = environment - return cls( + cls._config = KpopsConfig( _env_file=dotenv # pyright: ignore[reportCallIssue] ) + return cls._config @staticmethod def setup_logging_level(verbose: bool): @@ -175,3 +166,14 @@ def settings_customise_sources( dotenv_settings, file_secret_settings, ) + + +def get_config() -> KpopsConfig: + if KpopsConfig._config is None: + msg = f"{KpopsConfig.__name__} has not been initialized, call {KpopsConfig.__name__}.{KpopsConfig.create.__name__}" + raise RuntimeError(msg) + return KpopsConfig._config + + +def set_config(config: KpopsConfig) -> None: + KpopsConfig._config = config diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index 2b43a4144..d4d711e5f 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -11,7 +11,7 @@ BaseDefaultsComponent, get_defaults_file_paths, ) -from kpops.config import KpopsConfig +from kpops.config import KpopsConfig, set_config from kpops.const.file_type import DEFAULTS_YAML, PIPELINE_YAML, KpopsFileType from kpops.pipeline import PIPELINE_PATH from kpops.utils.environment import ENV @@ -49,7 +49,8 @@ class EnvVarTest(BaseDefaultsComponent): @pytest.fixture(autouse=True) def config() -> None: ENV[PIPELINE_PATH] = str(RESOURCES_PATH / "pipeline.yaml") - KpopsConfig(pipeline_base_dir=PIPELINE_BASE_DIR) + config = KpopsConfig(pipeline_base_dir=PIPELINE_BASE_DIR) + set_config(config) @pytest.fixture() diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index 2b965c047..1ab97898b 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -10,7 +10,7 @@ from kpops.components.base_components.kafka_connector import ( KafkaConnector, ) -from kpops.config import KpopsConfig, TopicNameConfig +from kpops.config import KpopsConfig, TopicNameConfig, set_config from tests.components import PIPELINE_BASE_DIR CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop" @@ -30,7 +30,7 @@ class TestKafkaConnector: @pytest.fixture(autouse=True) def config(self) -> None: - KpopsConfig( + config = KpopsConfig( topic_name_config=TopicNameConfig( default_error_topic_name="${component.type}-error-topic", default_output_topic_name="${component.type}-output-topic", @@ -39,6 +39,7 @@ def config(self) -> None: helm_diff_config=HelmDiffConfig(), pipeline_base_dir=PIPELINE_BASE_DIR, ) + set_config(config) @pytest.fixture() def handlers(self) -> ComponentHandlers: