diff --git a/kpops/cli/custom_formatter.py b/kpops/cli/custom_formatter.py index fb5e44057..69fc1c73d 100644 --- a/kpops/cli/custom_formatter.py +++ b/kpops/cli/custom_formatter.py @@ -16,9 +16,7 @@ def format(self, record): logging.WARNING: typer.style(message_format, fg=typer.colors.YELLOW), logging.ERROR: typer.style(message_format, fg=typer.colors.RED), logging.CRITICAL: typer.style( - message_format, - fg=typer.colors.RED, - bold=True, + message_format, fg=typer.colors.RED, bold=True ), } diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 6e45cdb1c..c7f0e26a1 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -122,17 +122,12 @@ def setup_pipeline( handlers = setup_handlers(components_module, pipeline_config) return Pipeline.load_from_yaml( - pipeline_base_dir, - pipeline_path, - registry, - pipeline_config, - handlers, + pipeline_base_dir, pipeline_path, registry, pipeline_config, handlers ) def setup_handlers( - components_module: str | None, - config: PipelineConfig, + components_module: str | None, config: PipelineConfig ) -> ComponentHandlers: schema_handler = SchemaHandler.load_schema_handler(components_module, config) connector_handler = KafkaConnectHandler.from_pipeline_config(config) @@ -155,15 +150,13 @@ def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]: def filter_steps_to_apply( - pipeline: Pipeline, - steps: set[str], - filter_type: FilterType, + pipeline: Pipeline, steps: set[str], filter_type: FilterType ) -> list[PipelineComponent]: def is_in_steps(component: PipelineComponent) -> bool: return component.name in steps log.debug( - f"KPOPS_PIPELINE_STEPS is defined with values: {steps} and filter type of {filter_type.value}", + f"KPOPS_PIPELINE_STEPS is defined with values: {steps} and filter type of {filter_type.value}" ) filtered_steps = [ component @@ -179,9 +172,7 @@ def is_in_steps(component: PipelineComponent) -> bool: def get_steps_to_apply( - pipeline: Pipeline, - steps: str | None, - filter_type: FilterType, + pipeline: Pipeline, steps: str | None, filter_type: FilterType ) -> list[PipelineComponent]: if steps: return filter_steps_to_apply(pipeline, parse_steps(steps), filter_type) @@ -189,9 +180,7 @@ def get_steps_to_apply( def reverse_pipeline_steps( - pipeline: Pipeline, - steps: str | None, - filter_type: FilterType, + pipeline: Pipeline, steps: str | None, filter_type: FilterType ) -> Iterator[PipelineComponent]: return reversed(get_steps_to_apply(pipeline, steps, filter_type)) @@ -205,9 +194,7 @@ def log_action(action: str, pipeline_component: PipelineComponent): def create_pipeline_config( - config: Path, - defaults: Optional[Path], - verbose: bool, + config: Path, defaults: Optional[Path], verbose: bool ) -> PipelineConfig: setup_logging_level(verbose) PipelineConfig.Config.config_path = config @@ -224,7 +211,7 @@ def create_pipeline_config( Generate json schema. The schemas can be used to enable support for kpops files in a text editor. - """, + """ ) def schema( scope: SchemaScope = typer.Argument( @@ -239,8 +226,7 @@ def schema( ), components_module: Optional[str] = COMPONENTS_MODULES, include_stock_components: bool = typer.Option( - default=True, - help="Include the built-in KPOps components.", + default=True, help="Include the built-in KPOps components." ), ) -> None: match scope: @@ -251,7 +237,7 @@ def schema( @app.command( # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 - help="Enriches pipelines steps with defaults. The output is used as input for the deploy/destroy/... 
commands.", + help="Enriches pipelines steps with defaults. The output is used as input for the deploy/destroy/... commands." ) def generate( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -266,10 +252,7 @@ def generate( ) -> Pipeline: pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, - pipeline_path, - components_module, - pipeline_config, + pipeline_base_dir, pipeline_path, components_module, pipeline_config ) if not template: @@ -282,14 +265,14 @@ def generate( elif steps: log.warning( "The following flags are considered only when `--template` is set: \n \ - '--steps'", + '--steps'" ) return pipeline @app.command( - help="Deploy pipeline steps", + help="Deploy pipeline steps" ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def deploy( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -304,10 +287,7 @@ def deploy( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, - pipeline_path, - components_module, - pipeline_config, + pipeline_base_dir, pipeline_path, components_module, pipeline_config ) steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type) @@ -317,7 +297,7 @@ def deploy( @app.command( - help="Destroy pipeline steps", + help="Destroy pipeline steps" ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def destroy( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -332,10 +312,7 @@ def destroy( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, - pipeline_path, - components_module, - pipeline_config, + pipeline_base_dir, pipeline_path, components_module, pipeline_config ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: @@ -344,7 +321,7 @@ def destroy( @app.command( - help="Reset pipeline steps", + help="Reset pipeline steps" ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def reset( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -359,10 +336,7 @@ def reset( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, - pipeline_path, - components_module, - pipeline_config, + pipeline_base_dir, pipeline_path, components_module, pipeline_config ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: @@ -372,7 +346,7 @@ def reset( @app.command( - help="Clean pipeline steps", + help="Clean pipeline steps" ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def clean( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -387,10 +361,7 @@ def clean( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, - pipeline_path, - components_module, - pipeline_config, + pipeline_base_dir, pipeline_path, components_module, pipeline_config ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: diff --git a/kpops/cli/pipeline_config.py b/kpops/cli/pipeline_config.py index 58e731db2..1400323f5 100644 --- a/kpops/cli/pipeline_config.py +++ b/kpops/cli/pipeline_config.py @@ -112,8 +112,7 @@ def customise_sources( env_settings: SettingsSourceCallable, file_secret_settings: SettingsSourceCallable, ) -> tuple[ - SettingsSourceCallable | Callable[[PipelineConfig], dict[str, Any]], - ..., + SettingsSourceCallable | 
Callable[[PipelineConfig], dict[str, Any]], ... ]: return ( env_settings, diff --git a/kpops/cli/registry.py b/kpops/cli/registry.py index 5f11e7b9b..a97e2cd91 100644 --- a/kpops/cli/registry.py +++ b/kpops/cli/registry.py @@ -41,9 +41,7 @@ def __getitem__(self, component_type: str) -> type[PipelineComponent]: return self._classes[component_type] except KeyError as ke: msg = f"Could not find a component of type {component_type}" - raise ClassNotFoundError( - msg, - ) from ke + raise ClassNotFoundError(msg) from ke def find_class(module_name: str, baseclass: type[T]) -> type[T]: @@ -59,7 +57,7 @@ def _find_classes(module_name: str, baseclass: type[T]) -> Iterator[type[T]]: if issubclass(_class, baseclass): # filter out internal kpops classes unless specifically requested if _class.__module__.startswith( - KPOPS_MODULE, + KPOPS_MODULE ) and not module_name.startswith(KPOPS_MODULE): continue yield _class diff --git a/kpops/component_handlers/helm_wrapper/dry_run_handler.py b/kpops/component_handlers/helm_wrapper/dry_run_handler.py index 7b1429dab..2d28957b7 100644 --- a/kpops/component_handlers/helm_wrapper/dry_run_handler.py +++ b/kpops/component_handlers/helm_wrapper/dry_run_handler.py @@ -18,7 +18,7 @@ def print_helm_diff(self, stdout: str, helm_release_name: str, log: Logger) -> N :param log: The Logger object of the component class """ current_release = list( - self._helm.get_manifest(helm_release_name, self.namespace), + self._helm.get_manifest(helm_release_name, self.namespace) ) if current_release: log.info(f"Helm release {helm_release_name} already exists") diff --git a/kpops/component_handlers/helm_wrapper/helm.py b/kpops/component_handlers/helm_wrapper/helm.py index 9436fd60b..b1b101b41 100644 --- a/kpops/component_handlers/helm_wrapper/helm.py +++ b/kpops/component_handlers/helm_wrapper/helm.py @@ -32,9 +32,7 @@ def __init__(self, helm_config: HelmConfig) -> None: self._version = self.get_version() if self._version.major != 3: msg = f"The supported Helm version is 3.x.x. The current Helm version is {self._version.major}.{self._version.minor}.{self._version.patch}" - raise RuntimeError( - msg, - ) + raise RuntimeError(msg) def add_repo( self, @@ -124,7 +122,7 @@ def uninstall( return self.__execute(command) except ReleaseNotFoundException: log.warning( - f"Release with name {release_name} not found. Could not uninstall app.", + f"Release with name {release_name} not found. Could not uninstall app." 
) def template( @@ -187,9 +185,7 @@ def get_version(self) -> Version: version_match = re.search(r"^v(\d+(?:\.\d+){0,2})", short_version) if version_match is None: msg = f"Could not parse the Helm version.\n\nHelm output:\n{short_version}" - raise RuntimeError( - msg, - ) + raise RuntimeError(msg) version = map(int, version_match.group(1).split(".")) return Version(*version) diff --git a/kpops/component_handlers/helm_wrapper/model.py b/kpops/component_handlers/helm_wrapper/model.py index dce229fa0..af21abb3f 100644 --- a/kpops/component_handlers/helm_wrapper/model.py +++ b/kpops/component_handlers/helm_wrapper/model.py @@ -31,24 +31,19 @@ class RepoAuthFlags(BaseModel): """ username: str | None = Field( - default=None, - description=describe_attr("username", __doc__), + default=None, description=describe_attr("username", __doc__) ) password: str | None = Field( - default=None, - description=describe_attr("password", __doc__), + default=None, description=describe_attr("password", __doc__) ) ca_file: Path | None = Field( - default=None, - description=describe_attr("ca_file", __doc__), + default=None, description=describe_attr("ca_file", __doc__) ) cert_file: Path | None = Field( - default=None, - description=describe_attr("cert_file", __doc__), + default=None, description=describe_attr("cert_file", __doc__) ) insecure_skip_tls_verify: bool = Field( - default=False, - description=describe_attr("insecure_skip_tls_verify", __doc__), + default=False, description=describe_attr("insecure_skip_tls_verify", __doc__) ) class Config(DescConfig): @@ -78,13 +73,11 @@ class HelmRepoConfig(BaseModel): """ repository_name: str = Field( - default=..., - description=describe_attr("repository_name", __doc__), + default=..., description=describe_attr("repository_name", __doc__) ) url: str = Field(default=..., description=describe_attr("url", __doc__)) repo_auth_flags: RepoAuthFlags = Field( - default=RepoAuthFlags(), - description=describe_attr("repo_auth_flags", __doc__), + default=RepoAuthFlags(), description=describe_attr("repo_auth_flags", __doc__) ) class Config(DescConfig): @@ -138,7 +131,7 @@ def to_command(self) -> list[str]: [ "--set-file", ",".join([f"{key}={path}" for key, path in self.set_file.items()]), - ], + ] ) if self.create_namespace: command.append("--create-namespace") diff --git a/kpops/component_handlers/helm_wrapper/utils.py b/kpops/component_handlers/helm_wrapper/utils.py index e05ee187f..7ad76b93a 100644 --- a/kpops/component_handlers/helm_wrapper/utils.py +++ b/kpops/component_handlers/helm_wrapper/utils.py @@ -16,7 +16,7 @@ def trim_release_name(name: str, suffix: str = "") -> str: if len(name) > RELEASE_NAME_MAX_LEN: new_name = name[: (RELEASE_NAME_MAX_LEN - len(suffix))] + suffix log.critical( - f"Invalid Helm release name '{name}'. Truncating to {RELEASE_NAME_MAX_LEN} characters: \n {name} --> {new_name}", + f"Invalid Helm release name '{name}'. Truncating to {RELEASE_NAME_MAX_LEN} characters: \n {name} --> {new_name}" ) name = new_name return name diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py index 35c043dd1..13f02a80d 100644 --- a/kpops/component_handlers/kafka_connect/connect_wrapper.py +++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py @@ -36,8 +36,7 @@ def host(self) -> str: return self._host def create_connector( - self, - connector_config: KafkaConnectorConfig, + self, connector_config: KafkaConnectorConfig ) -> KafkaConnectResponse: """Create a new connector. 
@@ -48,9 +47,7 @@ def create_connector( config_json = connector_config.dict() connect_data = {"name": connector_config.name, "config": config_json} response = httpx.post( - url=f"{self._host}/connectors", - headers=HEADERS, - json=connect_data, + url=f"{self._host}/connectors", headers=HEADERS, json=connect_data ) if response.status_code == httpx.codes.CREATED: log.info(f"Connector {connector_config.name} created.") @@ -58,7 +55,7 @@ def create_connector( return KafkaConnectResponse(**response.json()) elif response.status_code == httpx.codes.CONFLICT: log.warning( - "Rebalancing in progress while creating a connector... Retrying...", + "Rebalancing in progress while creating a connector... Retrying..." ) time.sleep(1) self.create_connector(connector_config) @@ -74,8 +71,7 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse: :return: Information about the connector. """ response = httpx.get( - url=f"{self._host}/connectors/{connector_name}", - headers=HEADERS, + url=f"{self._host}/connectors/{connector_name}", headers=HEADERS ) if response.status_code == httpx.codes.OK: log.info(f"Connector {connector_name} exists.") @@ -86,15 +82,14 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse: raise ConnectorNotFoundException elif response.status_code == httpx.codes.CONFLICT: log.warning( - "Rebalancing in progress while getting a connector... Retrying...", + "Rebalancing in progress while getting a connector... Retrying..." ) sleep(1) self.get_connector(connector_name) raise KafkaConnectError(response) def update_connector_config( - self, - connector_config: KafkaConnectorConfig, + self, connector_config: KafkaConnectorConfig ) -> KafkaConnectResponse: """Create or update a connector. @@ -122,15 +117,14 @@ def update_connector_config( return KafkaConnectResponse(**data) elif response.status_code == httpx.codes.CONFLICT: log.warning( - "Rebalancing in progress while updating a connector... Retrying...", + "Rebalancing in progress while updating a connector... Retrying..." ) sleep(1) self.update_connector_config(connector_config) raise KafkaConnectError(response) def validate_connector_config( - self, - connector_config: KafkaConnectorConfig, + self, connector_config: KafkaConnectorConfig ) -> list[str]: """Validate connector config using the given configuration. @@ -146,7 +140,7 @@ def validate_connector_config( if response.status_code == httpx.codes.OK: kafka_connect_error_response = KafkaConnectConfigErrorResponse( - **response.json(), + **response.json() ) errors: list[str] = [] @@ -155,7 +149,7 @@ def validate_connector_config( if len(config.value.errors) > 0: for error in config.value.errors: errors.append( - f"Found error for field {config.value.name}: {error}", + f"Found error for field {config.value.name}: {error}" ) return errors raise KafkaConnectError(response) @@ -169,8 +163,7 @@ def delete_connector(self, connector_name: str) -> None: :raises ConnectorNotFoundException: Connector not found """ response = httpx.delete( - url=f"{self._host}/connectors/{connector_name}", - headers=HEADERS, + url=f"{self._host}/connectors/{connector_name}", headers=HEADERS ) if response.status_code == httpx.codes.NO_CONTENT: log.info(f"Connector {connector_name} deleted.") @@ -180,7 +173,7 @@ def delete_connector(self, connector_name: str) -> None: raise ConnectorNotFoundException elif response.status_code == httpx.codes.CONFLICT: log.warning( - "Rebalancing in progress while deleting a connector... 
Retrying...", + "Rebalancing in progress while deleting a connector... Retrying..." ) sleep(1) self.delete_connector(connector_name) diff --git a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py index d4b00d6aa..c810a9c36 100644 --- a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py +++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py @@ -34,10 +34,7 @@ def __init__( self._timeout = timeout def create_connector( - self, - connector_config: KafkaConnectorConfig, - *, - dry_run: bool, + self, connector_config: KafkaConnectorConfig, *, dry_run: bool ) -> None: """Create a connector. @@ -57,7 +54,7 @@ def create_connector( timeout( lambda: self._connect_wrapper.update_connector_config( - connector_config, + connector_config ), secs=self._timeout, ) @@ -89,12 +86,11 @@ def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None: ) except ConnectorNotFoundException: log.warning( - f"Connector Destruction: the connector {connector_name} does not exist. Skipping.", + f"Connector Destruction: the connector {connector_name} does not exist. Skipping." ) def __dry_run_connector_creation( - self, - connector_config: KafkaConnectorConfig, + self, connector_config: KafkaConnectorConfig ) -> None: connector_name = connector_config.name try: @@ -110,7 +106,7 @@ def __dry_run_connector_creation( except ConnectorNotFoundException: diff = render_diff({}, connector_config.dict()) log.info( - f"Connector Creation: connector {connector_name} does not exist. Creating connector with config:\n{diff}", + f"Connector Creation: connector {connector_name} does not exist. Creating connector with config:\n{diff}" ) log.debug("POST /connectors HTTP/1.1") log.debug(f"HOST: {self._connect_wrapper.host}") @@ -119,12 +115,10 @@ def __dry_run_connector_creation( if len(errors) > 0: formatted_errors = "\n".join(errors) msg = f"Connector Creation: validating the connector config for connector {connector_name} resulted in the following errors: {formatted_errors}" - raise ConnectorStateException( - msg, - ) + raise ConnectorStateException(msg) else: log.info( - f"Connector Creation: connector config for {connector_name} is valid!", + f"Connector Creation: connector config for {connector_name} is valid!" ) def __dry_run_connector_deletion(self, connector_name: str) -> None: @@ -132,14 +126,14 @@ def __dry_run_connector_deletion(self, connector_name: str) -> None: self._connect_wrapper.get_connector(connector_name) log.info( magentaify( - f"Connector Destruction: connector {connector_name} already exists. Deleting connector.", - ), + f"Connector Destruction: connector {connector_name} already exists. Deleting connector." + ) ) log.debug(f"DELETE /connectors/{connector_name} HTTP/1.1") log.debug(f"HOST: {self._connect_wrapper.host}") except ConnectorNotFoundException: log.warning( - f"Connector Destruction: connector {connector_name} does not exist and cannot be deleted. Skipping.", + f"Connector Destruction: connector {connector_name} does not exist and cannot be deleted. Skipping." 
) @classmethod diff --git a/kpops/component_handlers/kafka_connect/timeout.py b/kpops/component_handlers/kafka_connect/timeout.py index 398ace4e4..e75ac7361 100644 --- a/kpops/component_handlers/kafka_connect/timeout.py +++ b/kpops/component_handlers/kafka_connect/timeout.py @@ -29,5 +29,5 @@ async def main_supervisor(func: Callable[..., T], secs: int) -> T: return loop.run_until_complete(main_supervisor(func, secs)) except TimeoutError: log.exception( - f"Kafka Connect operation {func.__name__} timed out after {secs} seconds. To increase the duration, set the `timeout` option in config.yaml.", + f"Kafka Connect operation {func.__name__} timed out after {secs} seconds. To increase the duration, set the `timeout` option in config.yaml." ) diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 1a05ec86c..63d88b726 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -16,12 +16,12 @@ ) from kpops.utils.colorify import greenify, magentaify -log = logging.getLogger("SchemaHandler") - if TYPE_CHECKING: from kpops.cli.pipeline_config import PipelineConfig from kpops.components.base_components.models.to_section import ToSection +log = logging.getLogger("SchemaHandler") + class SchemaHandler: def __init__(self, url: str, components_module: str | None): @@ -33,22 +33,16 @@ def schema_provider(self) -> SchemaProvider: try: if not self.components_module: msg = f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." - raise ValueError( - msg, - ) + raise ValueError(msg) schema_provider_class = find_class(self.components_module, SchemaProvider) return schema_provider_class() # pyright: ignore[reportGeneralTypeIssues] except ClassNotFoundError as e: msg = f"No schema provider found in components module {self.components_module}. Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." - raise ValueError( - msg, - ) from e + raise ValueError(msg) from e @classmethod def load_schema_handler( - cls, - components_module: str | None, - config: PipelineConfig, + cls, components_module: str | None, config: PipelineConfig ) -> SchemaHandler | None: if not config.schema_registry_url: return None @@ -64,19 +58,14 @@ def submit_schemas(self, to_section: ToSection, dry_run: bool = True) -> None: key_schema_class = config.key_schema if value_schema_class is not None: schema = self.schema_provider.provide_schema( - value_schema_class, - to_section.models, + value_schema_class, to_section.models ) self.__submit_value_schema( - schema, - value_schema_class, - dry_run, - topic_name, + schema, value_schema_class, dry_run, topic_name ) if key_schema_class is not None: schema = self.schema_provider.provide_schema( - key_schema_class, - to_section.models, + key_schema_class, to_section.models ) self.__submit_key_schema(schema, key_schema_class, dry_run, topic_name) @@ -130,29 +119,25 @@ def __submit_schema( else: log.info( greenify( - f"Schema Submission: The subject {subject} will be submitted.", - ), + f"Schema Submission: The subject {subject} will be submitted." 
+ ) ) else: self.schema_registry_client.register(subject=subject, schema=schema) log.info( - f"Schema Submission: schema submitted for {subject} with model {schema_class}.", + f"Schema Submission: schema submitted for {subject} with model {schema_class}." ) def __subject_exists(self, subject: str) -> bool: return len(self.schema_registry_client.get_versions(subject)) > 0 def __check_compatibility( - self, - schema: Schema, - schema_class: str, - subject: str, + self, schema: Schema, schema_class: str, subject: str ) -> None: registered_version = self.schema_registry_client.check_version(subject, schema) if registered_version is None: if not self.schema_registry_client.test_compatibility( - subject=subject, - schema=schema, + subject=subject, schema=schema ): schema_str = ( schema.flat_schema @@ -160,16 +145,14 @@ def __check_compatibility( else str(schema) ) msg = f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}" - raise Exception( - msg, - ) + raise Exception(msg) else: log.debug( - f"Schema Submission: schema was already submitted for the subject {subject} as version {registered_version.schema}. Therefore, the specified schema must be compatible.", + f"Schema Submission: schema was already submitted for the subject {subject} as version {registered_version.schema}. Therefore, the specified schema must be compatible." ) log.info( - f"Schema Submission: compatible schema for {subject} with model {schema_class}.", + f"Schema Submission: compatible schema for {subject} with model {schema_class}." ) def __delete_subject(self, subject: str, dry_run: bool) -> None: @@ -178,5 +161,5 @@ def __delete_subject(self, subject: str, dry_run: bool) -> None: else: version_list = self.schema_registry_client.delete_subject(subject) log.info( - f"Schema Deletion: deleted {len(version_list)} versions for subject {subject}.", + f"Schema Deletion: deleted {len(version_list)} versions for subject {subject}." ) diff --git a/kpops/component_handlers/schema_handler/schema_provider.py b/kpops/component_handlers/schema_handler/schema_provider.py index 253491e9b..0c0423a40 100644 --- a/kpops/component_handlers/schema_handler/schema_provider.py +++ b/kpops/component_handlers/schema_handler/schema_provider.py @@ -5,17 +5,15 @@ from schema_registry.client.schema import AvroSchema, JsonSchema -Schema: TypeAlias = AvroSchema | JsonSchema - if TYPE_CHECKING: from kpops.components.base_components.models import ModelName, ModelVersion +Schema: TypeAlias = AvroSchema | JsonSchema + class SchemaProvider(ABC): @abstractmethod def provide_schema( - self, - schema_class: str, - models: dict[ModelName, ModelVersion], + self, schema_class: str, models: dict[ModelName, ModelVersion] ) -> Schema: ... 
diff --git a/kpops/component_handlers/topic/handler.py b/kpops/component_handlers/topic/handler.py index 75888de16..dae606108 100644 --- a/kpops/component_handlers/topic/handler.py +++ b/kpops/component_handlers/topic/handler.py @@ -35,11 +35,10 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: try: self.proxy_wrapper.get_topic(topic_name=topic_name) topic_config_in_cluster = self.proxy_wrapper.get_topic_config( - topic_name=topic_name, + topic_name=topic_name ) differences = self.__get_topic_config_diff( - topic_config_in_cluster, - topic_config.configs, + topic_config_in_cluster, topic_config.configs ) if differences: @@ -47,11 +46,11 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: for difference in differences: if difference.diff_type is DiffType.REMOVE: json_body.append( - {"name": difference.key, "operation": "DELETE"}, + {"name": difference.key, "operation": "DELETE"} ) elif config_value := difference.change.new_value: json_body.append( - {"name": difference.key, "value": config_value}, + {"name": difference.key, "value": config_value} ) self.proxy_wrapper.batch_alter_topic_config( topic_name=topic_name, @@ -60,7 +59,7 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: else: log.info( - f"Topic Creation: config of topic {topic_name} didn't change. Skipping update.", + f"Topic Creation: config of topic {topic_name} didn't change. Skipping update." ) except TopicNotFoundException: self.proxy_wrapper.create_topic(topic_spec=topic_spec) @@ -75,16 +74,15 @@ def delete_topics(self, to_section: ToSection, dry_run: bool) -> None: self.proxy_wrapper.delete_topic(topic_name=topic_name) except TopicNotFoundException: log.warning( - f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping.", + f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping." ) @staticmethod def __get_topic_config_diff( - cluster_config: TopicConfigResponse, - current_config: dict, + cluster_config: TopicConfigResponse, current_config: dict ) -> list[Diff]: comparable_in_cluster_config_dict, _ = parse_rest_proxy_topic_config( - cluster_config, + cluster_config ) return list(Diff.from_dicts(comparable_in_cluster_config_dict, current_config)) @@ -99,11 +97,10 @@ def __dry_run_topic_creation( topic_name = topic_in_cluster.topic_name if topic_config: topic_config_in_cluster = self.proxy_wrapper.get_topic_config( - topic_name=topic_name, + topic_name=topic_name ) in_cluster_config, new_config = parse_and_compare_topic_configs( - topic_config_in_cluster, - topic_config.configs, + topic_config_in_cluster, topic_config.configs ) if diff := render_diff(in_cluster_config, new_config): log.info(f"Config changes for topic {topic_name}:") @@ -123,15 +120,13 @@ def __dry_run_topic_creation( self.__check_partition_count(topic_in_cluster, topic_spec, effective_config) self.__check_replication_factor( - topic_in_cluster, - topic_spec, - effective_config, + topic_in_cluster, topic_spec, effective_config ) except TopicNotFoundException: log.info( greenify( - f"Topic Creation: {topic_name} does not exist in the cluster. Creating topic.", - ), + f"Topic Creation: {topic_name} does not exist in the cluster. Creating topic." 
+ ) ) log.debug(f"POST /clusters/{self.proxy_wrapper.cluster_id}/topics HTTP/1.1") log.debug(f"Host: {self.proxy_wrapper.host}") @@ -150,13 +145,11 @@ def __check_partition_count( topic_spec.partitions_count or int(broker_config["num.partitions"]) ): log.debug( - f"Topic Creation: partition count of topic {topic_name} did not change. Current partitions count {partition_count}. Updating configs.", + f"Topic Creation: partition count of topic {topic_name} did not change. Current partitions count {partition_count}. Updating configs." ) else: msg = f"Topic Creation: partition count of topic {topic_name} changed! Partitions count of topic {topic_name} is {partition_count}. The given partitions count {topic_spec.partitions_count}." - raise TopicTransactionError( - msg, - ) + raise TopicTransactionError(msg) @staticmethod def __check_replication_factor( @@ -171,28 +164,26 @@ def __check_replication_factor( or int(broker_config["default.replication.factor"]) ): log.debug( - f"Topic Creation: replication factor of topic {topic_name} did not change. Current replication factor {replication_factor}. Updating configs.", + f"Topic Creation: replication factor of topic {topic_name} did not change. Current replication factor {replication_factor}. Updating configs." ) else: msg = f"Topic Creation: replication factor of topic {topic_name} changed! Replication factor of topic {topic_name} is {replication_factor}. The given replication count {topic_spec.replication_factor}." - raise TopicTransactionError( - msg, - ) + raise TopicTransactionError(msg) def __dry_run_topic_deletion(self, topic_name: str) -> None: try: topic_in_cluster = self.proxy_wrapper.get_topic(topic_name=topic_name) log.info( magentaify( - f"Topic Deletion: topic {topic_in_cluster.topic_name} exists in the cluster. Deleting topic.", - ), + f"Topic Deletion: topic {topic_in_cluster.topic_name} exists in the cluster. Deleting topic." + ) ) log.debug( - f"DELETE /clusters/{self.proxy_wrapper.cluster_id}/topics HTTP/1.1", + f"DELETE /clusters/{self.proxy_wrapper.cluster_id}/topics HTTP/1.1" ) except TopicNotFoundException: log.warning( - f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping.", + f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping." ) log.debug(f"Host: {self.proxy_wrapper.host}") log.debug(HEADERS) diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index 9eb706b96..4edc3633c 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -26,9 +26,7 @@ class ProxyWrapper: def __init__(self, pipeline_config: PipelineConfig) -> None: if not pipeline_config.kafka_rest_host: msg = "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." 
- raise ValueError( - msg, - ) + raise ValueError(msg) self._host = pipeline_config.kafka_rest_host diff --git a/kpops/component_handlers/topic/utils.py b/kpops/component_handlers/topic/utils.py index 904833a28..70f71d0b3 100644 --- a/kpops/component_handlers/topic/utils.py +++ b/kpops/component_handlers/topic/utils.py @@ -6,18 +6,17 @@ def parse_and_compare_topic_configs( - topic_config_in_cluster: TopicConfigResponse, - topic_config: dict, + topic_config_in_cluster: TopicConfigResponse, topic_config: dict ) -> tuple[dict, dict]: comparable_in_cluster_config_dict, default_configs = parse_rest_proxy_topic_config( - topic_config_in_cluster, + topic_config_in_cluster ) cluster_defaults_overwrite = set(topic_config.keys()) - set( - comparable_in_cluster_config_dict.keys(), + comparable_in_cluster_config_dict.keys() ) config_overwrites = set(comparable_in_cluster_config_dict.keys()) - set( - topic_config.keys(), + topic_config.keys() ) populate_default_configs( cluster_defaults_overwrite, diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index 8f3f0929b..d9100bd25 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -93,24 +93,17 @@ def extend_with_defaults(self, **kwargs) -> dict: config: PipelineConfig = kwargs["config"] log.debug( typer.style( - "Enriching component of type ", - fg=typer.colors.GREEN, - bold=False, + "Enriching component of type ", fg=typer.colors.GREEN, bold=False ) + typer.style( - kwargs.get("type"), - fg=typer.colors.GREEN, - bold=True, - underline=True, - ), + kwargs.get("type"), fg=typer.colors.GREEN, bold=True, underline=True + ) ) main_default_file_path, environment_default_file_path = get_defaults_file_paths( - config, + config ) defaults = load_defaults( - self.__class__, - main_default_file_path, - environment_default_file_path, + self.__class__, main_default_file_path, environment_default_file_path ) return update_nested(kwargs, defaults) @@ -178,7 +171,7 @@ def defaults_from_yaml(path: Path, key: str) -> dict: if value is None: return {} log.debug( - f"\tFound defaults for component type {typer.style(key, bold=True, fg=typer.colors.MAGENTA)} in file: {path}", + f"\tFound defaults for component type {typer.style(key, bold=True, fg=typer.colors.MAGENTA)} in file: {path}" ) return value @@ -195,11 +188,11 @@ def get_defaults_file_paths(config: PipelineConfig) -> tuple[Path, Path]: """ defaults_dir = Path(config.defaults_path).resolve() main_default_file_path = defaults_dir / Path( - config.defaults_filename_prefix, + config.defaults_filename_prefix ).with_suffix(".yaml") environment_default_file_path = defaults_dir / Path( - f"{config.defaults_filename_prefix}_{config.environment}", + f"{config.defaults_filename_prefix}_{config.environment}" ).with_suffix(".yaml") return main_default_file_path, environment_default_file_path diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 1650a9bdf..a13dc7a7d 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -30,8 +30,7 @@ class KafkaStreamsConfig(BaseModel): brokers: str = Field(default=..., description=describe_attr("brokers", __doc__)) schema_registry_url: str | None = Field( - default=None, - description=describe_attr("schema_registry_url", __doc__), + default=None, description=describe_attr("schema_registry_url", __doc__) ) class 
Config(CamelCaseConfig, DescConfig): @@ -46,12 +45,10 @@ class KafkaAppConfig(KubernetesAppConfig): """ streams: KafkaStreamsConfig = Field( - default=..., - description=describe_attr("streams", __doc__), + default=..., description=describe_attr("streams", __doc__) ) name_override: str | None = Field( - default=None, - description=describe_attr("name_override", __doc__), + default=None, description=describe_attr("name_override", __doc__) ) @@ -92,14 +89,12 @@ def clean_up_helm_chart(self) -> str: def deploy(self, dry_run: bool) -> None: if self.to: self.handlers.topic_handler.create_topics( - to_section=self.to, - dry_run=dry_run, + to_section=self.to, dry_run=dry_run ) if self.handlers.schema_handler: self.handlers.schema_handler.submit_schemas( - to_section=self.to, - dry_run=dry_run, + to_section=self.to, dry_run=dry_run ) super().deploy(dry_run) @@ -118,8 +113,7 @@ def _run_clean_up_job( """ suffix = "-clean" clean_up_release_name = trim_release_name( - self.helm_release_name + suffix, - suffix, + self.helm_release_name + suffix, suffix ) log.info(f"Uninstall old cleanup job for {clean_up_release_name}") @@ -128,10 +122,7 @@ def _run_clean_up_job( log.info(f"Init cleanup job for {clean_up_release_name}") stdout = self.__install_clean_up_job( - clean_up_release_name, - suffix, - values, - dry_run, + clean_up_release_name, suffix, values, dry_run ) if dry_run: diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 6420662a3..96ee68041 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -67,8 +67,7 @@ class KafkaConnector(PipelineComponent, ABC): description=describe_attr("repo_config", __doc__), ) version: str | None = Field( - default="1.0.4", - description=describe_attr("version", __doc__), + default="1.0.4", description=describe_attr("version", __doc__) ) resetter_values: dict = Field( default_factory=dict, @@ -141,14 +140,12 @@ def template_flags(self) -> HelmTemplateFlags: def deploy(self, dry_run: bool) -> None: if self.to: self.handlers.topic_handler.create_topics( - to_section=self.to, - dry_run=dry_run, + to_section=self.to, dry_run=dry_run ) if self.handlers.schema_handler: self.handlers.schema_handler.submit_schemas( - to_section=self.to, - dry_run=dry_run, + to_section=self.to, dry_run=dry_run ) self.handlers.connector_handler.create_connector(self.app, dry_run=dry_run) @@ -156,8 +153,7 @@ def deploy(self, dry_run: bool) -> None: @override def destroy(self, dry_run: bool) -> None: self.handlers.connector_handler.destroy_connector( - self.full_name, - dry_run=dry_run, + self.full_name, dry_run=dry_run ) @override @@ -165,8 +161,7 @@ def clean(self, dry_run: bool) -> None: if self.to: if self.handlers.schema_handler: self.handlers.schema_handler.delete_schemas( - to_section=self.to, - dry_run=dry_run, + to_section=self.to, dry_run=dry_run ) self.handlers.topic_handler.delete_topics(self.to, dry_run=dry_run) @@ -188,24 +183,22 @@ def _run_connect_resetter( """ log.info( magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}", - ), + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}" + ) ) self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) log.info( magentaify( - f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}", - ), + f"Connector Cleanup: deploy Connect 
{self._connector_type.value} resetter for {self.full_name}" + ) ) stdout = self.__install_connect_resetter(dry_run, **kwargs) if dry_run: self.dry_run_handler.print_helm_diff( - stdout, - self._resetter_release_name, - log, + stdout, self._resetter_release_name, log ) if not retain_clean_jobs: @@ -369,9 +362,7 @@ def clean(self, dry_run: bool) -> None: self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True) def __run_kafka_connect_resetter( - self, - dry_run: bool, - delete_consumer_group: bool, + self, dry_run: bool, delete_consumer_group: bool ) -> None: """Run the connector resetter. diff --git a/kpops/components/base_components/kubernetes_app.py b/kpops/components/base_components/kubernetes_app.py index 2e0b44511..ff35459c3 100644 --- a/kpops/components/base_components/kubernetes_app.py +++ b/kpops/components/base_components/kubernetes_app.py @@ -25,7 +25,7 @@ log = logging.getLogger("KubernetesAppComponent") KUBERNETES_NAME_CHECK_PATTERN = re.compile( - r"^(?![0-9]+$)(?!.*-$)(?!-)[a-z0-9-.]{1,253}(? str: msg = ( f"Please implement the helm_chart property of the {self.__module__} module." ) - raise NotImplementedError( - msg, - ) + raise NotImplementedError(msg) @property def helm_flags(self) -> HelmFlags: @@ -174,7 +172,7 @@ def print_helm_diff(self, stdout: str) -> None: :param stdout: The output of a Helm command that installs or upgrades the release """ current_release = list( - self.helm.get_manifest(self.helm_release_name, self.namespace), + self.helm.get_manifest(self.helm_release_name, self.namespace) ) if current_release: log.info(f"Helm release {self.helm_release_name} already exists") diff --git a/kpops/components/base_components/models/from_section.py b/kpops/components/base_components/models/from_section.py index c416026c9..153133639 100644 --- a/kpops/components/base_components/models/from_section.py +++ b/kpops/components/base_components/models/from_section.py @@ -27,8 +27,7 @@ class FromTopic(BaseModel): """ type: InputTopicTypes | None = Field( - default=None, - description=describe_attr("type", __doc__), + default=None, description=describe_attr("type", __doc__) ) role: str | None = Field(default=None, description=describe_attr("role", __doc__)) diff --git a/kpops/components/base_components/models/to_section.py b/kpops/components/base_components/models/to_section.py index d56476659..03f1d7141 100644 --- a/kpops/components/base_components/models/to_section.py +++ b/kpops/components/base_components/models/to_section.py @@ -31,9 +31,7 @@ class TopicConfig(BaseModel): """ type: OutputTopicTypes | None = Field( - default=None, - title="Topic type", - description=describe_attr("type", __doc__), + default=None, title="Topic type", description=describe_attr("type", __doc__) ) key_schema: str | None = Field( default=None, @@ -56,8 +54,7 @@ class TopicConfig(BaseModel): description=describe_attr("replication_factor", __doc__), ) configs: dict[str, str | int] = Field( - default={}, - description=describe_attr("configs", __doc__), + default={}, description=describe_attr("configs", __doc__) ) role: str | None = Field(default=None, description=describe_attr("role", __doc__)) @@ -83,12 +80,10 @@ class ToSection(BaseModel): """ topics: dict[TopicName, TopicConfig] = Field( - default={}, - description=describe_attr("topics", __doc__), + default={}, description=describe_attr("topics", __doc__) ) models: dict[ModelName, ModelVersion] = Field( - default={}, - description=describe_attr("models", __doc__), + default={}, description=describe_attr("models", __doc__) ) class 
Config(DescConfig): diff --git a/kpops/components/streams_bootstrap/producer/model.py b/kpops/components/streams_bootstrap/producer/model.py index 1e5348948..8af1a68c6 100644 --- a/kpops/components/streams_bootstrap/producer/model.py +++ b/kpops/components/streams_bootstrap/producer/model.py @@ -15,12 +15,10 @@ class ProducerStreamsConfig(KafkaStreamsConfig): """ extra_output_topics: dict[str, str] = Field( - default={}, - description=describe_attr("extra_output_topics", __doc__), + default={}, description=describe_attr("extra_output_topics", __doc__) ) output_topic: str | None = Field( - default=None, - description=describe_attr("output_topic", __doc__), + default=None, description=describe_attr("output_topic", __doc__) ) @@ -31,8 +29,7 @@ class ProducerValues(KafkaAppConfig): """ streams: ProducerStreamsConfig = Field( - default=..., - description=describe_attr("streams", __doc__), + default=..., description=describe_attr("streams", __doc__) ) class Config(BaseConfig): diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 0433fb5dc..ca2db77ae 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -27,36 +27,28 @@ class StreamsConfig(KafkaStreamsConfig): """ input_topics: list[str] = Field( - default=[], - description=describe_attr("input_topics", __doc__), + default=[], description=describe_attr("input_topics", __doc__) ) input_pattern: str | None = Field( - default=None, - description=describe_attr("input_pattern", __doc__), + default=None, description=describe_attr("input_pattern", __doc__) ) extra_input_topics: dict[str, list[str]] = Field( - default={}, - description=describe_attr("extra_input_topics", __doc__), + default={}, description=describe_attr("extra_input_topics", __doc__) ) extra_input_patterns: dict[str, str] = Field( - default={}, - description=describe_attr("extra_input_patterns", __doc__), + default={}, description=describe_attr("extra_input_patterns", __doc__) ) extra_output_topics: dict[str, str] = Field( - default={}, - description=describe_attr("extra_output_topics", __doc__), + default={}, description=describe_attr("extra_output_topics", __doc__) ) output_topic: str | None = Field( - default=None, - description=describe_attr("output_topic", __doc__), + default=None, description=describe_attr("output_topic", __doc__) ) error_topic: str | None = Field( - default=None, - description=describe_attr("error_topic", __doc__), + default=None, description=describe_attr("error_topic", __doc__) ) config: dict[str, str] = Field( - default={}, - description=describe_attr("config", __doc__), + default={}, description=describe_attr("config", __doc__) ) def add_input_topics(self, topics: list[str]) -> None: @@ -77,7 +69,7 @@ def add_extra_input_topics(self, role: str, topics: list[str]) -> None: :param role: Topic role """ self.extra_input_topics[role] = deduplicate( - self.extra_input_topics.get(role, []) + topics, + self.extra_input_topics.get(role, []) + topics ) @override diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py index 96588beee..920eec202 100644 --- a/kpops/pipeline_generator/pipeline.py +++ b/kpops/pipeline_generator/pipeline.py @@ -69,14 +69,12 @@ def validate_unique_names(self) -> None: duplicates = [name for name, count in Counter(step_names).items() if count > 1] if duplicates: msg = f"step names should be unique. 
duplicate step names: {', '.join(duplicates)}" - raise ValidationError( - msg, - ) + raise ValidationError(msg) @staticmethod def _populate_component_name(component: PipelineComponent) -> None: # TODO: remove with suppress( - AttributeError, # Some components like Kafka Connect do not have a name_override attribute + AttributeError # Some components like Kafka Connect do not have a name_override attribute ): if (app := getattr(component, "app")) and app.name_override is None: app.name_override = component.full_name @@ -94,9 +92,7 @@ def create_env_components_index( for component in environment_components: if "type" not in component or "name" not in component: msg = "To override components per environment, every component should at least have a type and a name." - raise ValueError( - msg, - ) + raise ValueError(msg) index[component["name"]] = component return index @@ -145,17 +141,13 @@ def load_from_yaml( main_content = load_yaml_file(path, substitution=ENV) if not isinstance(main_content, list): msg = f"The pipeline definition {path} should contain a list of components" - raise TypeError( - msg, - ) + raise TypeError(msg) env_content = [] if (env_file := Pipeline.pipeline_filename_environment(path, config)).exists(): env_content = load_yaml_file(env_file, substitution=ENV) if not isinstance(env_content, list): msg = f"The pipeline definition {env_file} should contain a list of components" - raise TypeError( - msg, - ) + raise TypeError(msg) return cls(main_content, env_content, registry, config, handlers) @@ -173,24 +165,18 @@ def parse_components(self, component_list: list[dict]) -> None: component_type: str = component_data["type"] except KeyError as ke: msg = "Every component must have a type defined, this component does not have one." - raise ValueError( - msg, - ) from ke + raise ValueError(msg) from ke component_class = self.registry[component_type] self.apply_component(component_class, component_data) except Exception as ex: # noqa: BLE001 if "name" in component_data: msg = f"Error enriching {component_data['type']} component {component_data['name']}" - raise ParsingException( - msg, - ) from ex + raise ParsingException(msg) from ex else: raise ParsingException from ex def apply_component( - self, - component_class: type[PipelineComponent], - component_data: dict, + self, component_class: type[PipelineComponent], component_data: dict ) -> None: """Instantiate, enrich and inflate pipeline component. 
@@ -217,15 +203,14 @@ def apply_component( from_topic, ) in enriched_component.from_.components.items(): original_from_component = self.components.find( - original_from_component_name, + original_from_component_name ) inflated_from_component = original_from_component.inflate()[-1] resolved_from_component = self.components.find( - inflated_from_component.name, + inflated_from_component.name ) enriched_component.weave_from_topics( - resolved_from_component.to, - from_topic, + resolved_from_component.to, from_topic ) elif self.components: # read from previous component @@ -273,7 +258,7 @@ def print_yaml(self, substitution: dict | None = None) -> None: theme="ansi_dark", ) Console( - width=1000, # HACK: overwrite console width to avoid truncating output + width=1000 # HACK: overwrite console width to avoid truncating output ).print(syntax) def __iter__(self) -> Iterator[PipelineComponent]: @@ -282,8 +267,8 @@ def __iter__(self) -> Iterator[PipelineComponent]: def __str__(self) -> str: return yaml.dump( json.loads( # HACK: serialize types on Pydantic model export, which are not serialized by .dict(); e.g. pathlib.Path - self.components.json(exclude_none=True, by_alias=True), - ), + self.components.json(exclude_none=True, by_alias=True) + ) ) def __len__(self) -> int: @@ -309,15 +294,14 @@ def substitute_in_component(self, component_as_dict: dict) -> dict: substitution_hardcoded, ) substitution = generate_substitution( - json.loads(config.json()), - existing_substitution=component_substitution, + json.loads(config.json()), existing_substitution=component_substitution ) return json.loads( substitute_nested( json.dumps(component_as_dict), **update_nested_pair(substitution, ENV), - ), + ) ) def validate(self) -> None: diff --git a/kpops/utils/dict_differ.py b/kpops/utils/dict_differ.py index 5bc8d720a..934924e21 100644 --- a/kpops/utils/dict_differ.py +++ b/kpops/utils/dict_differ.py @@ -54,9 +54,7 @@ class Diff(Generic[T]): @staticmethod def from_dicts( - d1: dict, - d2: dict, - ignore: set[str] | None = None, + d1: dict, d2: dict, ignore: set[str] | None = None ) -> Iterator[Diff]: for diff_type, keys, changes in diff(d1, d2, ignore=ignore): if not isinstance(changes_tmp := changes, list): @@ -91,8 +89,8 @@ def render_diff(d1: dict, d2: dict, ignore: set[str] | None = None) -> str | Non differ.compare( to_yaml(d1) if d1 else "", to_yaml(d2_filtered) if d2_filtered else "", - ), - ), + ) + ) ) diff --git a/kpops/utils/dict_ops.py b/kpops/utils/dict_ops.py index d3c173edc..14cc849e3 100644 --- a/kpops/utils/dict_ops.py +++ b/kpops/utils/dict_ops.py @@ -46,9 +46,7 @@ def update_nested(*argv: dict) -> dict: def flatten_mapping( - nested_mapping: Mapping[str, Any], - prefix: str | None = None, - separator: str = "_", + nested_mapping: Mapping[str, Any], prefix: str | None = None, separator: str = "_" ) -> dict[str, Any]: """Flattens a Mapping. 
diff --git a/kpops/utils/environment.py b/kpops/utils/environment.py index b1b2271b4..0ed7ae920 100644 --- a/kpops/utils/environment.py +++ b/kpops/utils/environment.py @@ -13,7 +13,7 @@ def __init__(self, mapping=None, /, **kwargs) -> None: mapping = {} if kwargs: mapping.update( - {transformation(key): value for key, value in kwargs.items()}, + {transformation(key): value for key, value in kwargs.items()} ) super().__init__(mapping) diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index c1d96ce5c..7cad9422d 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -38,8 +38,7 @@ def field_schema(field: ModelField, **kwargs: Any) -> Any: def _is_valid_component( - defined_component_types: set[str], - component: type[PipelineComponent], + defined_component_types: set[str], component: type[PipelineComponent] ) -> bool: """Check whether a PipelineComponent subclass has a valid definition for the schema generation. @@ -58,8 +57,7 @@ def _is_valid_component( def _add_components( - components_module: str, - components: tuple[type[PipelineComponent]] | None = None, + components_module: str, components: tuple[type[PipelineComponent]] | None = None ) -> tuple[type[PipelineComponent]]: """Add components to a components tuple. @@ -85,8 +83,7 @@ def _add_components( def gen_pipeline_schema( - components_module: str | None = None, - include_stock_components: bool = True, + components_module: str | None = None, include_stock_components: bool = True ) -> None: """Generate a json schema from the models of pipeline components. @@ -128,8 +125,7 @@ def gen_pipeline_schema( ) AnnotatedPipelineComponents = Annotated[ - PipelineComponents, - Field(discriminator="type"), + PipelineComponents, Field(discriminator="type") ] schema = schema_json_of( @@ -145,9 +141,6 @@ def gen_pipeline_schema( def gen_config_schema() -> None: """Generate a json schema from the model of pipeline config.""" schema = schema_json_of( - PipelineConfig, - title="KPOps config schema", - indent=4, - sort_keys=True, + PipelineConfig, title="KPOps config schema", indent=4, sort_keys=True ) print(schema) diff --git a/kpops/utils/yaml_loading.py b/kpops/utils/yaml_loading.py index b587ae1e4..fb810c193 100644 --- a/kpops/utils/yaml_loading.py +++ b/kpops/utils/yaml_loading.py @@ -9,8 +9,7 @@ def generate_hashkey( - file_path: Path, - substitution: Mapping[str, Any] | None = None, + file_path: Path, substitution: Mapping[str, Any] | None = None ) -> tuple: if substitution is None: substitution = {} @@ -19,9 +18,7 @@ def generate_hashkey( @cached(cache={}, key=generate_hashkey) def load_yaml_file( - file_path: Path, - *, - substitution: Mapping[str, Any] | None = None, + file_path: Path, *, substitution: Mapping[str, Any] | None = None ) -> dict | list[dict]: with file_path.open() as yaml_file: return yaml.load(substitute(yaml_file.read(), substitution), Loader=yaml.Loader) @@ -74,7 +71,5 @@ def substitute_nested(input: str, **kwargs) -> str: old_str, new_str = new_str, substitute(new_str, kwargs) if new_str != old_str: msg = "An infinite loop condition detected. Check substitution variables." 
- raise ValueError( - msg, - ) + raise ValueError(msg) return old_str diff --git a/tests/cli/resources/module.py b/tests/cli/resources/module.py index 3691e53e1..3956eedf2 100644 --- a/tests/cli/resources/module.py +++ b/tests/cli/resources/module.py @@ -9,8 +9,6 @@ class CustomSchemaProvider(SchemaProvider): def provide_schema( - self, - schema_class: str, - models: dict[ModelName, ModelVersion], + self, schema_class: str, models: dict[ModelName, ModelVersion] ) -> Schema: return AvroSchema() diff --git a/tests/cli/test_pipeline_steps.py b/tests/cli/test_pipeline_steps.py index 8b4c6c6e3..a09d7b064 100644 --- a/tests/cli/test_pipeline_steps.py +++ b/tests/cli/test_pipeline_steps.py @@ -45,9 +45,7 @@ def log_info(mocker: MockerFixture) -> MagicMock: def tests_filter_steps_to_apply(log_info: MagicMock, pipeline: Pipeline): filtered_steps = get_steps_to_apply( - pipeline, - "example2,example3", - FilterType.INCLUDE, + pipeline, "example2,example3", FilterType.INCLUDE ) assert len(filtered_steps) == 2 @@ -56,7 +54,7 @@ def tests_filter_steps_to_apply(log_info: MagicMock, pipeline: Pipeline): assert log_info.call_count == 1 log_info.assert_any_call( - "The following steps are included:\n['example2', 'example3']", + "The following steps are included:\n['example2', 'example3']" ) filtered_steps = get_steps_to_apply(pipeline, None, FilterType.INCLUDE) @@ -68,9 +66,7 @@ def tests_filter_steps_to_apply(log_info: MagicMock, pipeline: Pipeline): def tests_filter_steps_to_exclude(log_info: MagicMock, pipeline: Pipeline): filtered_steps = get_steps_to_apply( - pipeline, - "example2,example3", - FilterType.EXCLUDE, + pipeline, "example2,example3", FilterType.EXCLUDE ) assert len(filtered_steps) == 1 diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py index 1d61368d1..cbb855d14 100644 --- a/tests/cli/test_schema_generation.py +++ b/tests/cli/test_schema_generation.py @@ -78,8 +78,7 @@ class SubPipelineComponentCorrectDocstr(SubPipelineComponent): """ example_attr: str = Field( - default=..., - description=describe_attr("example_attr", __doc__), + default=..., description=describe_attr("example_attr", __doc__) ) @@ -87,10 +86,7 @@ class SubPipelineComponentCorrectDocstr(SubPipelineComponent): @pytest.mark.filterwarnings( - "ignore:handlers", - "ignore:config", - "ignore:enrich", - "ignore:validate", + "ignore:handlers", "ignore:config", "ignore:enrich", "ignore:validate" ) class TestGenSchema: def test_gen_pipeline_schema_no_modules(self, caplog: pytest.LogCaptureFixture): @@ -108,7 +104,7 @@ def test_gen_pipeline_schema_no_modules(self, caplog: pytest.LogCaptureFixture): "root", logging.WARNING, "No components are provided, no schema is generated.", - ), + ) ] assert result.exit_code == 0 diff --git a/tests/compiler/test_pipeline_name.py b/tests/compiler/test_pipeline_name.py index 6561197a1..f0a1b1b1e 100644 --- a/tests/compiler/test_pipeline_name.py +++ b/tests/compiler/test_pipeline_name.py @@ -51,8 +51,7 @@ def test_should_set_pipeline_name_with_absolute_base_dir(): def test_should_not_set_pipeline_name_with_the_same_base_dir(): with pytest.raises( - ValueError, - match="The pipeline-base-dir should not equal the pipeline-path", + ValueError, match="The pipeline-base-dir should not equal the pipeline-path" ): Pipeline.set_pipeline_name_env_vars(PIPELINE_PATH, PIPELINE_PATH) diff --git a/tests/component_handlers/helm_wrapper/test_dry_run_handler.py b/tests/component_handlers/helm_wrapper/test_dry_run_handler.py index 0f21a970c..bad4f2aa8 100644 --- 
a/tests/component_handlers/helm_wrapper/test_dry_run_handler.py +++ b/tests/component_handlers/helm_wrapper/test_dry_run_handler.py @@ -15,13 +15,13 @@ class TestDryRunHandler: @pytest.fixture() def helm_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.helm_wrapper.dry_run_handler.Helm", + "kpops.component_handlers.helm_wrapper.dry_run_handler.Helm" ).return_value @pytest.fixture() def helm_diff_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.helm_wrapper.dry_run_handler.HelmDiff", + "kpops.component_handlers.helm_wrapper.dry_run_handler.HelmDiff" ).return_value def test_should_print_helm_diff_when_release_is_new( @@ -42,8 +42,7 @@ def test_should_print_helm_diff_when_release_is_new( dry_run_handler.print_helm_diff("A test stdout", "a-release-name", log) helm_mock.get_manifest.assert_called_once_with( - "a-release-name", - "test-namespace", + "a-release-name", "test-namespace" ) assert "Helm release a-release-name does not exist" in caplog.text mock_load_manifest.assert_called_once_with("A test stdout") @@ -56,7 +55,7 @@ def test_should_print_helm_diff_when_release_exists( caplog: LogCaptureFixture, ): helm_mock.get_manifest.return_value = iter( - [HelmTemplate("path.yaml", {"a": 1})], + [HelmTemplate("path.yaml", {"a": 1})] ) mock_load_manifest = mocker.patch( "kpops.component_handlers.helm_wrapper.dry_run_handler.Helm.load_manifest", @@ -68,8 +67,7 @@ def test_should_print_helm_diff_when_release_exists( dry_run_handler.print_helm_diff("A test stdout", "a-release-name", log) helm_mock.get_manifest.assert_called_once_with( - "a-release-name", - "test-namespace", + "a-release-name", "test-namespace" ) assert "Helm release a-release-name already exists" in caplog.text mock_load_manifest.assert_called_once_with("A test stdout") diff --git a/tests/component_handlers/helm_wrapper/test_helm_diff.py b/tests/component_handlers/helm_wrapper/test_helm_diff.py index fc423cf20..15a58a023 100644 --- a/tests/component_handlers/helm_wrapper/test_helm_diff.py +++ b/tests/component_handlers/helm_wrapper/test_helm_diff.py @@ -24,7 +24,7 @@ def test_diff(): HelmTemplate("a.yaml", {"a": 2}), HelmTemplate("c.yaml", {"c": 1}), ], - ), + ) ) == [ Change( old_value={"a": 1}, @@ -42,7 +42,7 @@ def test_diff(): # test no current release assert list( - helm_diff.calculate_changes((), [HelmTemplate("a.yaml", {"a": 1})]), + helm_diff.calculate_changes((), [HelmTemplate("a.yaml", {"a": 1})]) ) == [ Change( old_value={}, diff --git a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py index e8870de85..ce6fae709 100644 --- a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py +++ b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py @@ -44,9 +44,7 @@ def mock_get_version(self, mocker: MockerFixture) -> MagicMock: return mock_get_version def test_should_call_run_command_method_when_helm_install_with_defaults( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) @@ -76,9 +74,7 @@ def test_should_call_run_command_method_when_helm_install_with_defaults( ) def test_should_include_configured_tls_parameters_on_add_when_version_is_old( - self, - run_command: MagicMock, - mocker: MockerFixture, + self, run_command: MagicMock, mocker: MockerFixture ): mock_get_version = mocker.patch.object(Helm, "get_version") 
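`test_helm_diff.py` above fixes the contract of `HelmDiff.calculate_changes`: templates from the current release and the new render are aligned by file path, and each pairing becomes a `Change(old_value=..., new_value=...)`, with `{}` standing in for a missing side. A rough dict-based sketch of that alignment (the real method takes iterables of `HelmTemplate` objects, so this is an approximation):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Change:
    old_value: dict[str, Any]
    new_value: dict[str, Any]

def calculate_changes(current: dict[str, dict], new: dict[str, dict]) -> list[Change]:
    # pair manifests by file path; a missing side becomes an empty dict
    return [
        Change(current.get(path, {}), new.get(path, {}))
        for path in sorted(current.keys() | new.keys())
    ]

# mirrors the "test no current release" case above: a pure addition
assert calculate_changes({}, {"a.yaml": {"a": 1}}) == [Change({}, {"a": 1})]
```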
mock_get_version.return_value = Version(major=3, minor=6, patch=0) @@ -108,9 +104,7 @@ def test_should_include_configured_tls_parameters_on_add_when_version_is_old( ] def test_should_include_configured_tls_parameters_on_add_when_version_is_new( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm = Helm(HelmConfig()) @@ -138,9 +132,7 @@ def test_should_include_configured_tls_parameters_on_add_when_version_is_new( ] def test_should_include_configured_tls_parameters_on_update( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) helm_wrapper.upgrade_install( @@ -176,9 +168,7 @@ def test_should_include_configured_tls_parameters_on_update( ) def test_should_call_run_command_method_when_helm_install_with_non_defaults( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) helm_wrapper.upgrade_install( @@ -223,9 +213,7 @@ def test_should_call_run_command_method_when_helm_install_with_non_defaults( ) def test_should_call_run_command_method_when_uninstalling_streams_app( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) helm_wrapper.uninstall( @@ -252,13 +240,11 @@ def test_should_log_warning_when_release_not_found( ) log_warning_mock.assert_called_once_with( - "Release with name test-release not found. Could not uninstall app.", + "Release with name test-release not found. Could not uninstall app." ) def test_should_call_run_command_method_when_installing_streams_app__with_dry_run( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) @@ -281,7 +267,7 @@ def test_should_call_run_command_method_when_installing_streams_app__with_dry_ru def test_validate_console_output(self): with pytest.raises(RuntimeError): Helm.parse_helm_command_stderr_output( - "A specific\n eRrOr was found in this line", + "A specific\n eRrOr was found in this line" ) with pytest.raises(ReleaseNotFoundException): Helm.parse_helm_command_stderr_output("New \nmessage\n ReLease: noT foUnD") @@ -289,13 +275,13 @@ def test_validate_console_output(self): Helm.parse_helm_command_stderr_output("This is \njust WaRnIng nothing more") except RuntimeError as e: pytest.fail( - f"validate_console_output() raised RuntimeError unexpectedly!\nError message: {e}", + f"validate_console_output() raised RuntimeError unexpectedly!\nError message: {e}" ) try: Helm.parse_helm_command_stderr_output("This is \njust WaRnIng nothing more") except ReleaseNotFoundException: pytest.fail( - f"validate_console_output() raised ReleaseNotFoundException unexpectedly!\nError message: {ReleaseNotFoundException}", + f"validate_console_output() raised ReleaseNotFoundException unexpectedly!\nError message: {ReleaseNotFoundException}" ) def test_helm_template_load(self): @@ -308,7 +294,7 @@ def test_helm_template_load(self): metadata: labels: foo: bar - """, + """ ) helm_template = HelmTemplate.load("test2.yaml", stdout) @@ -331,7 +317,7 @@ def test_load_manifest_with_no_notes(self): --- # Source: chart/templates/test3b.yaml foo: bar - """, + """ ) helm_templates = list(Helm.load_manifest(stdout)) assert 
len(helm_templates) == 2 @@ -348,7 +334,7 @@ def test_raise_parse_error_when_helm_content_is_invalid(self): """ --- # Resource: chart/templates/test1.yaml - """, + """ ) with pytest.raises(ParseError, match="Not a valid Helm template source"): list(Helm.load_manifest(stdout)) @@ -399,7 +385,7 @@ def test_load_manifest(self): NOTES: test - """, + """ ) helm_templates = list(Helm.load_manifest(stdout)) assert len(helm_templates) == 2 @@ -412,9 +398,7 @@ def test_load_manifest(self): assert helm_templates[1].template == {"foo": "bar"} def test_helm_get_manifest( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) run_command.return_value = dedent( @@ -424,10 +408,10 @@ def test_helm_get_manifest( data: - a: 1 - b: 2 - """, + """ ) helm_templates = list( - helm_wrapper.get_manifest("test-release", "test-namespace"), + helm_wrapper.get_manifest("test-release", "test-namespace") ) run_command.assert_called_once_with( command=[ @@ -447,9 +431,7 @@ def test_helm_get_manifest( assert helm_wrapper.get_manifest("test-release", "test-namespace") == () def test_should_call_run_command_method_when_helm_template_with_optional_args( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) @@ -487,9 +469,7 @@ def test_should_call_run_command_method_when_helm_template_with_optional_args( ) def test_should_call_run_command_method_when_helm_template_without_optional_args( - self, - run_command: MagicMock, - mock_get_version: MagicMock, + self, run_command: MagicMock, mock_get_version: MagicMock ): helm_wrapper = Helm(helm_config=HelmConfig()) @@ -545,8 +525,7 @@ def test_should_call_helm_version( assert helm._version == expected_version def test_should_raise_exception_if_helm_version_is_old( - self, - run_command: MagicMock, + self, run_command: MagicMock ): run_command.return_value = "v2.9.0+gc9f554d" with pytest.raises( @@ -556,12 +535,10 @@ def test_should_raise_exception_if_helm_version_is_old( Helm(helm_config=HelmConfig()) def test_should_raise_exception_if_helm_version_cannot_be_parsed( - self, - run_command: MagicMock, + self, run_command: MagicMock ): run_command.return_value = "123" with pytest.raises( - RuntimeError, - match="Could not parse the Helm version.\n\nHelm output:\n123", + RuntimeError, match="Could not parse the Helm version.\n\nHelm output:\n123" ): Helm(helm_config=HelmConfig()) diff --git a/tests/component_handlers/helm_wrapper/test_utils.py b/tests/component_handlers/helm_wrapper/test_utils.py index 8f40b0c5d..eef6ca14f 100644 --- a/tests/component_handlers/helm_wrapper/test_utils.py +++ b/tests/component_handlers/helm_wrapper/test_utils.py @@ -12,7 +12,7 @@ def test_trim_release_name_with_suffix(): def test_trim_release_name_without_suffix(): name = trim_release_name( - "example-component-name-too-long-fake-fakefakefakefakefake", + "example-component-name-too-long-fake-fakefakefakefakefake" ) assert name == "example-component-name-too-long-fake-fakefakefakefak" assert len(name) == 52 diff --git a/tests/component_handlers/kafka_connect/test_connect_handler.py b/tests/component_handlers/kafka_connect/test_connect_handler.py index fe6bc473e..db64690e9 100644 --- a/tests/component_handlers/kafka_connect/test_connect_handler.py +++ b/tests/component_handlers/kafka_connect/test_connect_handler.py @@ -25,25 +25,25 @@ class TestConnectorHandler: 
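The `test_utils.py` hunk just above pins the exact truncation behaviour of `trim_release_name`: Helm itself caps release names at 53 characters, and the expected value here is 52 characters long. A hypothetical reconstruction consistent with those two assertions (the suffix handling is guessed from the sibling `test_trim_release_name_with_suffix`, whose body is not shown):

```python
# Hypothetical re-implementation, consistent with the assertions above.
RELEASE_NAME_MAX_LEN = 52

def trim_release_name(name: str, suffix: str = "") -> str:
    if len(name) > RELEASE_NAME_MAX_LEN:
        name = name[: RELEASE_NAME_MAX_LEN - len(suffix)] + suffix
    return name

name = trim_release_name("example-component-name-too-long-fake-fakefakefakefakefake")
assert name == "example-component-name-too-long-fake-fakefakefakefak"
assert len(name) == 52
```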
@pytest.fixture() def log_info_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.info", + "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.info" ) @pytest.fixture() def log_warning_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.warning", + "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.warning" ) @pytest.fixture() def log_error_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.error", + "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.error" ) @pytest.fixture() def renderer_diff_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.kafka_connect_handler.render_diff", + "kpops.component_handlers.kafka_connect.kafka_connect_handler.render_diff" ) @staticmethod @@ -59,7 +59,7 @@ def connector_config(self) -> KafkaConnectorConfig: **{ "connector.class": "com.bakdata.connect.TestConnector", "name": CONNECTOR_NAME, - }, + } ) def test_should_create_connector_in_dry_run( @@ -75,15 +75,15 @@ def test_should_create_connector_in_dry_run( handler.create_connector(connector_config, dry_run=True) connector_wrapper.get_connector.assert_called_once_with(CONNECTOR_NAME) connector_wrapper.validate_connector_config.assert_called_once_with( - connector_config, + connector_config ) assert log_info_mock.mock_calls == [ mock.call.log_info( - f"Connector Creation: connector {CONNECTOR_NAME} already exists.", + f"Connector Creation: connector {CONNECTOR_NAME} already exists." ), mock.call.log_info( - f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!", + f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!" ), ] @@ -109,10 +109,10 @@ def test_should_log_correct_message_when_create_connector_and_connector_not_exis assert log_info_mock.mock_calls == [ mock.call( - f"Connector Creation: connector {CONNECTOR_NAME} does not exist. Creating connector with config:\n\x1b[32m+ connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n\x1b[0m\x1b[32m+ name: {CONNECTOR_NAME}\n\x1b[0m\x1b[32m+ tasks.max: '1'\n\x1b[0m\x1b[32m+ topics: {TOPIC_NAME}\n\x1b[0m", + f"Connector Creation: connector {CONNECTOR_NAME} does not exist. Creating connector with config:\n\x1b[32m+ connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n\x1b[0m\x1b[32m+ name: {CONNECTOR_NAME}\n\x1b[0m\x1b[32m+ tasks.max: '1'\n\x1b[0m\x1b[32m+ topics: {TOPIC_NAME}\n\x1b[0m" ), mock.call( - f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!", + f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!" 
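The long expected-message strings above are easier to audit once decoded: `\x1b[32m` opens green, `\x1b[31m` red, `\x1b[33m` yellow, and `\x1b[0m` resets, so each `\x1b[32m+ ...\n\x1b[0m` run is one green added line in the rendered config diff. A tiny illustration of producing such a run (this is not kpops' `render_diff`, just the colour arithmetic):

```python
GREEN, RESET = "\x1b[32m", "\x1b[0m"

def render_added(line: str) -> str:
    # one diff line, coloured green, exactly as the assertions above embed it
    return f"{GREEN}+ {line}\n{RESET}"

assert render_added("tasks.max: '1'") == "\x1b[32m+ tasks.max: '1'\n\x1b[0m"
```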
), ] @@ -134,7 +134,7 @@ def test_should_log_correct_message_when_create_connector_and_connector_exists_i "tasks": [], } connector_wrapper.get_connector.return_value = KafkaConnectResponse( - **actual_response, + **actual_response ) configs = { @@ -147,25 +147,23 @@ def test_should_log_correct_message_when_create_connector_and_connector_exists_i handler.create_connector(connector_config, dry_run=True) connector_wrapper.get_connector.assert_called_once_with(CONNECTOR_NAME) connector_wrapper.validate_connector_config.assert_called_once_with( - connector_config, + connector_config ) assert log_info_mock.mock_calls == [ mock.call( - f"Connector Creation: connector {CONNECTOR_NAME} already exists.", + f"Connector Creation: connector {CONNECTOR_NAME} already exists." ), mock.call( - f"Updating config:\n connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n name: {CONNECTOR_NAME}\n\x1b[31m- tasks.max: '1'\n\x1b[0m\x1b[33m? ^\n\x1b[0m\x1b[32m+ tasks.max: '2'\n\x1b[0m\x1b[33m? ^\n\x1b[0m topics: {TOPIC_NAME}\n", + f"Updating config:\n connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n name: {CONNECTOR_NAME}\n\x1b[31m- tasks.max: '1'\n\x1b[0m\x1b[33m? ^\n\x1b[0m\x1b[32m+ tasks.max: '2'\n\x1b[0m\x1b[33m? ^\n\x1b[0m topics: {TOPIC_NAME}\n" ), mock.call( - f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!", + f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!" ), ] def test_should_log_invalid_config_when_create_connector_in_dry_run( - self, - connector_config: KafkaConnectorConfig, - renderer_diff_mock: MagicMock, + self, connector_config: KafkaConnectorConfig, renderer_diff_mock: MagicMock ): connector_wrapper = MagicMock() @@ -186,12 +184,11 @@ def test_should_log_invalid_config_when_create_connector_in_dry_run( handler.create_connector(connector_config, dry_run=True) connector_wrapper.validate_connector_config.assert_called_once_with( - connector_config, + connector_config ) def test_should_call_update_connector_config_when_connector_exists_not_dry_run( - self, - connector_config: KafkaConnectorConfig, + self, connector_config: KafkaConnectorConfig ): connector_wrapper = MagicMock() handler = self.connector_handler(connector_wrapper) @@ -204,8 +201,7 @@ def test_should_call_update_connector_config_when_connector_exists_not_dry_run( ] def test_should_call_create_connector_when_connector_does_not_exists_not_dry_run( - self, - connector_config: KafkaConnectorConfig, + self, connector_config: KafkaConnectorConfig ): connector_wrapper = MagicMock() @@ -228,8 +224,8 @@ def test_should_print_correct_log_when_destroying_connector_in_dry_run( log_info_mock.assert_called_once_with( magentaify( - f"Connector Destruction: connector {CONNECTOR_NAME} already exists. Deleting connector.", - ), + f"Connector Destruction: connector {CONNECTOR_NAME} already exists. Deleting connector." + ) ) def test_should_print_correct_warning_log_when_destroying_connector_and_connector_exists_in_dry_run( @@ -244,7 +240,7 @@ def test_should_print_correct_warning_log_when_destroying_connector_and_connecto handler.destroy_connector(CONNECTOR_NAME, dry_run=True) log_warning_mock.assert_called_once_with( - f"Connector Destruction: connector {CONNECTOR_NAME} does not exist and cannot be deleted. Skipping.", + f"Connector Destruction: connector {CONNECTOR_NAME} does not exist and cannot be deleted. Skipping." 
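Taken together, the destruction tests above imply a small dry-run state machine: look the connector up, warn and skip if it is missing, otherwise announce the deletion and only call `delete_connector` when not in dry-run mode. A schematic version with stand-in types (the real handler colours its message with `magentaify`, and its exact control flow may differ):

```python
import logging

log = logging.getLogger(__name__)

class ConnectorNotFoundException(Exception):
    """Stand-in for the kpops exception of the same name."""

def destroy_connector(wrapper, name: str, dry_run: bool) -> None:
    try:
        wrapper.get_connector(name)
    except ConnectorNotFoundException:
        log.warning(
            f"Connector Destruction: connector {name} does not exist and cannot be deleted. Skipping."
        )
        return
    log.info(f"Connector Destruction: connector {name} already exists. Deleting connector.")
    if not dry_run:
        wrapper.delete_connector(name)
```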
) def test_should_call_delete_connector_when_destroying_existing_connector_not_dry_run( @@ -271,5 +267,5 @@ def test_should_print_correct_warning_log_when_destroying_connector_and_connecto handler.destroy_connector(CONNECTOR_NAME, dry_run=False) log_warning_mock.assert_called_once_with( - f"Connector Destruction: the connector {CONNECTOR_NAME} does not exist. Skipping.", + f"Connector Destruction: the connector {CONNECTOR_NAME} does not exist. Skipping." ) diff --git a/tests/component_handlers/kafka_connect/test_connect_wrapper.py b/tests/component_handlers/kafka_connect/test_connect_wrapper.py index 1b1793109..8e60d92a7 100644 --- a/tests/component_handlers/kafka_connect/test_connect_wrapper.py +++ b/tests/component_handlers/kafka_connect/test_connect_wrapper.py @@ -40,7 +40,7 @@ def connector_config(self) -> KafkaConnectorConfig: **{ "connector.class": "com.bakdata.connect.TestConnector", "name": "test-connector", - }, + } ) def test_should_through_exception_when_host_is_not_set(self): @@ -58,8 +58,7 @@ def test_should_through_exception_when_host_is_not_set(self): @patch("httpx.post") def test_should_create_post_requests_for_given_connector_configuration( - self, - mock_post: MagicMock, + self, mock_post: MagicMock ): configs = { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", @@ -85,9 +84,7 @@ def test_should_create_post_requests_for_given_connector_configuration( ) def test_should_return_correct_response_when_connector_created( - self, - httpx_mock: HTTPXMock, - connector_config: KafkaConnectorConfig, + self, httpx_mock: HTTPXMock, connector_config: KafkaConnectorConfig ): actual_response = { "name": "hdfs-sink-connector", @@ -138,7 +135,7 @@ def test_should_raise_connector_exists_exception_when_connector_exists( ) log_warning.assert_called_with( - "Rebalancing in progress while creating a connector... Retrying...", + "Rebalancing in progress while creating a connector... Retrying..." ) @patch("httpx.get") @@ -155,9 +152,7 @@ def test_should_create_correct_get_connector_request(self, mock_get: MagicMock): @pytest.mark.flaky(reruns=5, condition=sys.platform.startswith("win32")) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_return_correct_response_when_getting_connector( - self, - log_info: MagicMock, - httpx_mock: HTTPXMock, + self, log_info: MagicMock, httpx_mock: HTTPXMock ): connector_name = "test-connector" @@ -192,9 +187,7 @@ def test_should_return_correct_response_when_getting_connector( @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_raise_connector_not_found_when_getting_connector( - self, - log_info: MagicMock, - httpx_mock: HTTPXMock, + self, log_info: MagicMock, httpx_mock: HTTPXMock ): connector_name = "test-connector" @@ -209,14 +202,12 @@ def test_should_raise_connector_not_found_when_getting_connector( self.connect_wrapper.get_connector(connector_name) log_info.assert_called_once_with( - f"The named connector {connector_name} does not exists.", + f"The named connector {connector_name} does not exists." 
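Several wrapper tests above assert a "Rebalancing in progress ... Retrying..." warning: while a Kafka Connect cluster rebalances it rejects requests (typically with HTTP 409), and the wrapper is expected to warn and try again rather than fail. A generic sketch of such a loop; the attempt count, backoff, and status code here are assumptions, not kpops' actual values:

```python
import time

def retry_on_rebalance(send_request, log_warning, message: str,
                       attempts: int = 10, backoff_s: float = 1.0):
    for _ in range(attempts):
        response = send_request()
        if response.status_code != 409:  # 409: rebalance in progress (assumed)
            return response
        log_warning(message)
        time.sleep(backoff_s)
    raise RuntimeError("Connect cluster kept rebalancing; giving up")
```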
) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_raise_rebalance_in_progress_when_getting_connector( - self, - log_warning: MagicMock, - httpx_mock: HTTPXMock, + self, log_warning: MagicMock, httpx_mock: HTTPXMock ): connector_name = "test-connector" @@ -234,7 +225,7 @@ def test_should_raise_rebalance_in_progress_when_getting_connector( ) log_warning.assert_called_with( - "Rebalancing in progress while getting a connector... Retrying...", + "Rebalancing in progress while getting a connector... Retrying..." ) @patch("httpx.put") @@ -252,7 +243,7 @@ def test_should_create_correct_update_connector_request(self, mock_put: MagicMoc } with pytest.raises(KafkaConnectError): self.connect_wrapper.update_connector_config( - KafkaConnectorConfig(**configs), + KafkaConnectorConfig(**configs) ) mock_put.assert_called_with( @@ -296,11 +287,11 @@ def test_should_return_correct_response_when_update_connector( status_code=200, ) expected_response = self.connect_wrapper.update_connector_config( - connector_config, + connector_config ) assert KafkaConnectResponse(**actual_response) == expected_response log_info.assert_called_once_with( - f"Config for connector {connector_name} updated.", + f"Config for connector {connector_name} updated." ) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") @@ -338,7 +329,7 @@ def test_should_return_correct_response_when_update_connector_created( status_code=201, ) expected_response = self.connect_wrapper.update_connector_config( - connector_config, + connector_config ) assert KafkaConnectResponse(**actual_response) == expected_response log_info.assert_called_once_with(f"Connector {connector_name} created.") @@ -366,13 +357,12 @@ def test_should_raise_connector_exists_exception_when_update_connector( ) log_warning.assert_called_with( - "Rebalancing in progress while updating a connector... Retrying...", + "Rebalancing in progress while updating a connector... Retrying..." ) @patch("httpx.delete") def test_should_create_correct_delete_connector_request( - self, - mock_delete: MagicMock, + self, mock_delete: MagicMock ): connector_name = "test-connector" with pytest.raises(KafkaConnectError): @@ -385,9 +375,7 @@ def test_should_create_correct_delete_connector_request( @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_return_correct_response_when_deleting_connector( - self, - log_info: MagicMock, - httpx_mock: HTTPXMock, + self, log_info: MagicMock, httpx_mock: HTTPXMock ): connector_name = "test-connector" @@ -422,9 +410,7 @@ def test_should_return_correct_response_when_deleting_connector( @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_raise_connector_not_found_when_deleting_connector( - self, - log_info: MagicMock, - httpx_mock: HTTPXMock, + self, log_info: MagicMock, httpx_mock: HTTPXMock ): connector_name = "test-connector" @@ -439,14 +425,12 @@ def test_should_raise_connector_not_found_when_deleting_connector( self.connect_wrapper.delete_connector(connector_name) log_info.assert_called_once_with( - f"The named connector {connector_name} does not exists.", + f"The named connector {connector_name} does not exists." 
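The two update tests above encode a small but easy-to-miss REST detail: Kafka Connect's `PUT /connectors/{name}/config` answers 200 when an existing connector's config was changed and 201 when the PUT created the connector, and the wrapper logs a different message for each. Schematically (the helper is invented for illustration):

```python
def log_update_result(status_code: int, name: str, log) -> None:
    if status_code == 201:
        log.info(f"Connector {name} created.")
    else:  # 200: the connector existed and its config was replaced
        log.info(f"Config for connector {name} updated.")
```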
) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_raise_rebalance_in_progress_when_deleting_connector( - self, - log_warning: MagicMock, - httpx_mock: HTTPXMock, + self, log_warning: MagicMock, httpx_mock: HTTPXMock ): connector_name = "test-connector" @@ -464,13 +448,12 @@ def test_should_raise_rebalance_in_progress_when_deleting_connector( ) log_warning.assert_called_with( - "Rebalancing in progress while deleting a connector... Retrying...", + "Rebalancing in progress while deleting a connector... Retrying..." ) @patch("httpx.put") def test_should_create_correct_validate_connector_config_request( - self, - mock_put: MagicMock, + self, mock_put: MagicMock ): connector_config = KafkaConnectorConfig( **{ @@ -478,7 +461,7 @@ def test_should_create_correct_validate_connector_config_request( "name": "FileStreamSinkConnector", "tasks.max": "1", "topics": "test-topic", - }, + } ) with pytest.raises(KafkaConnectError): self.connect_wrapper.validate_connector_config(connector_config) @@ -491,8 +474,7 @@ def test_should_create_correct_validate_connector_config_request( @patch("httpx.put") def test_should_create_correct_validate_connector_config_and_name_gets_added( - self, - mock_put: MagicMock, + self, mock_put: MagicMock ): connector_name = "FileStreamSinkConnector" configs = { @@ -503,7 +485,7 @@ def test_should_create_correct_validate_connector_config_and_name_gets_added( } with pytest.raises(KafkaConnectError): self.connect_wrapper.validate_connector_config( - KafkaConnectorConfig(**configs), + KafkaConnectorConfig(**configs) ) mock_put.assert_called_with( @@ -532,9 +514,9 @@ def test_should_parse_validate_connector_config(self, httpx_mock: HTTPXMock): "topics": "test-topic", } errors = self.connect_wrapper.validate_connector_config( - KafkaConnectorConfig(**configs), + KafkaConnectorConfig(**configs) ) assert errors == [ - "Found error for field file: Missing required configuration 'file' which has no default value.", + "Found error for field file: Missing required configuration 'file' which has no default value." 
] diff --git a/tests/component_handlers/schema_handler/resources/module.py b/tests/component_handlers/schema_handler/resources/module.py index 7be7b4fca..8c7168efa 100644 --- a/tests/component_handlers/schema_handler/resources/module.py +++ b/tests/component_handlers/schema_handler/resources/module.py @@ -9,8 +9,6 @@ class CustomSchemaProvider(SchemaProvider): def provide_schema( - self, - schema_class: str, - models: dict[ModelName, ModelVersion], + self, schema_class: str, models: dict[ModelName, ModelVersion] ) -> Schema: return AvroSchema({}) diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py index df516de19..8f5b0f29e 100644 --- a/tests/component_handlers/schema_handler/test_schema_handler.py +++ b/tests/component_handlers/schema_handler/test_schema_handler.py @@ -28,28 +28,28 @@ @pytest.fixture(autouse=True) def log_info_mock(mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.log.info", + "kpops.component_handlers.schema_handler.schema_handler.log.info" ) @pytest.fixture(autouse=True) def log_debug_mock(mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.log.debug", + "kpops.component_handlers.schema_handler.schema_handler.log.debug" ) @pytest.fixture(autouse=False) def find_class_mock(mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.find_class", + "kpops.component_handlers.schema_handler.schema_handler.find_class" ) @pytest.fixture(autouse=True) def schema_registry_mock(mocker: MockerFixture) -> MagicMock: schema_registry_mock = mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.SchemaRegistryClient", + "kpops.component_handlers.schema_handler.schema_handler.SchemaRegistryClient" ) return schema_registry_mock.return_value @@ -96,19 +96,16 @@ def test_should_lazy_load_schema_provider(find_class_mock: MagicMock): schema_registry_url="http://localhost:8081", ) schema_handler = SchemaHandler.load_schema_handler( - TEST_SCHEMA_PROVIDER_MODULE, - config_enable, + TEST_SCHEMA_PROVIDER_MODULE, config_enable ) assert schema_handler is not None schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", - {}, + "com.bakdata.kpops.test.SchemaHandlerTest", {} ) schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SomeOtherSchemaClass", - {}, + "com.bakdata.kpops.test.SomeOtherSchemaClass", {} ) find_class_mock.assert_called_once_with(TEST_SCHEMA_PROVIDER_MODULE, SchemaProvider) @@ -116,8 +113,7 @@ def test_should_lazy_load_schema_provider(find_class_mock: MagicMock): def test_should_raise_value_error_if_schema_provider_class_not_found(): schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=NON_EXISTING_PROVIDER_MODULE, + url="http://mock:8081", components_module=NON_EXISTING_PROVIDER_MODULE ) with pytest.raises( @@ -127,8 +123,7 @@ def test_should_raise_value_error_if_schema_provider_class_not_found(): f"{SchemaProvider.__module__}.{SchemaProvider.__name__}.", ): schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", - {}, + "com.bakdata.kpops.test.SchemaHandlerTest", {} ) @@ -160,19 +155,15 @@ def test_should_raise_value_error_when_schema_provider_is_called_and_components_ match="The Schema Registry URL is set but you haven't specified the component module path. 
Please provide a valid component module path where your SchemaProvider implementation exists.", ): schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", - {}, + "com.bakdata.kpops.test.SchemaHandlerTest", {} ) def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( - to_section: ToSection, - log_info_mock: MagicMock, - schema_registry_mock: MagicMock, + to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock ): schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=TEST_SCHEMA_PROVIDER_MODULE, + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_registry_mock.get_versions.return_value = [] @@ -180,7 +171,7 @@ def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( schema_handler.submit_schemas(to_section, True) log_info_mock.assert_called_once_with( - greenify("Schema Submission: The subject topic-X-value will be submitted."), + greenify("Schema Submission: The subject topic-X-value will be submitted.") ) schema_registry_mock.register.assert_not_called() @@ -192,8 +183,7 @@ def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=TEST_SCHEMA_PROVIDER_MODULE, + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_registry_mock.get_versions.return_value = [1, 2, 3] @@ -203,7 +193,7 @@ def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( schema_handler.submit_schemas(to_section, True) log_info_mock.assert_called_once_with( - f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}.", + f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}." ) schema_registry_mock.register.assert_not_called() @@ -215,8 +205,7 @@ def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatibl ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=TEST_SCHEMA_PROVIDER_MODULE, + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" @@ -255,8 +244,7 @@ def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_ve ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=TEST_SCHEMA_PROVIDER_MODULE, + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" schema = schema_provider.provide_schema(schema_class, {}) @@ -269,13 +257,13 @@ def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_ve assert log_info_mock.mock_calls == [ mock.call( - f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}.", + f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}." ), ] assert log_debug_mock.mock_calls == [ mock.call( - f"Schema Submission: schema was already submitted for the subject topic-X-value as version {registered_version.schema}. Therefore, the specified schema must be compatible.", + f"Schema Submission: schema was already submitted for the subject topic-X-value as version {registered_version.schema}. Therefore, the specified schema must be compatible." 
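The schema-handler tests above describe the dry-run submission flow: if the subject already has versions, the proposed schema must be compatible and an informational message is logged; if it has none, the handler announces that the subject will be submitted; in either case `register` must not be called. A sketch under those assertions (the compatibility call is named here for illustration; the real handler goes through its `SchemaRegistryClient`):

```python
def submit_schema_dry_run(registry, subject: str, schema, model, log) -> None:
    versions = registry.get_versions(subject)
    if not versions:
        log.info(f"Schema Submission: The subject {subject} will be submitted.")
    elif registry.test_compatibility(subject=subject, schema=schema):
        log.info(f"Schema Submission: compatible schema for {subject} with model {model}.")
    else:
        raise RuntimeError(f"Schema for {subject} is not compatible")
    # dry run: registry.register(...) is deliberately never called
```

The lazy-provider tests above it make a complementary point: `find_class` is consulted exactly once, when the `schema_provider` property is first touched, however many schemas are then requested.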
), ] @@ -292,8 +280,7 @@ def test_should_submit_non_existing_schema_when_not_dry( schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" schema = schema_provider.provide_schema(schema_class, {}) schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=TEST_SCHEMA_PROVIDER_MODULE, + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_registry_mock.get_versions.return_value = [] @@ -302,13 +289,12 @@ def test_should_submit_non_existing_schema_when_not_dry( subject = "topic-X-value" log_info_mock.assert_called_once_with( - f"Schema Submission: schema submitted for {subject} with model {topic_config.value_schema}.", + f"Schema Submission: schema submitted for {subject} with model {topic_config.value_schema}." ) schema_registry_mock.get_versions.assert_not_called() schema_registry_mock.register.assert_called_once_with( - subject=subject, - schema=schema, + subject=subject, schema=schema ) @@ -318,8 +304,7 @@ def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=TEST_SCHEMA_PROVIDER_MODULE, + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_registry_mock.get_versions.return_value = [] @@ -327,19 +312,17 @@ def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( schema_handler.delete_schemas(to_section, True) log_info_mock.assert_called_once_with( - magentaify("Schema Deletion: will delete subject topic-X-value."), + magentaify("Schema Deletion: will delete subject topic-X-value.") ) schema_registry_mock.delete_subject.assert_not_called() def test_should_delete_schemas_when_not_in_dry_run( - to_section: ToSection, - schema_registry_mock: MagicMock, + to_section: ToSection, schema_registry_mock: MagicMock ): schema_handler = SchemaHandler( - url="http://mock:8081", - components_module=TEST_SCHEMA_PROVIDER_MODULE, + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_registry_mock.get_versions.return_value = [] diff --git a/tests/component_handlers/topic/test_proxy_wrapper.py b/tests/component_handlers/topic/test_proxy_wrapper.py index f0e121dd7..e26fb0e5a 100644 --- a/tests/component_handlers/topic/test_proxy_wrapper.py +++ b/tests/component_handlers/topic/test_proxy_wrapper.py @@ -32,9 +32,7 @@ def log_debug_mock(self, mocker: MockerFixture) -> MagicMock: @pytest.fixture(autouse=True) def _setup(self, httpx_mock: HTTPXMock): config = PipelineConfig( - defaults_path=DEFAULTS_PATH, - environment="development", - kafka_rest_host=HOST, + defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST ) self.proxy_wrapper = ProxyWrapper(pipeline_config=config) @@ -63,8 +61,7 @@ def test_should_raise_exception_when_host_is_not_set(self): @patch("httpx.post") def test_should_create_topic_with_all_topic_configuration( - self, - mock_post: MagicMock, + self, mock_post: MagicMock ): topic_spec = { "topic_name": "topic-X", @@ -130,7 +127,7 @@ def test_should_call_batch_alter_topic_config(self, mock_put: MagicMock): "data": [ {"name": "cleanup.policy", "operation": "DELETE"}, {"name": "compression.type", "value": "gzip"}, - ], + ] }, ) @@ -157,9 +154,7 @@ def test_should_call_get_broker_config(self, mock_get: MagicMock): ) def test_should_log_topic_creation( - self, - log_info_mock: MagicMock, - httpx_mock: HTTPXMock, + self, log_info_mock: MagicMock, httpx_mock: HTTPXMock ): topic_spec = { "topic_name": "topic-X", @@ -182,9 +177,7 @@ def 
test_should_log_topic_creation( log_info_mock.assert_called_once_with("Topic topic-X created.") def test_should_log_topic_deletion( - self, - log_info_mock: MagicMock, - httpx_mock: HTTPXMock, + self, log_info_mock: MagicMock, httpx_mock: HTTPXMock ): topic_name = "topic-X" @@ -231,9 +224,7 @@ def test_should_get_topic(self, log_debug_mock: MagicMock, httpx_mock: HTTPXMock assert get_topic_response == topic_response def test_should_rais_topic_not_found_exception_get_topic( - self, - log_debug_mock: MagicMock, - httpx_mock: HTTPXMock, + self, log_debug_mock: MagicMock, httpx_mock: HTTPXMock ): topic_name = "topic-X" @@ -252,9 +243,7 @@ def test_should_rais_topic_not_found_exception_get_topic( log_debug_mock.assert_any_call("Topic topic-X not found.") def test_should_log_reset_default_topic_config_when_deleted( - self, - log_info_mock: MagicMock, - httpx_mock: HTTPXMock, + self, log_info_mock: MagicMock, httpx_mock: HTTPXMock ): topic_name = "topic-X" config_name = "cleanup.policy" @@ -273,5 +262,5 @@ def test_should_log_reset_default_topic_config_when_deleted( ) log_info_mock.assert_called_once_with( - f"Config of topic {topic_name} was altered.", + f"Config of topic {topic_name} was altered." ) diff --git a/tests/component_handlers/topic/test_topic_handler.py b/tests/component_handlers/topic/test_topic_handler.py index aeb04f6c0..6b1b017fc 100644 --- a/tests/component_handlers/topic/test_topic_handler.py +++ b/tests/component_handlers/topic/test_topic_handler.py @@ -70,7 +70,7 @@ def get_topic_response_mock(self) -> MagicMock: wrapper.get_topic.return_value = TopicResponse(**response) wrapper.get_broker_config.return_value = BrokerConfigResponse(**broker_response) wrapper.get_topic_config.return_value = TopicConfigResponse( - **response_topic_config, + **response_topic_config ) return wrapper @@ -121,8 +121,7 @@ def test_should_call_create_topic_with_dry_run_false(self): wrapper.__dry_run_topic_creation.assert_not_called() def test_should_call_update_topic_config_when_topic_exists_and_with_dry_run_false( - self, - get_topic_response_mock: MagicMock, + self, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock topic_handler = TopicHandler(proxy_wrapper=wrapper) @@ -148,9 +147,7 @@ def test_should_call_update_topic_config_when_topic_exists_and_with_dry_run_fals wrapper.__dry_run_topic_creation.assert_not_called() def test_should_update_topic_config_when_one_config_changed( - self, - log_info_mock: MagicMock, - get_topic_response_mock: MagicMock, + self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock @@ -172,9 +169,7 @@ def test_should_update_topic_config_when_one_config_changed( ) def test_should_not_update_topic_config_when_config_not_changed( - self, - log_info_mock: MagicMock, - get_topic_response_mock: MagicMock, + self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock @@ -192,13 +187,11 @@ def test_should_not_update_topic_config_when_config_not_changed( wrapper.batch_alter_topic_config.assert_not_called() log_info_mock.assert_called_once_with( - "Topic Creation: config of topic topic-X didn't change. Skipping update.", + "Topic Creation: config of topic topic-X didn't change. Skipping update." 
) def test_should_not_update_topic_config_when_config_not_changed_and_not_ordered( - self, - log_info_mock: MagicMock, - get_topic_response_mock: MagicMock, + self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock topic_handler = TopicHandler(proxy_wrapper=wrapper) @@ -215,12 +208,11 @@ def test_should_not_update_topic_config_when_config_not_changed_and_not_ordered( wrapper.batch_alter_topic_config.assert_not_called() log_info_mock.assert_called_once_with( - "Topic Creation: config of topic topic-X didn't change. Skipping update.", + "Topic Creation: config of topic topic-X didn't change. Skipping update." ) def test_should_call_reset_topic_config_when_topic_exists_dry_run_false_and_topic_configs_change( - self, - get_topic_response_mock: MagicMock, + self, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock @@ -260,8 +252,7 @@ def test_should_not_call_create_topics_with_dry_run_true_and_topic_not_exists(se wrapper.create_topic.assert_not_called() def test_should_print_message_with_dry_run_true_and_topic_not_exists( - self, - log_info_mock: MagicMock, + self, log_info_mock: MagicMock ): wrapper = MagicMock() wrapper.get_topic.side_effect = TopicNotFoundException() @@ -281,8 +272,8 @@ def test_should_print_message_with_dry_run_true_and_topic_not_exists( log_info_mock.assert_called_once_with( greenify( - "Topic Creation: topic-X does not exist in the cluster. Creating topic.", - ), + "Topic Creation: topic-X does not exist in the cluster. Creating topic." + ) ) def test_should_print_message_if_dry_run_and_topic_exists_with_same_partition_count_and_replication_factor( @@ -305,19 +296,19 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_same_partition_co topic_handler.create_topics(to_section=to_section, dry_run=True) wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff assert log_info_mock.mock_calls == [ - mock.call("Topic Creation: topic-X already exists in cluster."), + mock.call("Topic Creation: topic-X already exists in cluster.") ] assert log_debug_mock.mock_calls == [ mock.call("HTTP/1.1 400 Bad Request"), mock.call({"Content-Type": "application/json"}), mock.call( - {"error_code": 40002, "message": "Topic 'topic-X' already exists."}, + {"error_code": 40002, "message": "Topic 'topic-X' already exists."} ), mock.call( - "Topic Creation: partition count of topic topic-X did not change. Current partitions count 10. Updating configs.", + "Topic Creation: partition count of topic topic-X did not change. Current partitions count 10. Updating configs." ), mock.call( - "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 3. Updating configs.", + "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 3. Updating configs." 
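The "didn't change. Skipping update." assertions above (including the "not ordered" variant, which treats the configs as unordered mappings) reduce to a dict comparison between the topic's current config and the desired one. A minimal predicate in that spirit; the real handler additionally renders a coloured diff and emits DELETE operations for keys that should fall back to broker defaults, as the `batch_alter_topic_config` payload earlier in this diff shows:

```python
def config_changed(current: dict[str, str], desired: dict[str, object]) -> bool:
    # the REST proxy reports every value as a string, so compare as strings
    return any(current.get(key) != str(value) for key, value in desired.items())

current = {"cleanup.policy": "compact", "compression.type": "gzip"}
assert not config_changed(current, {"cleanup.policy": "compact"})
assert config_changed(current, {"compression.type": "lz4"})
```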
), ] @@ -341,7 +332,7 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_default_partition assert log_info_mock.mock_calls == [ mock.call("Config changes for topic topic-X:"), mock.call( - "\n\x1b[32m+ cleanup.policy: compact\n\x1b[0m\x1b[32m+ compression.type: gzip\n\x1b[0m", + "\n\x1b[32m+ cleanup.policy: compact\n\x1b[0m\x1b[32m+ compression.type: gzip\n\x1b[0m" ), mock.call("Topic Creation: topic-X already exists in cluster."), ] @@ -349,19 +340,18 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_default_partition mock.call("HTTP/1.1 400 Bad Request"), mock.call({"Content-Type": "application/json"}), mock.call( - {"error_code": 40002, "message": "Topic 'topic-X' already exists."}, + {"error_code": 40002, "message": "Topic 'topic-X' already exists."} ), mock.call( - "Topic Creation: partition count of topic topic-X did not change. Current partitions count 1. Updating configs.", + "Topic Creation: partition count of topic topic-X did not change. Current partitions count 1. Updating configs." ), mock.call( - "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 1. Updating configs.", + "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 1. Updating configs." ), ] def test_should_exit_if_dry_run_and_topic_exists_different_partition_count( - self, - get_topic_response_mock: MagicMock, + self, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock @@ -383,8 +373,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_partition_count( wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff def test_should_exit_if_dry_run_and_topic_exists_different_replication_factor( - self, - get_topic_response_mock: MagicMock, + self, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock @@ -406,9 +395,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_replication_factor( wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff def test_should_log_correct_message_when_delete_existing_topic_dry_run( - self, - log_info_mock: MagicMock, - get_topic_response_mock: MagicMock, + self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock ): wrapper = get_topic_response_mock @@ -427,13 +414,12 @@ def test_should_log_correct_message_when_delete_existing_topic_dry_run( wrapper.get_topic.assert_called_once_with(topic_name="topic-X") log_info_mock.assert_called_once_with( magentaify( - "Topic Deletion: topic topic-X exists in the cluster. Deleting topic.", - ), + "Topic Deletion: topic topic-X exists in the cluster. Deleting topic." + ) ) def test_should_log_correct_message_when_delete_non_existing_topic_dry_run( - self, - log_warning_mock: MagicMock, + self, log_warning_mock: MagicMock ): wrapper = MagicMock() wrapper.get_topic.side_effect = TopicNotFoundException @@ -452,7 +438,7 @@ def test_should_log_correct_message_when_delete_non_existing_topic_dry_run( wrapper.get_topic.assert_called_once_with(topic_name="topic-X") log_warning_mock.assert_called_once_with( - "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. Skipping.", + "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. Skipping." 
) def test_should_call_delete_topic_not_dry_run(self): @@ -475,8 +461,7 @@ def test_should_call_delete_topic_not_dry_run(self): ] def test_should_print_correct_warning_when_deleting_topic_that_does_not_exists_not_dry_run( - self, - log_warning_mock: MagicMock, + self, log_warning_mock: MagicMock ): wrapper = MagicMock() topic_handler = TopicHandler(proxy_wrapper=wrapper) @@ -494,5 +479,5 @@ def test_should_print_correct_warning_when_deleting_topic_that_does_not_exists_n wrapper.get_topic.assert_called_once_with(topic_name="topic-X") log_warning_mock.assert_called_once_with( - "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. Skipping.", + "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. Skipping." ) diff --git a/tests/component_handlers/topic/test_utils.py b/tests/component_handlers/topic/test_utils.py index 0d3bd1170..b5f0133ca 100644 --- a/tests/component_handlers/topic/test_utils.py +++ b/tests/component_handlers/topic/test_utils.py @@ -86,7 +86,7 @@ "name": "log.flush.interval.messages", "source": "DEFAULT_CONFIG", "value": "9223372036854775807", - }, + } ], "topic_name": "fake", "value": "9223372036854775807", @@ -108,7 +108,7 @@ "name": "flush.ms", "source": "DEFAULT_CONFIG", "value": "9223372036854775807", - }, + } ], "topic_name": "fake", "value": "9223372036854775807", @@ -247,7 +247,7 @@ def test_get_effective_config(): ], }, ], - }, + } ) effective_config = get_effective_config( diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index 176303851..d066d431b 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -77,9 +77,7 @@ class TestBaseDefaultsComponent: ], ) def test_load_defaults( - self, - component_class: type[BaseDefaultsComponent], - defaults: dict, + self, component_class: type[BaseDefaultsComponent], defaults: dict ): assert ( load_defaults(component_class, DEFAULTS_PATH / "defaults.yaml") == defaults @@ -107,9 +105,7 @@ def test_load_defaults( ], ) def test_load_defaults_with_environment( - self, - component_class: type[BaseDefaultsComponent], - defaults: dict, + self, component_class: type[BaseDefaultsComponent], defaults: dict ): assert ( load_defaults( @@ -121,9 +117,7 @@ def test_load_defaults_with_environment( ) def test_inherit_defaults( - self, - config: PipelineConfig, - handlers: ComponentHandlers, + self, config: PipelineConfig, handlers: ComponentHandlers ): component = Child(config=config, handlers=handlers) @@ -131,7 +125,7 @@ def test_inherit_defaults( component.name == "fake-child-name" ), "Child default should overwrite parent default" assert component.nice == { - "fake-value": "fake", + "fake-value": "fake" }, "Field introduce by child should be added" assert ( component.value == 2.0 @@ -154,7 +148,7 @@ def test_inherit(self, config: PipelineConfig, handlers: ComponentHandlers): component.name == "name-defined-in-pipeline_generator" ), "Kwargs should should overwrite all other values" assert component.nice == { - "fake-value": "fake", + "fake-value": "fake" }, "Field introduce by child should be added" assert ( component.value == 2.0 @@ -167,9 +161,7 @@ def test_inherit(self, config: PipelineConfig, handlers: ComponentHandlers): ), "Defaults in code should be kept for parents" def test_multiple_generations( - self, - config: PipelineConfig, - handlers: ComponentHandlers, + self, config: PipelineConfig, handlers: ComponentHandlers ): component = 
GrandChild(config=config, handlers=handlers) @@ -177,7 +169,7 @@ def test_multiple_generations( component.name == "fake-child-name" ), "Child default should overwrite parent default" assert component.nice == { - "fake-value": "fake", + "fake-value": "fake" }, "Field introduce by child should be added" assert ( component.value == 2.0 @@ -191,13 +183,11 @@ def test_multiple_generations( assert component.grand_child == "grand-child-value" def test_env_var_substitution( - self, - config: PipelineConfig, - handlers: ComponentHandlers, + self, config: PipelineConfig, handlers: ComponentHandlers ): ENV["pipeline_name"] = str(DEFAULTS_PATH) component = EnvVarTest(config=config, handlers=handlers) assert component.name == str( - DEFAULTS_PATH, + DEFAULTS_PATH ), "Environment variables should be substituted" diff --git a/tests/components/test_kafka_app.py b/tests/components/test_kafka_app.py index d39d2f6bc..8fd0d98ec 100644 --- a/tests/components/test_kafka_app.py +++ b/tests/components/test_kafka_app.py @@ -80,8 +80,7 @@ def test_should_deploy_kafka_app( ) helm_upgrade_install = mocker.patch.object(kafka_app.helm, "upgrade_install") print_helm_diff = mocker.patch.object( - kafka_app.dry_run_handler, - "print_helm_diff", + kafka_app.dry_run_handler, "print_helm_diff" ) mocker.patch.object( KafkaApp, diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index ce831d0d4..2adf867da 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -42,13 +42,13 @@ def handlers(self) -> ComponentHandlers: @pytest.fixture(autouse=True) def helm_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.components.base_components.kafka_connector.Helm", + "kpops.components.base_components.kafka_connector.Helm" ).return_value @pytest.fixture() def dry_run_handler(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.components.base_components.kafka_connector.DryRunHandler", + "kpops.components.base_components.kafka_connector.DryRunHandler" ).return_value @pytest.fixture() @@ -57,7 +57,7 @@ def connector_config(self) -> KafkaConnectorConfig: **{ "connector.class": CONNECTOR_CLASS, "name": CONNECTOR_FULL_NAME, - }, + } ) def test_connector_config_name_override( @@ -85,8 +85,7 @@ def test_connector_config_name_override( assert connector.app.name == CONNECTOR_FULL_NAME with pytest.raises( - ValueError, - match="Connector name should be the same as component name", + ValueError, match="Connector name should be the same as component name" ): KafkaConnector( name=CONNECTOR_NAME, @@ -97,8 +96,7 @@ def test_connector_config_name_override( ) with pytest.raises( - ValueError, - match="Connector name should be the same as component name", + ValueError, match="Connector name should be the same as component name" ): KafkaConnector( name=CONNECTOR_NAME, diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 30f02f6a4..e8ed7aa22 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -55,10 +55,9 @@ def connector( to=ToSection( topics={ TopicName("${output_topic_name}"): TopicConfig( - type=OutputTopicTypes.OUTPUT, - partitions_count=10, + type=OutputTopicTypes.OUTPUT, partitions_count=10 ), - }, + } ), ) @@ -74,7 +73,7 @@ def test_connector_config_parsing( config=config, handlers=handlers, app=KafkaConnectorConfig( - **{**connector_config.dict(), "topics": topic_name}, + 
**{**connector_config.dict(), "topics": topic_name} ), namespace="test-namespace", ) @@ -86,7 +85,7 @@ def test_connector_config_parsing( config=config, handlers=handlers, app=KafkaConnectorConfig( - **{**connector_config.dict(), "topics.regex": topic_pattern}, + **{**connector_config.dict(), "topics.regex": topic_pattern} ), namespace="test-namespace", ) @@ -110,7 +109,7 @@ def test_from_section_parsing_input_topic( topics={ topic1: FromTopic(type=InputTopicTypes.INPUT), topic2: FromTopic(type=InputTopicTypes.INPUT), - }, + } ), ) assert getattr(connector.app, "topics") == f"{topic1},{topic2}" @@ -133,7 +132,7 @@ def test_from_section_parsing_input_pattern( app=connector_config, namespace="test-namespace", from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type - topics={topic_pattern: FromTopic(type=InputTopicTypes.PATTERN)}, + topics={topic_pattern: FromTopic(type=InputTopicTypes.PATTERN)} ), ) assert getattr(connector.app, "topics.regex") == topic_pattern @@ -144,12 +143,10 @@ def test_deploy_order( mocker: MockerFixture, ): mock_create_topics = mocker.patch.object( - connector.handlers.topic_handler, - "create_topics", + connector.handlers.topic_handler, "create_topics" ) mock_create_connector = mocker.patch.object( - connector.handlers.connector_handler, - "create_connector", + connector.handlers.connector_handler, "create_connector" ) mock = mocker.MagicMock() @@ -167,15 +164,13 @@ def test_destroy( mocker: MockerFixture, ): mock_destroy_connector = mocker.patch.object( - connector.handlers.connector_handler, - "destroy_connector", + connector.handlers.connector_handler, "destroy_connector" ) connector.destroy(dry_run=True) mock_destroy_connector.assert_called_once_with( - CONNECTOR_FULL_NAME, - dry_run=True, + CONNECTOR_FULL_NAME, dry_run=True ) def test_reset_when_dry_run_is_true( @@ -196,12 +191,10 @@ def test_reset_when_dry_run_is_false( mocker: MockerFixture, ): mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, - "delete_topics", + connector.handlers.topic_handler, "delete_topics" ) mock_clean_connector = mocker.patch.object( - connector.handlers.connector_handler, - "clean_connector", + connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() mock.attach_mock(mock_clean_connector, "mock_clean_connector") @@ -271,12 +264,10 @@ def test_clean_when_dry_run_is_false( mocker: MockerFixture, ): mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, - "delete_topics", + connector.handlers.topic_handler, "delete_topics" ) mock_clean_connector = mocker.patch.object( - connector.handlers.connector_handler, - "clean_connector", + connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() @@ -290,13 +281,13 @@ def test_clean_when_dry_run_is_false( assert log_info_mock.mock_calls == [ call.log_info( magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_FULL_NAME}", - ), + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_FULL_NAME}" + ) ), call.log_info( magentaify( - f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_FULL_NAME}", - ), + f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_FULL_NAME}" + ) ), call.log_info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")), ] @@ -378,12 +369,10 @@ def test_clean_without_to_when_dry_run_is_false( 
) mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, - "delete_topics", + connector.handlers.topic_handler, "delete_topics" ) mock_clean_connector = mocker.patch.object( - connector.handlers.connector_handler, - "clean_connector", + connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() mock.attach_mock(mock_delete_topics, "mock_delete_topics") diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 4ed187884..169111ed3 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -48,10 +48,9 @@ def connector( to=ToSection( topics={ TopicName("${output_topic_name}"): TopicConfig( - type=OutputTopicTypes.OUTPUT, - partitions_count=10, + type=OutputTopicTypes.OUTPUT, partitions_count=10 ), - }, + } ), offset_topic="kafka-connect-offsets", ) @@ -72,9 +71,9 @@ def test_from_section_raises_exception( from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={ TopicName("connector-topic"): FromTopic( - type=InputTopicTypes.INPUT, + type=InputTopicTypes.INPUT ), - }, + } ), ) @@ -84,13 +83,11 @@ def test_deploy_order( mocker: MockerFixture, ): mock_create_topics = mocker.patch.object( - connector.handlers.topic_handler, - "create_topics", + connector.handlers.topic_handler, "create_topics" ) mock_create_connector = mocker.patch.object( - connector.handlers.connector_handler, - "create_connector", + connector.handlers.connector_handler, "create_connector" ) mock = mocker.MagicMock() @@ -111,15 +108,13 @@ def test_destroy( assert connector.handlers.connector_handler mock_destroy_connector = mocker.patch.object( - connector.handlers.connector_handler, - "destroy_connector", + connector.handlers.connector_handler, "destroy_connector" ) connector.destroy(dry_run=True) mock_destroy_connector.assert_called_once_with( - CONNECTOR_FULL_NAME, - dry_run=True, + CONNECTOR_FULL_NAME, dry_run=True ) def test_reset_when_dry_run_is_true( @@ -142,12 +137,10 @@ def test_reset_when_dry_run_is_false( ): assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, - "delete_topics", + connector.handlers.topic_handler, "delete_topics" ) mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, - "clean_connector", + connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() @@ -217,12 +210,10 @@ def test_clean_when_dry_run_is_false( assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, - "delete_topics", + connector.handlers.topic_handler, "delete_topics" ) mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, - "clean_connector", + connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() @@ -295,12 +286,10 @@ def test_clean_without_to_when_dry_run_is_false( assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, - "delete_topics", + connector.handlers.topic_handler, "delete_topics" ) mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, - "clean_connector", + connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() diff --git a/tests/components/test_kubernetes_app.py b/tests/components/test_kubernetes_app.py index a3fc7281b..6583ac4bf 100644 --- 
--- a/tests/components/test_kubernetes_app.py
+++ b/tests/components/test_kubernetes_app.py
@@ -46,7 +46,7 @@ def handlers(self) -> ComponentHandlers:
     @pytest.fixture()
     def helm_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
-            "kpops.components.base_components.kubernetes_app.Helm",
+            "kpops.components.base_components.kubernetes_app.Helm"
         ).return_value
 
     @pytest.fixture()
@@ -113,8 +113,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented(
         app_value: KubernetesTestValue,
     ):
         repo_config = HelmRepoConfig(
-            repository_name="test-repo",
-            url="https://test.com/charts/",
+            repository_name="test-repo", url="https://test.com/charts/"
        )
         kubernetes_app = KubernetesApp(
             name="test-kubernetes-app",
@@ -212,9 +211,7 @@ def test_should_call_helm_uninstall_when_destroying_kubernetes_app(
         kubernetes_app.destroy(True)
 
         helm_mock.uninstall.assert_called_once_with(
-            "test-namespace",
-            "${pipeline_name}-test-kubernetes-app",
-            True,
+            "test-namespace", "${pipeline_name}-test-kubernetes-app", True
         )
 
         log_info_mock.assert_called_once_with(magentaify(stdout))
@@ -227,8 +224,7 @@ def test_should_raise_value_error_when_name_is_not_valid(
         repo_config: HelmRepoConfig,
     ):
         with pytest.raises(
-            ValueError,
-            match=r"The component name .* is invalid for Kubernetes.",
+            ValueError, match=r"The component name .* is invalid for Kubernetes."
         ):
             KubernetesApp(
                 name="Not-Compatible*",
@@ -240,8 +236,7 @@ def test_should_raise_value_error_when_name_is_not_valid(
             )
 
         with pytest.raises(
-            ValueError,
-            match=r"The component name .* is invalid for Kubernetes.",
+            ValueError, match=r"The component name .* is invalid for Kubernetes."
         ):
             KubernetesApp(
                 name="snake_case*",
diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py
index f6f4bb659..84f9f86c6 100644
--- a/tests/components/test_producer_app.py
+++ b/tests/components/test_producer_app.py
@@ -42,9 +42,7 @@ def config(self) -> PipelineConfig:
 
     @pytest.fixture()
     def producer_app(
-        self,
-        config: PipelineConfig,
-        handlers: ComponentHandlers,
+        self, config: PipelineConfig, handlers: ComponentHandlers
     ) -> ProducerApp:
         return ProducerApp(
             name=self.PRODUCER_APP_NAME,
@@ -60,10 +58,9 @@ def producer_app(
                 "to": {
                     "topics": {
                         "${output_topic_name}": TopicConfig(
-                            type=OutputTopicTypes.OUTPUT,
-                            partitions_count=10,
+                            type=OutputTopicTypes.OUTPUT, partitions_count=10
                         ),
-                    },
+                    }
                 },
             },
         )
@@ -82,21 +79,20 @@ def test_output_topics(self, config: PipelineConfig, handlers: ComponentHandlers
                 "to": {
                     "topics": {
                         "${output_topic_name}": TopicConfig(
-                            type=OutputTopicTypes.OUTPUT,
-                            partitions_count=10,
+                            type=OutputTopicTypes.OUTPUT, partitions_count=10
                         ),
                         "extra-topic-1": TopicConfig(
                             role="first-extra-topic",
                             partitions_count=10,
                         ),
-                    },
+                    }
                 },
             },
         )
 
         assert producer_app.app.streams.output_topic == "${output_topic_name}"
         assert producer_app.app.streams.extra_output_topics == {
-            "first-extra-topic": "extra-topic-1",
+            "first-extra-topic": "extra-topic-1"
         }
 
     def test_deploy_order_when_dry_run_is_false(
@@ -105,13 +101,11 @@ def test_deploy_order_when_dry_run_is_false(
         mocker: MockerFixture,
     ):
         mock_create_topics = mocker.patch.object(
-            producer_app.handlers.topic_handler,
-            "create_topics",
+            producer_app.handlers.topic_handler, "create_topics"
         )
 
         mock_helm_upgrade_install = mocker.patch.object(
-            producer_app.helm,
-            "upgrade_install",
+            producer_app.helm, "upgrade_install"
         )
 
         mock = mocker.MagicMock()
@@ -156,9 +150,7 @@ def test_destroy(
         producer_app.destroy(dry_run=True)
 
         mock_helm_uninstall.assert_called_once_with(
-            "test-namespace",
-            "${pipeline_name}-" + self.PRODUCER_APP_NAME,
-            True,
+            "test-namespace", "${pipeline_name}-" + self.PRODUCER_APP_NAME, True
         )
 
     def test_should_not_reset_producer_app(
@@ -167,13 +159,11 @@ def test_should_not_reset_producer_app(
         mocker: MockerFixture,
     ):
         mock_helm_upgrade_install = mocker.patch.object(
-            producer_app.helm,
-            "upgrade_install",
+            producer_app.helm, "upgrade_install"
         )
         mock_helm_uninstall = mocker.patch.object(producer_app.helm, "uninstall")
 
         mock_helm_print_helm_diff = mocker.patch.object(
-            producer_app.dry_run_handler,
-            "print_helm_diff",
+            producer_app.dry_run_handler, "print_helm_diff"
         )
         mock = mocker.MagicMock()
@@ -215,13 +205,10 @@ def test_should_not_reset_producer_app(
         ]
 
     def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false(
-        self,
-        mocker: MockerFixture,
-        producer_app: ProducerApp,
+        self, mocker: MockerFixture, producer_app: ProducerApp
     ):
         mock_helm_upgrade_install = mocker.patch.object(
-            producer_app.helm,
-            "upgrade_install",
+            producer_app.helm, "upgrade_install"
         )
         mock_helm_uninstall = mocker.patch.object(producer_app.helm, "uninstall")
 
diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py
index 071be0095..0d9135b54 100644
--- a/tests/components/test_streams_app.py
+++ b/tests/components/test_streams_app.py
@@ -47,9 +47,7 @@ def config(self) -> PipelineConfig:
 
     @pytest.fixture()
     def streams_app(
-        self,
-        config: PipelineConfig,
-        handlers: ComponentHandlers,
+        self, config: PipelineConfig, handlers: ComponentHandlers
     ) -> StreamsApp:
         return StreamsApp(
             name=self.STREAMS_APP_NAME,
@@ -63,10 +61,9 @@ def streams_app(
                 "to": {
                     "topics": {
                         "${output_topic_name}": TopicConfig(
-                            type=OutputTopicTypes.OUTPUT,
-                            partitions_count=10,
+                            type=OutputTopicTypes.OUTPUT, partitions_count=10
                         ),
-                    },
+                    }
                 },
             },
         )
@@ -94,7 +91,7 @@ def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers):
                             "type": "pattern",
                             "role": "another-pattern",
                         },
-                    },
+                    }
                 },
             },
         )
@@ -105,7 +102,7 @@ def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers):
         assert streams_app.app.streams.input_topics == ["example-input", "b", "a"]
         assert streams_app.app.streams.input_pattern == ".*"
         assert streams_app.app.streams.extra_input_patterns == {
-            "another-pattern": "example.*",
+            "another-pattern": "example.*"
         }
 
         helm_values = streams_app.to_helm_values()
@@ -116,9 +113,7 @@ def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers):
         assert "extraInputPatterns" in streams_config
 
     def test_no_empty_input_topic(
-        self,
-        config: PipelineConfig,
-        handlers: ComponentHandlers,
+        self, config: PipelineConfig, handlers: ComponentHandlers
     ):
         streams_app = StreamsApp(
             name=self.STREAMS_APP_NAME,
@@ -132,7 +127,7 @@ def test_no_empty_input_topic(
                 "from": {
                     "topics": {
                         ".*": {"type": "pattern"},
-                    },
+                    }
                 },
             },
         )
@@ -151,8 +146,7 @@ def test_no_empty_input_topic(
     def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandlers):
         # An exception should be raised when both role and type are defined and type is input
         with pytest.raises(
-            ValueError,
-            match="Define role only if `type` is `pattern` or `None`",
+            ValueError, match="Define role only if `type` is `pattern` or `None`"
         ):
             StreamsApp(
                 name=self.STREAMS_APP_NAME,
@@ -168,16 +162,15 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle
                         "topic-input": {
                             "type": "input",
                             "role": "role",
-                        },
-                    },
+                        }
+                    }
                 },
             },
         )
 
         # An exception should be raised when both role and type are defined and type is error
         with pytest.raises(
-            ValueError,
-            match="Define `role` only if `type` is undefined",
+            ValueError, match="Define `role` only if `type` is undefined"
         ):
             StreamsApp(
                 name=self.STREAMS_APP_NAME,
@@ -193,16 +186,14 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle
                         "topic-input": {
                             "type": "error",
                             "role": "role",
-                        },
-                    },
+                        }
+                    }
                 },
             },
         )
 
     def test_set_streams_output_from_to(
-        self,
-        config: PipelineConfig,
-        handlers: ComponentHandlers,
+        self, config: PipelineConfig, handlers: ComponentHandlers
     ):
         streams_app = StreamsApp(
             name=self.STREAMS_APP_NAME,
@@ -216,12 +207,10 @@ def test_set_streams_output_from_to(
                 "to": {
                     "topics": {
                         "${output_topic_name}": TopicConfig(
-                            type=OutputTopicTypes.OUTPUT,
-                            partitions_count=10,
+                            type=OutputTopicTypes.OUTPUT, partitions_count=10
                         ),
                         "${error_topic_name}": TopicConfig(
-                            type=OutputTopicTypes.ERROR,
-                            partitions_count=10,
+                            type=OutputTopicTypes.ERROR, partitions_count=10
                         ),
                         "extra-topic-1": TopicConfig(
                             role="first-extra-topic",
@@ -231,7 +220,7 @@ def test_set_streams_output_from_to(
                             role="second-extra-topic",
                             partitions_count=10,
                         ),
-                    },
+                    }
                 },
             },
         )
@@ -243,9 +232,7 @@ def test_set_streams_output_from_to(
         assert streams_app.app.streams.error_topic == "${error_topic_name}"
 
     def test_weave_inputs_from_prev_component(
-        self,
-        config: PipelineConfig,
-        handlers: ComponentHandlers,
+        self, config: PipelineConfig, handlers: ComponentHandlers
     ):
         streams_app = StreamsApp(
             name=self.STREAMS_APP_NAME,
@@ -263,23 +250,19 @@ def test_weave_inputs_from_prev_component(
             ToSection(
                 topics={
                     TopicName("prev-output-topic"): TopicConfig(
-                        type=OutputTopicTypes.OUTPUT,
-                        partitions_count=10,
+                        type=OutputTopicTypes.OUTPUT, partitions_count=10
                     ),
                     TopicName("b"): TopicConfig(
-                        type=OutputTopicTypes.OUTPUT,
-                        partitions_count=10,
+                        type=OutputTopicTypes.OUTPUT, partitions_count=10
                     ),
                     TopicName("a"): TopicConfig(
-                        type=OutputTopicTypes.OUTPUT,
-                        partitions_count=10,
+                        type=OutputTopicTypes.OUTPUT, partitions_count=10
                     ),
                     TopicName("prev-error-topic"): TopicConfig(
-                        type=OutputTopicTypes.ERROR,
-                        partitions_count=10,
+                        type=OutputTopicTypes.ERROR, partitions_count=10
                     ),
-                },
-            ),
+                }
+            )
         )
 
         assert streams_app.app.streams.input_topics == ["prev-output-topic", "b", "a"]
@@ -302,12 +285,10 @@ def test_deploy_order_when_dry_run_is_false(
                 "to": {
                     "topics": {
                         "${output_topic_name}": TopicConfig(
-                            type=OutputTopicTypes.OUTPUT,
-                            partitions_count=10,
+                            type=OutputTopicTypes.OUTPUT, partitions_count=10
                         ),
                         "${error_topic_name}": TopicConfig(
-                            type=OutputTopicTypes.ERROR,
-                            partitions_count=10,
+                            type=OutputTopicTypes.ERROR, partitions_count=10
                         ),
                         "extra-topic-1": TopicConfig(
                             role="first-extra-topic",
@@ -317,17 +298,15 @@ def test_deploy_order_when_dry_run_is_false(
                             role="second-extra-topic",
                             partitions_count=10,
                         ),
-                    },
+                    }
                 },
             },
         )
         mock_create_topics = mocker.patch.object(
-            streams_app.handlers.topic_handler,
-            "create_topics",
+            streams_app.handlers.topic_handler, "create_topics"
         )
         mock_helm_upgrade_install = mocker.patch.object(
-            streams_app.helm,
-            "upgrade_install",
+            streams_app.helm, "upgrade_install"
         )
 
         mock = mocker.MagicMock()
@@ -353,7 +332,7 @@ def test_deploy_order_when_dry_run_is_false(
                     },
                     "outputTopic": "${output_topic_name}",
                     "errorTopic": "${error_topic_name}",
-                },
+                }
             },
             HelmUpgradeInstallFlags(
                 create_namespace=False,
@@ -376,19 +355,14 @@ def test_destroy(self, streams_app: StreamsApp, mocker: MockerFixture):
         streams_app.destroy(dry_run=True)
 
         mock_helm_uninstall.assert_called_once_with(
-            "test-namespace",
-            "${pipeline_name}-" + self.STREAMS_APP_NAME,
-            True,
+            "test-namespace", "${pipeline_name}-" + self.STREAMS_APP_NAME, True
         )
 
     def test_reset_when_dry_run_is_false(
-        self,
-        streams_app: StreamsApp,
-        mocker: MockerFixture,
+        self, streams_app: StreamsApp, mocker: MockerFixture
     ):
         mock_helm_upgrade_install = mocker.patch.object(
-            streams_app.helm,
-            "upgrade_install",
+            streams_app.helm, "upgrade_install"
         )
         mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall")
 
@@ -432,8 +406,7 @@ def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up(
         mocker: MockerFixture,
     ):
         mock_helm_upgrade_install = mocker.patch.object(
-            streams_app.helm,
-            "upgrade_install",
+            streams_app.helm, "upgrade_install"
         )
         mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall")
diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py
index b1739e972..86e2c8b8e 100644
--- a/tests/pipeline/test_components/components.py
+++ b/tests/pipeline/test_components/components.py
@@ -52,13 +52,12 @@ def inflate(self) -> list[PipelineComponent]:
                 to=ToSection(
                     topics={
                         TopicName("${component_type}"): TopicConfig(
-                            type=OutputTopicTypes.OUTPUT,
+                            type=OutputTopicTypes.OUTPUT
                         ),
                         TopicName("${component_name}"): TopicConfig(
-                            type=None,
-                            role="test",
+                            type=None, role="test"
                         ),
-                    },
+                    }
                 ),
             )
             inflate_steps.append(kafka_connector)
@@ -69,9 +68,9 @@ def inflate(self) -> list[PipelineComponent]:
                 to=ToSection(  # type: ignore[reportGeneralTypeIssues]
                     topics={
                         TopicName(
-                            f"{self.full_name}-" + "${component_name}",
-                        ): TopicConfig(type=OutputTopicTypes.OUTPUT),
-                    },
+                            f"{self.full_name}-" + "${component_name}"
+                        ): TopicConfig(type=OutputTopicTypes.OUTPUT)
+                    }
                 ).dict(),
             )
             inflate_steps.append(streams_app)
@@ -81,9 +80,7 @@ def inflate(self) -> list[PipelineComponent]:
 
 class TestSchemaProvider(SchemaProvider):
     def provide_schema(
-        self,
-        schema_class: str,
-        models: dict[ModelName, ModelVersion],
+        self, schema_class: str, models: dict[ModelName, ModelVersion]
     ) -> Schema:
         schema = {
             "type": "record",
diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py
index 9ea414a9d..d5684178c 100644
--- a/tests/pipeline/test_components_without_schema_handler/components.py
+++ b/tests/pipeline/test_components_without_schema_handler/components.py
@@ -33,7 +33,7 @@ def inflate(self) -> list[PipelineComponent]:
                     **{
                         "topics": topic_name,
                         "transforms.changeTopic.replacement": f"{topic_name}-index-v1",
-                    },
+                    }
                 ),
             )
             inflate_steps.append(kafka_connector)
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index ceda59d80..433960e74 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -48,8 +48,7 @@ def test_load_pipeline(self, snapshot: SnapshotTest):
         snapshot.assert_match(enriched_pipeline, "test-pipeline")
 
     def test_generate_with_steps_flag_should_write_log_warning(
-        self,
-        caplog: pytest.LogCaptureFixture,
+        self, caplog: pytest.LogCaptureFixture
     ):
         result = runner.invoke(
             app,
@@ -73,7 +72,7 @@ def test_generate_with_steps_flag_should_write_log_warning(
                 logging.WARNING,
                 "The following flags are considered only when `--template` is set: \n \
                 '--steps'",
-            ),
+            )
         ]
 
         assert result.exit_code == 0
diff --git a/tests/utils/test_dict_ops.py b/tests/utils/test_dict_ops.py
index e9a02fe5b..1ea410770 100644
--- a/tests/utils/test_dict_ops.py
+++ b/tests/utils/test_dict_ops.py
@@ -70,7 +70,7 @@ class SimpleModel(BaseModel):
                 },
             },
             problems=99,
-        ).json(),
+        ).json()
     )
     existing_substitution = {
         "key1": "Everything",
diff --git a/tests/utils/test_diff.py b/tests/utils/test_diff.py
index 81b66b2cd..f2ffeac88 100644
--- a/tests/utils/test_diff.py
+++ b/tests/utils/test_diff.py
@@ -186,7 +186,7 @@ def test_render_diff(d1: dict, d2: dict, ignore: set[str] | None, output: str |
                 diff_type=DiffType.CHANGE,
                 key="a.b",
                 change=Change(old_value=1, new_value=2),
-            ),
+            )
         ],
     ),
 ],
diff --git a/tests/utils/test_environment.py b/tests/utils/test_environment.py
index e1da952b3..8fc02c826 100644
--- a/tests/utils/test_environment.py
+++ b/tests/utils/test_environment.py
@@ -91,8 +91,7 @@ def test_windows_behaviour_keys_transformation(system, fake_environment_windows)
 
 @patch("platform.system")
 def test_windows_behaviour_keys_transformation_as_kwargs(
-    system,
-    fake_environment_windows,
+    system, fake_environment_windows
 ):
     system.return_value = "Windows"
     environment = Environment(**fake_environment_windows)
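
Note on the pattern: every hunk in this patch makes the same change — a trailing "magic" comma is removed so that short argument lists and literals collapse, while calls that would exceed the line length keep their arguments grouped on a single continuation line. One plausible way to produce this mechanically (an assumption; the patch itself does not say which formatter or setting was used) is a formatter such as Black with its magic-trailing-comma handling disabled (the skip-magic-trailing-comma option). The before/after below, taken from a representative hunk above, illustrates the transformation:

    # Before: the trailing comma forces the formatter to keep one argument per line.
    mock_create_topics = mocker.patch.object(
        connector.handlers.topic_handler,
        "create_topics",
    )

    # After: without the trailing comma the arguments are joined; the call remains
    # split across two lines only because the one-line form would exceed the
    # configured line length.
    mock_create_topics = mocker.patch.object(
        connector.handlers.topic_handler, "create_topics"
    )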