From 174775d927835579835022a06bebf9015c823e53 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Mon, 8 Jul 2024 21:03:10 +0200 Subject: [PATCH] Revert PrivateAttr --- .../base_defaults_component.py | 46 ++++++++++-------- kpops/components/base_components/cleaner.py | 2 +- kpops/components/base_components/helm_app.py | 10 ++-- kpops/components/base_components/kafka_app.py | 8 ++-- .../base_components/kafka_connector.py | 24 +++++----- .../producer/producer_app.py | 4 +- .../streams_bootstrap/streams/streams_app.py | 4 +- kpops/pipeline/__init__.py | 8 ++-- .../test_base_defaults_component.py | 12 ++--- tests/components/test_helm_app.py | 16 +++---- tests/components/test_kafka_connector.py | 16 +++---- tests/components/test_kafka_sink_connector.py | 42 ++++++++-------- .../components/test_kafka_source_connector.py | 48 +++++++++---------- tests/components/test_kubernetes_app.py | 16 +++---- tests/components/test_producer_app.py | 14 +++--- tests/components/test_streams_app.py | 42 ++++++++-------- tests/components/test_streams_bootstrap.py | 8 ++-- tests/pipeline/test_components/components.py | 12 ++--- .../components.py | 4 +- tests/pipeline/test_pipeline.py | 11 ++--- 20 files changed, 174 insertions(+), 173 deletions(-) diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index 41225552b..b015c134c 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -15,7 +15,6 @@ AliasChoices, ConfigDict, Field, - PrivateAttr, computed_field, ) from pydantic.json_schema import SkipJsonSchema @@ -51,8 +50,8 @@ class BaseDefaultsComponent(DescConfigModel, ABC): correctly to the component. :param enrich: Whether to enrich component with defaults, defaults to False - :param _config: KPOps configuration to be accessed by this component - :param _handlers: Component handlers to be accessed by this component + :param config_: KPOps configuration to be accessed by this component + :param handlers_: Component handlers to be accessed by this component :param validate: Whether to run custom validation on the component, defaults to True """ @@ -65,8 +64,16 @@ class BaseDefaultsComponent(DescConfigModel, ABC): description=describe_attr("enrich", __doc__), exclude=True, ) - _config: KpopsConfig = PrivateAttr() - _handlers: ComponentHandlers = PrivateAttr() + config_: SkipJsonSchema[KpopsConfig] = Field( + default=..., + description=describe_attr("config_", __doc__), + exclude=True, + ) + handlers_: SkipJsonSchema[ComponentHandlers] = Field( + default=..., + description=describe_attr("handlers_", __doc__), + exclude=True, + ) validate_: SkipJsonSchema[bool] = Field( validation_alias=AliasChoices("validate", "validate_"), default=False, @@ -74,26 +81,22 @@ class BaseDefaultsComponent(DescConfigModel, ABC): exclude=True, ) - def __init__( - self, *, _config: KpopsConfig, _handlers: ComponentHandlers, **values: Any - ) -> None: + def __init__(self, **values: Any) -> None: if values.get("enrich", True): cls = self.__class__ - values = cls.extend_with_defaults(_config, **values) - tmp_self = cls(_config=_config, _handlers=_handlers, **values, enrich=False) + values = cls.extend_with_defaults(**values) + tmp_self = cls(**values, enrich=False) values = tmp_self.model_dump(mode="json", by_alias=True) - values = cls.substitute_in_component(_config, **values) + values = cls.substitute_in_component(tmp_self.config_, **values) self.__init__( enrich=False, validate=True, - _config=_config, - _handlers=_handlers, + config_=tmp_self.config_, + handlers_=tmp_self.handlers_, **values, ) else: super().__init__(**values) - self._config = _config - self._handlers = _handlers @pydantic.model_validator(mode="after") def validate_component(self) -> Self: @@ -128,7 +131,7 @@ def gen_parents(): @classmethod def substitute_in_component( - cls, _config: KpopsConfig, **component_data: Any + cls, config_: KpopsConfig, **component_data: Any ) -> dict[str, Any]: """Substitute all $-placeholders in a component in dict representation. @@ -139,8 +142,8 @@ def substitute_in_component( # functions, still hardcoded, because of their names. # TODO(Ivan Yordanov): Get rid of them substitution_hardcoded: dict[str, JsonType] = { - "error_topic_name": _config.topic_name_config.default_error_topic_name, - "output_topic_name": _config.topic_name_config.default_output_topic_name, + "error_topic_name": config_.topic_name_config.default_error_topic_name, + "output_topic_name": config_.topic_name_config.default_output_topic_name, } component_substitution = generate_substitution( component_data, @@ -149,7 +152,7 @@ def substitute_in_component( separator=".", ) substitution = generate_substitution( - _config.model_dump(mode="json"), + config_.model_dump(mode="json"), "config", existing_substitution=component_substitution, separator=".", @@ -164,7 +167,7 @@ def substitute_in_component( @classmethod def extend_with_defaults( - cls, _config: KpopsConfig, **kwargs: Any + cls, config_: KpopsConfig, **kwargs: Any ) -> dict[str, Any]: """Merge parent components' defaults with own. @@ -172,6 +175,7 @@ def extend_with_defaults( :param kwargs: The init kwargs for pydantic :returns: Enriched kwargs with inherited defaults """ + kwargs["config_"] = config_ pipeline_path_str = ENV.get(PIPELINE_PATH) if not pipeline_path_str: return kwargs @@ -183,7 +187,7 @@ def extend_with_defaults( kwargs[k] = asdict(v) defaults_file_paths_ = get_defaults_file_paths( - pipeline_path, _config, ENV.get("environment") + pipeline_path, config_, ENV.get("environment") ) defaults = cls.load_defaults(*defaults_file_paths_) log.debug( diff --git a/kpops/components/base_components/cleaner.py b/kpops/components/base_components/cleaner.py index b019cc0c4..c1a7da003 100644 --- a/kpops/components/base_components/cleaner.py +++ b/kpops/components/base_components/cleaner.py @@ -34,7 +34,7 @@ def helm_name_override(self) -> str: @override def helm_flags(self) -> HelmFlags: return HelmFlags( - create_namespace=self._config.create_namespace, + create_namespace=self.config_.create_namespace, version=self.version, wait=True, wait_for_jobs=True, diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 40d5f4ee1..b6b98b554 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -84,7 +84,7 @@ class HelmApp(KubernetesApp): @cached_property def helm(self) -> Helm: """Helm object that contains component-specific config such as repo.""" - helm = Helm(self._config.helm_config) + helm = Helm(self.config_.helm_config) if self.repo_config is not None: helm.add_repo( self.repo_config.repository_name, @@ -96,11 +96,11 @@ def helm(self) -> Helm: @cached_property def helm_diff(self) -> HelmDiff: """Helm diff object of last and current release of this component.""" - return HelmDiff(self._config.helm_diff_config) + return HelmDiff(self.config_.helm_diff_config) @cached_property def dry_run_handler(self) -> DryRunHandler: - helm_diff = HelmDiff(self._config.helm_diff_config) + helm_diff = HelmDiff(self.config_.helm_diff_config) return DryRunHandler(self.helm, helm_diff, self.namespace) @property @@ -130,7 +130,7 @@ def helm_flags(self) -> HelmFlags: return HelmFlags( **auth_flags, version=self.version, - create_namespace=self._config.create_namespace, + create_namespace=self.config_.create_namespace, ) @property @@ -138,7 +138,7 @@ def template_flags(self) -> HelmTemplateFlags: """Return flags for Helm template command.""" return HelmTemplateFlags( **self.helm_flags.model_dump(), - api_version=self._config.helm_config.api_version, + api_version=self.config_.helm_config.api_version, ) @override diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 61de548b6..2c24f2525 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -113,7 +113,7 @@ async def clean(self, dry_run: bool) -> None: log.info(f"Init cleanup job for {self.helm_release_name}") await self.deploy(dry_run) - if not self._config.retain_clean_jobs: + if not self.config_.retain_clean_jobs: log.info(f"Uninstall cleanup job for {self.helm_release_name}") await self.destroy(dry_run) @@ -135,10 +135,10 @@ class KafkaApp(PipelineComponent, ABC): async def deploy(self, dry_run: bool) -> None: if self.to: for topic in self.to.kafka_topics: - await self._handlers.topic_handler.create_topic(topic, dry_run=dry_run) + await self.handlers_.topic_handler.create_topic(topic, dry_run=dry_run) - if self._handlers.schema_handler: - await self._handlers.schema_handler.submit_schemas( + if self.handlers_.schema_handler: + await self.handlers_.schema_handler.submit_schemas( to_section=self.to, dry_run=dry_run ) diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index b7b020de4..3e3884f85 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -84,7 +84,7 @@ async def reset(self, dry_run: bool) -> None: ) await self.deploy(dry_run) - if not self._config.retain_clean_jobs: + if not self.config_.retain_clean_jobs: log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) await self.destroy(dry_run) @@ -140,8 +140,8 @@ def _resetter(self) -> KafkaConnectorResetter: if self.resetter_namespace: kwargs["namespace"] = self.resetter_namespace return KafkaConnectorResetter( - _config=self._config, - _handlers=self._handlers, + config_=self.config_, + handlers_=self.handlers_, **kwargs, **self.model_dump( by_alias=True, @@ -158,7 +158,7 @@ def _resetter(self) -> KafkaConnectorResetter: connector_type=self._connector_type.value, config=KafkaConnectorResetterConfig( connector=self.full_name, - brokers=self._config.kafka_brokers, + brokers=self.config_.kafka_brokers, ), **self.resetter_values.model_dump(), ), @@ -168,32 +168,32 @@ def _resetter(self) -> KafkaConnectorResetter: async def deploy(self, dry_run: bool) -> None: if self.to: for topic in self.to.kafka_topics: - await self._handlers.topic_handler.create_topic(topic, dry_run=dry_run) + await self.handlers_.topic_handler.create_topic(topic, dry_run=dry_run) - if self._handlers.schema_handler: - await self._handlers.schema_handler.submit_schemas( + if self.handlers_.schema_handler: + await self.handlers_.schema_handler.submit_schemas( to_section=self.to, dry_run=dry_run ) - await self._handlers.connector_handler.create_connector( + await self.handlers_.connector_handler.create_connector( self.config, dry_run=dry_run ) @override async def destroy(self, dry_run: bool) -> None: - await self._handlers.connector_handler.destroy_connector( + await self.handlers_.connector_handler.destroy_connector( self.full_name, dry_run=dry_run ) @override async def clean(self, dry_run: bool) -> None: if self.to: - if self._handlers.schema_handler: - await self._handlers.schema_handler.delete_schemas( + if self.handlers_.schema_handler: + await self.handlers_.schema_handler.delete_schemas( to_section=self.to, dry_run=dry_run ) for topic in self.to.kafka_topics: - await self._handlers.topic_handler.delete_topic(topic, dry_run=dry_run) + await self.handlers_.topic_handler.delete_topic(topic, dry_run=dry_run) class KafkaSourceConnector(KafkaConnector): diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 27b45892a..4ff6ca562 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -56,8 +56,8 @@ class ProducerApp(KafkaApp, StreamsBootstrap): @cached_property def _cleaner(self) -> ProducerAppCleaner: return ProducerAppCleaner( - _config=self._config, - _handlers=self._handlers, + config_=self.config_, + handlers_=self.handlers_, **self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"}), ) diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 798e7e439..eb62a0945 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -60,8 +60,8 @@ class StreamsApp(KafkaApp, StreamsBootstrap): @cached_property def _cleaner(self) -> StreamsAppCleaner: return StreamsAppCleaner( - _config=self._config, - _handlers=self._handlers, + config_=self.config_, + handlers_=self.handlers_, **self.model_dump(by_alias=True, exclude={"_cleaner", "from_", "to"}), ) diff --git a/kpops/pipeline/__init__.py b/kpops/pipeline/__init__.py index e40b2ecc9..e09e78af6 100644 --- a/kpops/pipeline/__init__.py +++ b/kpops/pipeline/__init__.py @@ -306,8 +306,8 @@ def apply_component( :param component_data: Arguments for instantiation of pipeline component """ component = component_class( - _config=self.config, - _handlers=self.handlers, + config_=self.config, + handlers_=self.handlers, **component_data, ) component = self.enrich_component_with_env(component) @@ -350,8 +350,8 @@ def enrich_component_with_env( ) return component.__class__( - _config=self.config, - _handlers=self.handlers, + config_=self.config, + handlers_=self.handlers, **env_component_as_dict, ) diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index 48601cbeb..b727beea6 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -125,7 +125,7 @@ def test_load_defaults_with_environment( def test_inherit_defaults(self, config: KpopsConfig, handlers: ComponentHandlers): ENV["environment"] = "development" - component = Child(_config=config, _handlers=handlers) + component = Child(config_=config, handlers_=handlers) assert ( component.name == "fake-child-name" @@ -145,8 +145,8 @@ def test_inherit_defaults(self, config: KpopsConfig, handlers: ComponentHandlers def test_inherit(self, config: KpopsConfig, handlers: ComponentHandlers): component = Child( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="name-defined-in-pipeline_parser", ) @@ -169,7 +169,7 @@ def test_inherit(self, config: KpopsConfig, handlers: ComponentHandlers): def test_multiple_generations( self, config: KpopsConfig, handlers: ComponentHandlers ): - component = GrandChild(_config=config, _handlers=handlers) + component = GrandChild(config_=config, handlers_=handlers) assert ( component.name == "fake-child-name" @@ -192,7 +192,7 @@ def test_env_var_substitution( self, config: KpopsConfig, handlers: ComponentHandlers ): ENV["pipeline_name"] = RESOURCES_PATH.as_posix() - component = EnvVarTest(_config=config, _handlers=handlers) + component = EnvVarTest(config_=config, handlers_=handlers) assert component.name @@ -202,7 +202,7 @@ def test_env_var_substitution( def test_merge_defaults(self, config: KpopsConfig, handlers: ComponentHandlers): component = GrandChild( - _config=config, _handlers=handlers, nested=Nested(**{"bar": False}) + config_=config, handlers_=handlers, nested=Nested(**{"bar": False}) ) assert isinstance(component.nested, Nested) assert component.nested == Nested(**{"foo": "foo", "bar": False}) diff --git a/tests/components/test_helm_app.py b/tests/components/test_helm_app.py index da3d1a23b..da49ad65f 100644 --- a/tests/components/test_helm_app.py +++ b/tests/components/test_helm_app.py @@ -64,8 +64,8 @@ def helm_app( repo_config: HelmRepoConfig, ) -> HelmApp: return HelmApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="test-helm-app", values=app_values, namespace="test-namespace", @@ -115,8 +115,8 @@ async def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( repository_name="test-repo", url="https://test.com/charts/" ) helm_app = HelmApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="test-helm-app", values=app_values, namespace="test-namespace", @@ -169,8 +169,8 @@ def helm_chart(self) -> str: return "path/to/helm/charts/" app_with_local_chart = AppWithLocalChart( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="test-app-with-local-chart", values=app_values, namespace="test-namespace", @@ -231,8 +231,8 @@ def test_helm_name_override( repo_config: HelmRepoConfig, ): helm_app = HelmApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, prefix="test-pipeline-prefix-with-a-long-name-", name="helm-app-name-is-very-long-as-well", values=HelmAppValues(), diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index a5776b65a..572c57083 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -77,8 +77,8 @@ def connector( connector_config: KafkaConnectorConfig, ) -> KafkaConnector: return KafkaConnector( # HACK: not supposed to be instantiated, because ABC - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -93,8 +93,8 @@ def test_connector_config_name_override( assert connector.config.name == CONNECTOR_FULL_NAME connector = KafkaConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config={"connector.class": CONNECTOR_CLASS}, # type: ignore[reportGeneralTypeIssues], gets enriched resetter_namespace=RESETTER_NAMESPACE, @@ -108,8 +108,8 @@ def test_connector_config_name_override( ), ): KafkaConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config={"connector.class": CONNECTOR_CLASS, "name": "different-name"}, # type: ignore[reportGeneralTypeIssues], gets enriched ) @@ -121,8 +121,8 @@ def test_connector_config_name_override( ), ): KafkaConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config={"connector.class": CONNECTOR_CLASS, "name": ""}, # type: ignore[reportGeneralTypeIssues], gets enriched ) diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 798898caf..1a8705611 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -58,8 +58,8 @@ def connector( connector_config: KafkaConnectorConfig, ) -> KafkaSinkConnector: return KafkaSinkConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -111,8 +111,8 @@ def test_connector_config_parsing( ): topic_pattern = ".*" connector = KafkaSinkConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=KafkaConnectorConfig( **{**connector_config.model_dump(), "topics.regex": topic_pattern} @@ -131,8 +131,8 @@ def test_from_section_parsing_input_topic( topic1 = TopicName("connector-topic1") topic2 = TopicName("connector-topic2") connector = KafkaSinkConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -171,8 +171,8 @@ def test_from_section_parsing_input_pattern( ): topic_pattern = TopicName(".*") connector = KafkaSinkConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -189,10 +189,10 @@ async def test_deploy_order( mocker: MockerFixture, ): mock_create_topic = mocker.patch.object( - connector._handlers.topic_handler, "create_topic" + connector.handlers_.topic_handler, "create_topic" ) mock_create_connector = mocker.patch.object( - connector._handlers.connector_handler, "create_connector" + connector.handlers_.connector_handler, "create_connector" ) mock = mocker.AsyncMock() @@ -217,7 +217,7 @@ async def test_destroy( mocker: MockerFixture, ): mock_destroy_connector = mocker.patch.object( - connector._handlers.connector_handler, "destroy_connector" + connector.handlers_.connector_handler, "destroy_connector" ) await connector.destroy(dry_run=True) @@ -246,10 +246,10 @@ async def test_reset_when_dry_run_is_false( mocker: MockerFixture, ): mock_delete_topic = mocker.patch.object( - connector._handlers.topic_handler, "delete_topic" + connector.handlers_.topic_handler, "delete_topic" ) mock_clean_connector = mocker.patch.object( - connector._handlers.connector_handler, "clean_connector" + connector.handlers_.connector_handler, "clean_connector" ) mock_resetter_reset = mocker.spy(connector._resetter, "reset") @@ -330,10 +330,10 @@ async def test_clean_when_dry_run_is_false( mocker: MockerFixture, ): mock_delete_topic = mocker.patch.object( - connector._handlers.topic_handler, "delete_topic" + connector.handlers_.topic_handler, "delete_topic" ) mock_clean_connector = mocker.patch.object( - connector._handlers.connector_handler, "clean_connector" + connector.handlers_.connector_handler, "clean_connector" ) mock = mocker.MagicMock() @@ -415,8 +415,8 @@ async def test_clean_without_to_when_dry_run_is_true( connector_config: KafkaConnectorConfig, ): connector = KafkaSinkConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -438,18 +438,18 @@ async def test_clean_without_to_when_dry_run_is_false( connector_config: KafkaConnectorConfig, ): connector = KafkaSinkConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, ) mock_delete_topic = mocker.patch.object( - connector._handlers.topic_handler, "delete_topic" + connector.handlers_.topic_handler, "delete_topic" ) mock_clean_connector = mocker.patch.object( - connector._handlers.connector_handler, "clean_connector" + connector.handlers_.connector_handler, "clean_connector" ) mock = mocker.MagicMock() mock.attach_mock(mock_delete_topic, "mock_delete_topic") diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 309d4b71d..b7d78f3b2 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -51,8 +51,8 @@ def connector( connector_config: KafkaConnectorConfig, ) -> KafkaSourceConnector: return KafkaSourceConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -83,8 +83,8 @@ def test_from_section_raises_exception( ): with pytest.raises(NotImplementedError): KafkaSourceConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -104,11 +104,11 @@ async def test_deploy_order( mocker: MockerFixture, ): mock_create_topic = mocker.patch.object( - connector._handlers.topic_handler, "create_topic" + connector.handlers_.topic_handler, "create_topic" ) mock_create_connector = mocker.patch.object( - connector._handlers.connector_handler, "create_connector" + connector.handlers_.connector_handler, "create_connector" ) mock = mocker.AsyncMock() @@ -133,10 +133,10 @@ async def test_destroy( mocker: MockerFixture, ): ENV["KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC"] = OFFSETS_TOPIC - assert connector._handlers.connector_handler + assert connector.handlers_.connector_handler mock_destroy_connector = mocker.patch.object( - connector._handlers.connector_handler, "destroy_connector" + connector.handlers_.connector_handler, "destroy_connector" ) await connector.destroy(dry_run=True) @@ -151,7 +151,7 @@ async def test_reset_when_dry_run_is_true( connector: KafkaSourceConnector, dry_run_handler_mock: MagicMock, ): - assert connector._handlers.connector_handler + assert connector.handlers_.connector_handler await connector.reset(dry_run=True) @@ -165,12 +165,12 @@ async def test_reset_when_dry_run_is_false( helm_mock: MagicMock, mocker: MockerFixture, ): - assert connector._handlers.connector_handler + assert connector.handlers_.connector_handler mock_delete_topic = mocker.patch.object( - connector._handlers.topic_handler, "delete_topic" + connector.handlers_.topic_handler, "delete_topic" ) mock_clean_connector = mocker.spy( - connector._handlers.connector_handler, "clean_connector" + connector.handlers_.connector_handler, "clean_connector" ) mock = mocker.MagicMock() @@ -229,7 +229,7 @@ async def test_clean_when_dry_run_is_true( connector: KafkaSourceConnector, dry_run_handler_mock: MagicMock, ): - assert connector._handlers.connector_handler + assert connector.handlers_.connector_handler await connector.clean(dry_run=True) @@ -243,13 +243,13 @@ async def test_clean_when_dry_run_is_false( dry_run_handler_mock: MagicMock, mocker: MockerFixture, ): - assert connector._handlers.connector_handler + assert connector.handlers_.connector_handler mock_delete_topic = mocker.patch.object( - connector._handlers.topic_handler, "delete_topic" + connector.handlers_.topic_handler, "delete_topic" ) mock_clean_connector = mocker.spy( - connector._handlers.connector_handler, "clean_connector" + connector.handlers_.connector_handler, "clean_connector" ) mock = mocker.MagicMock() @@ -320,8 +320,8 @@ async def test_clean_without_to_when_dry_run_is_false( connector_config: KafkaConnectorConfig, ): connector = KafkaSourceConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -329,13 +329,13 @@ async def test_clean_without_to_when_dry_run_is_false( ) assert connector.to is None - assert connector._handlers.connector_handler + assert connector.handlers_.connector_handler mock_delete_topic = mocker.patch.object( - connector._handlers.topic_handler, "delete_topic" + connector.handlers_.topic_handler, "delete_topic" ) mock_clean_connector = mocker.spy( - connector._handlers.connector_handler, "clean_connector" + connector.handlers_.connector_handler, "clean_connector" ) mock = mocker.MagicMock() @@ -400,8 +400,8 @@ async def test_clean_without_to_when_dry_run_is_true( connector_config: KafkaConnectorConfig, ): connector = KafkaSourceConnector( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=CONNECTOR_NAME, config=connector_config, resetter_namespace=RESETTER_NAMESPACE, @@ -409,7 +409,7 @@ async def test_clean_without_to_when_dry_run_is_true( ) assert connector.to is None - assert connector._handlers.connector_handler + assert connector.handlers_.connector_handler await connector.clean(dry_run=True) diff --git a/tests/components/test_kubernetes_app.py b/tests/components/test_kubernetes_app.py index 859ba8270..15dfaffe8 100644 --- a/tests/components/test_kubernetes_app.py +++ b/tests/components/test_kubernetes_app.py @@ -58,8 +58,8 @@ def kubernetes_app( app_values: KubernetesTestValues, ) -> KubernetesApp: return KubernetesApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="test-kubernetes-app", values=app_values, namespace="test-namespace", @@ -75,8 +75,8 @@ def test_should_raise_value_error_when_name_is_not_valid( ValueError, match=r"The component name .* is invalid for Kubernetes." ): KubernetesApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="Not-Compatible*", values=app_values, namespace="test-namespace", @@ -86,16 +86,16 @@ def test_should_raise_value_error_when_name_is_not_valid( ValueError, match=r"The component name .* is invalid for Kubernetes." ): KubernetesApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="snake_case*", values=app_values, namespace="test-namespace", ) assert KubernetesApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="valid-name", values=app_values, namespace="test-namespace", diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 30ba47669..29fbc3cd8 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -62,8 +62,8 @@ def producer_app( self, config: KpopsConfig, handlers: ComponentHandlers ) -> ProducerApp: return ProducerApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=PRODUCER_APP_NAME, **{ "version": "2.4.2", @@ -104,8 +104,8 @@ def test_cleaner_helm_name_override(self, producer_app: ProducerApp): def test_output_topics(self, config: KpopsConfig, handlers: ComponentHandlers): producer_app = ProducerApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=PRODUCER_APP_NAME, **{ "namespace": "test-namespace", @@ -141,7 +141,7 @@ async def test_deploy_order_when_dry_run_is_false( mocker: MockerFixture, ): mock_create_topic = mocker.patch.object( - producer_app._handlers.topic_handler, "create_topic" + producer_app.handlers_.topic_handler, "create_topic" ) mock_helm_upgrade_install = mocker.patch.object( @@ -320,8 +320,8 @@ def test_get_output_topics( handlers: ComponentHandlers, ): producer_app = ProducerApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="my-producer", **{ "namespace": "test-namespace", diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 3664be544..3a9251f59 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -80,8 +80,8 @@ def streams_app( self, config: KpopsConfig, handlers: ComponentHandlers ) -> StreamsApp: return StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -103,8 +103,8 @@ def stateful_streams_app( self, config: KpopsConfig, handlers: ComponentHandlers ) -> StreamsApp: return StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -198,8 +198,8 @@ def test_cleaner_helm_name_override(self, streams_app: StreamsApp): def test_set_topics(self, config: KpopsConfig, handlers: ComponentHandlers): streams_app = StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -248,8 +248,8 @@ def test_no_empty_input_topic( self, config: KpopsConfig, handlers: ComponentHandlers ): streams_app = StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -281,8 +281,8 @@ def test_should_validate(self, config: KpopsConfig, handlers: ComponentHandlers) ValueError, match="Define role only if `type` is `pattern` or `None`" ): StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -305,8 +305,8 @@ def test_should_validate(self, config: KpopsConfig, handlers: ComponentHandlers) ValueError, match="Define `role` only if `type` is undefined" ): StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -328,8 +328,8 @@ def test_set_streams_output_from_to( self, config: KpopsConfig, handlers: ComponentHandlers ): streams_app = StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -371,8 +371,8 @@ def test_weave_inputs_from_prev_component( self, config: KpopsConfig, handlers: ComponentHandlers ): streams_app = StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -415,8 +415,8 @@ async def test_deploy_order_when_dry_run_is_false( mocker: MockerFixture, ): streams_app = StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name=STREAMS_APP_NAME, **{ "namespace": "test-namespace", @@ -444,7 +444,7 @@ async def test_deploy_order_when_dry_run_is_false( }, ) mock_create_topic = mocker.patch.object( - streams_app._handlers.topic_handler, "create_topic" + streams_app.handlers_.topic_handler, "create_topic" ) mock_helm_upgrade_install = mocker.patch.object( streams_app.helm, "upgrade_install" @@ -645,8 +645,8 @@ async def test_get_input_output_topics( self, config: KpopsConfig, handlers: ComponentHandlers ): streams_app = StreamsApp( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="my-app", **{ "namespace": "test-namespace", diff --git a/tests/components/test_streams_bootstrap.py b/tests/components/test_streams_bootstrap.py index 1485deb6c..b32beea4e 100644 --- a/tests/components/test_streams_bootstrap.py +++ b/tests/components/test_streams_bootstrap.py @@ -39,8 +39,8 @@ def handlers(self) -> ComponentHandlers: def test_default_configs(self, config: KpopsConfig, handlers: ComponentHandlers): streams_bootstrap = StreamsBootstrap( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="example-name", **{ "namespace": "test-namespace", @@ -63,8 +63,8 @@ async def test_should_deploy_streams_bootstrap_app( mocker: MockerFixture, ): streams_bootstrap = StreamsBootstrap( - _config=config, - _handlers=handlers, + config_=config, + handlers_=handlers, name="example-name", **{ "namespace": "test-namespace", diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index 5c7f3f059..150164d9a 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -38,8 +38,8 @@ def inflate(self) -> list[PipelineComponent]: for topic_name, topic_config in self.to.topics.items(): if topic_config.type == OutputTopicTypes.OUTPUT: kafka_connector = KafkaSinkConnector( - _config=self._config, - _handlers=self._handlers, + config_=self.config_, + handlers_=self.handlers_, name=f"{self.name}-inflated-sink-connector", config={ # type: ignore[reportGeneralTypeIssues], required `connector.class` comes from defaults during enrichment "topics": topic_name, @@ -58,8 +58,8 @@ def inflate(self) -> list[PipelineComponent]: ) inflate_steps.append(kafka_connector) streams_app = StreamsApp( - _config=self._config, - _handlers=self._handlers, + config_=self.config_, + handlers_=self.handlers_, name=f"{self.name}-inflated-streams-app", to=ToSection( # type: ignore[reportGeneralTypeIssues] topics={ @@ -93,8 +93,8 @@ def provide_schema( class SimpleInflateConnectors(StreamsApp): def inflate(self) -> list[PipelineComponent]: connector = KafkaSinkConnector( - _config=self._config, - _handlers=self._handlers, + config_=self.config_, + handlers_=self.handlers_, name="inflated-connector-name", config={}, # type: ignore[reportArgumentType] ) diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index b24593930..887e5f0a7 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -22,8 +22,8 @@ def inflate(self) -> list[PipelineComponent]: for topic_name, topic_config in self.to.topics.items(): if topic_config.type == OutputTopicTypes.OUTPUT: kafka_connector = KafkaSinkConnector( - _config=self._config, - _handlers=self._handlers, + config_=self.config_, + handlers_=self.handlers_, name="sink-connector", config=KafkaConnectorConfig( **{ diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index b962daf29..d3b88e834 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -20,16 +20,13 @@ class TestComponentFactory(ModelFactory[PipelineComponent]): from_ = FromSection() enrich = False validate = False + handlers_ = ComponentHandlers(None, MagicMock(), MagicMock()) run_validation = False -kwargs = { - "_config": MagicMock(), - "_handlers": ComponentHandlers(None, MagicMock(), MagicMock()), -} -test_component_1 = TestComponentFactory.build(run_validation, **kwargs) -test_component_2 = TestComponentFactory.build(run_validation, **kwargs) -test_component_3 = TestComponentFactory.build(run_validation, **kwargs) +test_component_1 = TestComponentFactory.build(run_validation) +test_component_2 = TestComponentFactory.build(run_validation) +test_component_3 = TestComponentFactory.build(run_validation) test_component_1.name = "example1" test_component_2.name = "example2"