Skip to content

Commit

Permalink
Explore naive approach for global KpopsConfig
Browse files Browse the repository at this point in the history
todo: singleton
  • Loading branch information
disrupted committed Jul 9, 2024
1 parent 0c26629 commit c8471e4
Show file tree
Hide file tree
Showing 20 changed files with 59 additions and 115 deletions.
30 changes: 10 additions & 20 deletions kpops/components/base_components/base_defaults_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pydantic.json_schema import SkipJsonSchema

from kpops.component_handlers import ComponentHandlers
from kpops.config import KpopsConfig
from kpops.config import KpopsConfig, get_config
from kpops.const.file_type import KpopsFileType
from kpops.utils import cached_classproperty
from kpops.utils.dict_ops import (
Expand Down Expand Up @@ -50,7 +50,6 @@ class BaseDefaultsComponent(DescConfigModel, ABC):
correctly to the component.
:param enrich: Whether to enrich component with defaults, defaults to False
:param config_: KPOps configuration to be accessed by this component
:param handlers_: Component handlers to be accessed by this component
:param validate: Whether to run custom validation on the component, defaults to True
"""
Expand All @@ -64,11 +63,6 @@ class BaseDefaultsComponent(DescConfigModel, ABC):
description=describe_attr("enrich", __doc__),
exclude=True,
)
config_: SkipJsonSchema[KpopsConfig] = Field(
default=...,
description=describe_attr("config_", __doc__),
exclude=True,
)
handlers_: SkipJsonSchema[ComponentHandlers] = Field(
default=...,
description=describe_attr("handlers_", __doc__),
Expand All @@ -87,11 +81,10 @@ def __init__(self, **values: Any) -> None:
values = cls.extend_with_defaults(**values)
tmp_self = cls(**values, enrich=False)
values = tmp_self.model_dump(mode="json", by_alias=True)
values = cls.substitute_in_component(tmp_self.config_, **values)
values = cls.substitute_in_component(**values)
self.__init__(
enrich=False,
validate=True,
config_=tmp_self.config_,
handlers_=tmp_self.handlers_,
**values,
)
Expand Down Expand Up @@ -130,20 +123,19 @@ def gen_parents():
return tuple(gen_parents())

@classmethod
def substitute_in_component(
cls, config_: KpopsConfig, **component_data: Any
) -> dict[str, Any]:
def substitute_in_component(cls, **component_data: Any) -> dict[str, Any]:
"""Substitute all $-placeholders in a component in dict representation.
:param component_as_dict: Component represented as dict
:return: Updated component
"""
config = get_config()
# Leftover variables that were previously introduced in the component by the substitution
# functions, still hardcoded, because of their names.
# TODO(Ivan Yordanov): Get rid of them
substitution_hardcoded: dict[str, JsonType] = {
"error_topic_name": config_.topic_name_config.default_error_topic_name,
"output_topic_name": config_.topic_name_config.default_output_topic_name,
"error_topic_name": config.topic_name_config.default_error_topic_name,
"output_topic_name": config.topic_name_config.default_output_topic_name,
}
component_substitution = generate_substitution(
component_data,
Expand All @@ -152,7 +144,7 @@ def substitute_in_component(
separator=".",
)
substitution = generate_substitution(
config_.model_dump(mode="json"),
config.model_dump(mode="json"),
"config",
existing_substitution=component_substitution,
separator=".",
Expand All @@ -166,16 +158,14 @@ def substitute_in_component(
)

@classmethod
def extend_with_defaults(
cls, config_: KpopsConfig, **kwargs: Any
) -> dict[str, Any]:
def extend_with_defaults(cls, **kwargs: Any) -> dict[str, Any]:
"""Merge parent components' defaults with own.
:param config: KPOps configuration
:param kwargs: The init kwargs for pydantic
:returns: Enriched kwargs with inherited defaults
"""
kwargs["config_"] = config_
config = get_config()
pipeline_path_str = ENV.get(PIPELINE_PATH)
if not pipeline_path_str:
return kwargs
Expand All @@ -187,7 +177,7 @@ def extend_with_defaults(
kwargs[k] = asdict(v)

defaults_file_paths_ = get_defaults_file_paths(
pipeline_path, config_, ENV.get("environment")
pipeline_path, config, ENV.get("environment")
)
defaults = cls.load_defaults(*defaults_file_paths_)
log.debug(
Expand Down
3 changes: 2 additions & 1 deletion kpops/components/base_components/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
create_helm_release_name,
)
from kpops.components.base_components.helm_app import HelmApp
from kpops.config import get_config


class Cleaner(HelmApp, ABC):
Expand All @@ -34,7 +35,7 @@ def helm_name_override(self) -> str:
@override
def helm_flags(self) -> HelmFlags:
return HelmFlags(
create_namespace=self.config_.create_namespace,
create_namespace=get_config().create_namespace,
version=self.version,
wait=True,
wait_for_jobs=True,
Expand Down
11 changes: 6 additions & 5 deletions kpops/components/base_components/helm_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
KubernetesAppValues,
)
from kpops.components.base_components.models.resource import Resource
from kpops.config import get_config
from kpops.utils.colorify import magentaify
from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import exclude_by_name
Expand Down Expand Up @@ -84,7 +85,7 @@ class HelmApp(KubernetesApp):
@cached_property
def helm(self) -> Helm:
"""Helm object that contains component-specific config such as repo."""
helm = Helm(self.config_.helm_config)
helm = Helm(get_config().helm_config)
if self.repo_config is not None:
helm.add_repo(
self.repo_config.repository_name,
Expand All @@ -96,11 +97,11 @@ def helm(self) -> Helm:
@cached_property
def helm_diff(self) -> HelmDiff:
"""Helm diff object of last and current release of this component."""
return HelmDiff(self.config_.helm_diff_config)
return HelmDiff(get_config().helm_diff_config)

@cached_property
def dry_run_handler(self) -> DryRunHandler:
helm_diff = HelmDiff(self.config_.helm_diff_config)
helm_diff = HelmDiff(get_config().helm_diff_config)
return DryRunHandler(self.helm, helm_diff, self.namespace)

@property
Expand Down Expand Up @@ -130,15 +131,15 @@ def helm_flags(self) -> HelmFlags:
return HelmFlags(
**auth_flags,
version=self.version,
create_namespace=self.config_.create_namespace,
create_namespace=get_config().create_namespace,
)

@property
def template_flags(self) -> HelmTemplateFlags:
"""Return flags for Helm template command."""
return HelmTemplateFlags(
**self.helm_flags.model_dump(),
api_version=self.config_.helm_config.api_version,
api_version=get_config().helm_config.api_version,
)

@override
Expand Down
3 changes: 2 additions & 1 deletion kpops/components/base_components/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr
from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.components.common.streams_bootstrap import StreamsBootstrap
from kpops.config import get_config
from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import (
CamelCaseConfigModel,
Expand Down Expand Up @@ -113,7 +114,7 @@ async def clean(self, dry_run: bool) -> None:
log.info(f"Init cleanup job for {self.helm_release_name}")
await self.deploy(dry_run)

if not self.config_.retain_clean_jobs:
if not get_config().retain_clean_jobs:
log.info(f"Uninstall cleanup job for {self.helm_release_name}")
await self.destroy(dry_run)

Expand Down
6 changes: 3 additions & 3 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from kpops.components.base_components.models.from_section import FromTopic
from kpops.components.base_components.models.topic import KafkaTopic
from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.config import get_config
from kpops.utils.colorify import magentaify
from kpops.utils.docstring import describe_attr

Expand Down Expand Up @@ -84,7 +85,7 @@ async def reset(self, dry_run: bool) -> None:
)
await self.deploy(dry_run)

if not self.config_.retain_clean_jobs:
if not get_config().retain_clean_jobs:
log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter."))
await self.destroy(dry_run)

Expand Down Expand Up @@ -140,7 +141,6 @@ def _resetter(self) -> KafkaConnectorResetter:
if self.resetter_namespace:
kwargs["namespace"] = self.resetter_namespace
return KafkaConnectorResetter(
config_=self.config_,
handlers_=self.handlers_,
**kwargs,
**self.model_dump(
Expand All @@ -158,7 +158,7 @@ def _resetter(self) -> KafkaConnectorResetter:
connector_type=self._connector_type.value,
config=KafkaConnectorResetterConfig(
connector=self.full_name,
brokers=self.config_.kafka_brokers,
brokers=get_config().kafka_brokers,
),
**self.resetter_values.model_dump(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class ProducerApp(KafkaApp, StreamsBootstrap):
@cached_property
def _cleaner(self) -> ProducerAppCleaner:
return ProducerAppCleaner(
config_=self.config_,
handlers_=self.handlers_,
**self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"}),
)
Expand Down
1 change: 0 additions & 1 deletion kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class StreamsApp(KafkaApp, StreamsBootstrap):
@cached_property
def _cleaner(self) -> StreamsAppCleaner:
return StreamsAppCleaner(
config_=self.config_,
handlers_=self.handlers_,
**self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"}),
)
Expand Down
19 changes: 17 additions & 2 deletions kpops/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import logging
from pathlib import Path
from typing import ClassVar

from pydantic import AnyHttpUrl, Field, TypeAdapter
from pydantic import AnyHttpUrl, Field, PrivateAttr, TypeAdapter
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
Expand Down Expand Up @@ -74,6 +75,8 @@ class KafkaConnectConfig(BaseSettings):
class KpopsConfig(BaseSettings):
"""Global configuration for KPOps project."""

_config: ClassVar[KpopsConfig] = PrivateAttr()

pipeline_base_dir: Path = Field(
default=Path(),
description="Base directory to the pipelines (default is current working directory)",
Expand Down Expand Up @@ -131,9 +134,10 @@ def create(
cls.setup_logging_level(verbose)
YamlConfigSettingsSource.config_dir = config
YamlConfigSettingsSource.environment = environment
return KpopsConfig(
cls._config = KpopsConfig(
_env_file=dotenv # pyright: ignore[reportCallIssue]
)
return cls._config

@staticmethod
def setup_logging_level(verbose: bool):
Expand Down Expand Up @@ -162,3 +166,14 @@ def settings_customise_sources(
dotenv_settings,
file_secret_settings,
)


def get_config() -> KpopsConfig:
if KpopsConfig._config is None:
msg = f"{KpopsConfig.__name__} has not been initialized, call {KpopsConfig.__name__}.{KpopsConfig.create.__name__}"
raise RuntimeError(msg)
return KpopsConfig._config


def set_config(config: KpopsConfig) -> None:
KpopsConfig._config = config
2 changes: 0 additions & 2 deletions kpops/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ def apply_component(
:param component_data: Arguments for instantiation of pipeline component
"""
component = component_class(
config_=self.config,
handlers_=self.handlers,
**component_data,
)
Expand Down Expand Up @@ -350,7 +349,6 @@ def enrich_component_with_env(
)

return component.__class__(
config_=self.config,
handlers_=self.handlers,
**env_component_as_dict,
)
Expand Down
34 changes: 14 additions & 20 deletions tests/components/test_base_defaults_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
BaseDefaultsComponent,
get_defaults_file_paths,
)
from kpops.config import KpopsConfig
from kpops.config import KpopsConfig, set_config
from kpops.const.file_type import DEFAULTS_YAML, PIPELINE_YAML, KpopsFileType
from kpops.pipeline import PIPELINE_PATH
from kpops.utils.environment import ENV
Expand Down Expand Up @@ -46,10 +46,11 @@ class EnvVarTest(BaseDefaultsComponent):
name: str | None = None


@pytest.fixture()
def config() -> KpopsConfig:
@pytest.fixture(autouse=True)
def config() -> None:
ENV[PIPELINE_PATH] = str(RESOURCES_PATH / "pipeline.yaml")
return KpopsConfig(pipeline_base_dir=PIPELINE_BASE_DIR)
config = KpopsConfig(pipeline_base_dir=PIPELINE_BASE_DIR)
set_config(config)


@pytest.fixture()
Expand Down Expand Up @@ -123,9 +124,9 @@ def test_load_defaults_with_environment(
== defaults
)

def test_inherit_defaults(self, config: KpopsConfig, handlers: ComponentHandlers):
def test_inherit_defaults(self, handlers: ComponentHandlers):
ENV["environment"] = "development"
component = Child(config_=config, handlers_=handlers)
component = Child(handlers_=handlers)

assert (
component.name == "fake-child-name"
Expand All @@ -143,9 +144,8 @@ def test_inherit_defaults(self, config: KpopsConfig, handlers: ComponentHandlers
component.hard_coded == "hard_coded_value"
), "Defaults in code should be kept for parents"

def test_inherit(self, config: KpopsConfig, handlers: ComponentHandlers):
def test_inherit(self, handlers: ComponentHandlers):
component = Child(
config_=config,
handlers_=handlers,
name="name-defined-in-pipeline_parser",
)
Expand All @@ -166,10 +166,8 @@ def test_inherit(self, config: KpopsConfig, handlers: ComponentHandlers):
component.hard_coded == "hard_coded_value"
), "Defaults in code should be kept for parents"

def test_multiple_generations(
self, config: KpopsConfig, handlers: ComponentHandlers
):
component = GrandChild(config_=config, handlers_=handlers)
def test_multiple_generations(self, handlers: ComponentHandlers):
component = GrandChild(handlers_=handlers)

assert (
component.name == "fake-child-name"
Expand All @@ -188,22 +186,18 @@ def test_multiple_generations(
), "Defaults in code should be kept for parents"
assert component.grand_child == "grand-child-value"

def test_env_var_substitution(
self, config: KpopsConfig, handlers: ComponentHandlers
):
def test_env_var_substitution(self, handlers: ComponentHandlers):
ENV["pipeline_name"] = RESOURCES_PATH.as_posix()
component = EnvVarTest(config_=config, handlers_=handlers)
component = EnvVarTest(handlers_=handlers)

assert component.name

assert (
Path(component.name) == RESOURCES_PATH
), "Environment variables should be substituted"

def test_merge_defaults(self, config: KpopsConfig, handlers: ComponentHandlers):
component = GrandChild(
config_=config, handlers_=handlers, nested=Nested(**{"bar": False})
)
def test_merge_defaults(self, handlers: ComponentHandlers):
component = GrandChild(handlers_=handlers, nested=Nested(**{"bar": False}))
assert isinstance(component.nested, Nested)
assert component.nested == Nested(**{"foo": "foo", "bar": False})

Expand Down
Loading

0 comments on commit c8471e4

Please sign in to comment.