Skip to content

Commit

Permalink
Revert PrivateAttr
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted committed Jul 8, 2024
1 parent 8988667 commit 174775d
Show file tree
Hide file tree
Showing 20 changed files with 174 additions and 173 deletions.
46 changes: 25 additions & 21 deletions kpops/components/base_components/base_defaults_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
AliasChoices,
ConfigDict,
Field,
PrivateAttr,
computed_field,
)
from pydantic.json_schema import SkipJsonSchema
Expand Down Expand Up @@ -51,8 +50,8 @@ class BaseDefaultsComponent(DescConfigModel, ABC):
correctly to the component.
:param enrich: Whether to enrich component with defaults, defaults to False
:param _config: KPOps configuration to be accessed by this component
:param _handlers: Component handlers to be accessed by this component
:param config_: KPOps configuration to be accessed by this component
:param handlers_: Component handlers to be accessed by this component
:param validate: Whether to run custom validation on the component, defaults to True
"""

Expand All @@ -65,35 +64,39 @@ class BaseDefaultsComponent(DescConfigModel, ABC):
description=describe_attr("enrich", __doc__),
exclude=True,
)
_config: KpopsConfig = PrivateAttr()
_handlers: ComponentHandlers = PrivateAttr()
config_: SkipJsonSchema[KpopsConfig] = Field(
default=...,
description=describe_attr("config_", __doc__),
exclude=True,
)
handlers_: SkipJsonSchema[ComponentHandlers] = Field(
default=...,
description=describe_attr("handlers_", __doc__),
exclude=True,
)
validate_: SkipJsonSchema[bool] = Field(
validation_alias=AliasChoices("validate", "validate_"),
default=False,
description=describe_attr("validate", __doc__),
exclude=True,
)

def __init__(
self, *, _config: KpopsConfig, _handlers: ComponentHandlers, **values: Any
) -> None:
def __init__(self, **values: Any) -> None:
if values.get("enrich", True):
cls = self.__class__
values = cls.extend_with_defaults(_config, **values)
tmp_self = cls(_config=_config, _handlers=_handlers, **values, enrich=False)
values = cls.extend_with_defaults(**values)
tmp_self = cls(**values, enrich=False)
values = tmp_self.model_dump(mode="json", by_alias=True)
values = cls.substitute_in_component(_config, **values)
values = cls.substitute_in_component(tmp_self.config_, **values)
self.__init__(
enrich=False,
validate=True,
_config=_config,
_handlers=_handlers,
config_=tmp_self.config_,
handlers_=tmp_self.handlers_,
**values,
)
else:
super().__init__(**values)
self._config = _config
self._handlers = _handlers

@pydantic.model_validator(mode="after")
def validate_component(self) -> Self:
Expand Down Expand Up @@ -128,7 +131,7 @@ def gen_parents():

@classmethod
def substitute_in_component(
cls, _config: KpopsConfig, **component_data: Any
cls, config_: KpopsConfig, **component_data: Any
) -> dict[str, Any]:
"""Substitute all $-placeholders in a component in dict representation.
Expand All @@ -139,8 +142,8 @@ def substitute_in_component(
# functions, still hardcoded, because of their names.
# TODO(Ivan Yordanov): Get rid of them
substitution_hardcoded: dict[str, JsonType] = {
"error_topic_name": _config.topic_name_config.default_error_topic_name,
"output_topic_name": _config.topic_name_config.default_output_topic_name,
"error_topic_name": config_.topic_name_config.default_error_topic_name,
"output_topic_name": config_.topic_name_config.default_output_topic_name,
}
component_substitution = generate_substitution(
component_data,
Expand All @@ -149,7 +152,7 @@ def substitute_in_component(
separator=".",
)
substitution = generate_substitution(
_config.model_dump(mode="json"),
config_.model_dump(mode="json"),
"config",
existing_substitution=component_substitution,
separator=".",
Expand All @@ -164,14 +167,15 @@ def substitute_in_component(

@classmethod
def extend_with_defaults(
cls, _config: KpopsConfig, **kwargs: Any
cls, config_: KpopsConfig, **kwargs: Any
) -> dict[str, Any]:
"""Merge parent components' defaults with own.
:param config: KPOps configuration
:param kwargs: The init kwargs for pydantic
:returns: Enriched kwargs with inherited defaults
"""
kwargs["config_"] = config_
pipeline_path_str = ENV.get(PIPELINE_PATH)
if not pipeline_path_str:
return kwargs
Expand All @@ -183,7 +187,7 @@ def extend_with_defaults(
kwargs[k] = asdict(v)

defaults_file_paths_ = get_defaults_file_paths(
pipeline_path, _config, ENV.get("environment")
pipeline_path, config_, ENV.get("environment")
)
defaults = cls.load_defaults(*defaults_file_paths_)
log.debug(
Expand Down
2 changes: 1 addition & 1 deletion kpops/components/base_components/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def helm_name_override(self) -> str:
@override
def helm_flags(self) -> HelmFlags:
return HelmFlags(
create_namespace=self._config.create_namespace,
create_namespace=self.config_.create_namespace,
version=self.version,
wait=True,
wait_for_jobs=True,
Expand Down
10 changes: 5 additions & 5 deletions kpops/components/base_components/helm_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class HelmApp(KubernetesApp):
@cached_property
def helm(self) -> Helm:
"""Helm object that contains component-specific config such as repo."""
helm = Helm(self._config.helm_config)
helm = Helm(self.config_.helm_config)
if self.repo_config is not None:
helm.add_repo(
self.repo_config.repository_name,
Expand All @@ -96,11 +96,11 @@ def helm(self) -> Helm:
@cached_property
def helm_diff(self) -> HelmDiff:
"""Helm diff object of last and current release of this component."""
return HelmDiff(self._config.helm_diff_config)
return HelmDiff(self.config_.helm_diff_config)

@cached_property
def dry_run_handler(self) -> DryRunHandler:
helm_diff = HelmDiff(self._config.helm_diff_config)
helm_diff = HelmDiff(self.config_.helm_diff_config)
return DryRunHandler(self.helm, helm_diff, self.namespace)

@property
Expand Down Expand Up @@ -130,15 +130,15 @@ def helm_flags(self) -> HelmFlags:
return HelmFlags(
**auth_flags,
version=self.version,
create_namespace=self._config.create_namespace,
create_namespace=self.config_.create_namespace,
)

@property
def template_flags(self) -> HelmTemplateFlags:
"""Return flags for Helm template command."""
return HelmTemplateFlags(
**self.helm_flags.model_dump(),
api_version=self._config.helm_config.api_version,
api_version=self.config_.helm_config.api_version,
)

@override
Expand Down
8 changes: 4 additions & 4 deletions kpops/components/base_components/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async def clean(self, dry_run: bool) -> None:
log.info(f"Init cleanup job for {self.helm_release_name}")
await self.deploy(dry_run)

if not self._config.retain_clean_jobs:
if not self.config_.retain_clean_jobs:
log.info(f"Uninstall cleanup job for {self.helm_release_name}")
await self.destroy(dry_run)

Expand All @@ -135,10 +135,10 @@ class KafkaApp(PipelineComponent, ABC):
async def deploy(self, dry_run: bool) -> None:
if self.to:
for topic in self.to.kafka_topics:
await self._handlers.topic_handler.create_topic(topic, dry_run=dry_run)
await self.handlers_.topic_handler.create_topic(topic, dry_run=dry_run)

if self._handlers.schema_handler:
await self._handlers.schema_handler.submit_schemas(
if self.handlers_.schema_handler:
await self.handlers_.schema_handler.submit_schemas(
to_section=self.to, dry_run=dry_run
)

Expand Down
24 changes: 12 additions & 12 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def reset(self, dry_run: bool) -> None:
)
await self.deploy(dry_run)

if not self._config.retain_clean_jobs:
if not self.config_.retain_clean_jobs:
log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter."))
await self.destroy(dry_run)

Expand Down Expand Up @@ -140,8 +140,8 @@ def _resetter(self) -> KafkaConnectorResetter:
if self.resetter_namespace:
kwargs["namespace"] = self.resetter_namespace
return KafkaConnectorResetter(
_config=self._config,
_handlers=self._handlers,
config_=self.config_,
handlers_=self.handlers_,
**kwargs,
**self.model_dump(
by_alias=True,
Expand All @@ -158,7 +158,7 @@ def _resetter(self) -> KafkaConnectorResetter:
connector_type=self._connector_type.value,
config=KafkaConnectorResetterConfig(
connector=self.full_name,
brokers=self._config.kafka_brokers,
brokers=self.config_.kafka_brokers,
),
**self.resetter_values.model_dump(),
),
Expand All @@ -168,32 +168,32 @@ def _resetter(self) -> KafkaConnectorResetter:
async def deploy(self, dry_run: bool) -> None:
if self.to:
for topic in self.to.kafka_topics:
await self._handlers.topic_handler.create_topic(topic, dry_run=dry_run)
await self.handlers_.topic_handler.create_topic(topic, dry_run=dry_run)

if self._handlers.schema_handler:
await self._handlers.schema_handler.submit_schemas(
if self.handlers_.schema_handler:
await self.handlers_.schema_handler.submit_schemas(
to_section=self.to, dry_run=dry_run
)

await self._handlers.connector_handler.create_connector(
await self.handlers_.connector_handler.create_connector(
self.config, dry_run=dry_run
)

@override
async def destroy(self, dry_run: bool) -> None:
await self._handlers.connector_handler.destroy_connector(
await self.handlers_.connector_handler.destroy_connector(
self.full_name, dry_run=dry_run
)

@override
async def clean(self, dry_run: bool) -> None:
if self.to:
if self._handlers.schema_handler:
await self._handlers.schema_handler.delete_schemas(
if self.handlers_.schema_handler:
await self.handlers_.schema_handler.delete_schemas(
to_section=self.to, dry_run=dry_run
)
for topic in self.to.kafka_topics:
await self._handlers.topic_handler.delete_topic(topic, dry_run=dry_run)
await self.handlers_.topic_handler.delete_topic(topic, dry_run=dry_run)


class KafkaSourceConnector(KafkaConnector):
Expand Down
4 changes: 2 additions & 2 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class ProducerApp(KafkaApp, StreamsBootstrap):
@cached_property
def _cleaner(self) -> ProducerAppCleaner:
return ProducerAppCleaner(
_config=self._config,
_handlers=self._handlers,
config_=self.config_,
handlers_=self.handlers_,
**self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"}),
)

Expand Down
4 changes: 2 additions & 2 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class StreamsApp(KafkaApp, StreamsBootstrap):
@cached_property
def _cleaner(self) -> StreamsAppCleaner:
return StreamsAppCleaner(
_config=self._config,
_handlers=self._handlers,
config_=self.config_,
handlers_=self.handlers_,
**self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"}),
)

Expand Down
8 changes: 4 additions & 4 deletions kpops/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ def apply_component(
:param component_data: Arguments for instantiation of pipeline component
"""
component = component_class(
_config=self.config,
_handlers=self.handlers,
config_=self.config,
handlers_=self.handlers,
**component_data,
)
component = self.enrich_component_with_env(component)
Expand Down Expand Up @@ -350,8 +350,8 @@ def enrich_component_with_env(
)

return component.__class__(
_config=self.config,
_handlers=self.handlers,
config_=self.config,
handlers_=self.handlers,
**env_component_as_dict,
)

Expand Down
12 changes: 6 additions & 6 deletions tests/components/test_base_defaults_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_load_defaults_with_environment(

def test_inherit_defaults(self, config: KpopsConfig, handlers: ComponentHandlers):
ENV["environment"] = "development"
component = Child(_config=config, _handlers=handlers)
component = Child(config_=config, handlers_=handlers)

assert (
component.name == "fake-child-name"
Expand All @@ -145,8 +145,8 @@ def test_inherit_defaults(self, config: KpopsConfig, handlers: ComponentHandlers

def test_inherit(self, config: KpopsConfig, handlers: ComponentHandlers):
component = Child(
_config=config,
_handlers=handlers,
config_=config,
handlers_=handlers,
name="name-defined-in-pipeline_parser",
)

Expand All @@ -169,7 +169,7 @@ def test_inherit(self, config: KpopsConfig, handlers: ComponentHandlers):
def test_multiple_generations(
self, config: KpopsConfig, handlers: ComponentHandlers
):
component = GrandChild(_config=config, _handlers=handlers)
component = GrandChild(config_=config, handlers_=handlers)

assert (
component.name == "fake-child-name"
Expand All @@ -192,7 +192,7 @@ def test_env_var_substitution(
self, config: KpopsConfig, handlers: ComponentHandlers
):
ENV["pipeline_name"] = RESOURCES_PATH.as_posix()
component = EnvVarTest(_config=config, _handlers=handlers)
component = EnvVarTest(config_=config, handlers_=handlers)

assert component.name

Expand All @@ -202,7 +202,7 @@ def test_env_var_substitution(

def test_merge_defaults(self, config: KpopsConfig, handlers: ComponentHandlers):
component = GrandChild(
_config=config, _handlers=handlers, nested=Nested(**{"bar": False})
config_=config, handlers_=handlers, nested=Nested(**{"bar": False})
)
assert isinstance(component.nested, Nested)
assert component.nested == Nested(**{"foo": "foo", "bar": False})
Expand Down
Loading

0 comments on commit 174775d

Please sign in to comment.