diff --git a/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst b/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst index 7385bb8d0be81..c0f826ffa588c 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst @@ -71,3 +71,45 @@ To delete an AlloyDB cluster you can use :dedent: 4 :start-after: [START howto_operator_alloy_db_delete_cluster] :end-before: [END howto_operator_alloy_db_delete_cluster] + +.. _howto/operator:AlloyDBCreateInstanceOperator: + +Create instance +""""""""""""""" + +To create an AlloyDB instance (primary end secondary) you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBCreateInstanceOperator`. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_create_instance] + :end-before: [END howto_operator_alloy_db_create_instance] + +.. _howto/operator:AlloyDBUpdateInstanceOperator: + +Update instance +""""""""""""""" + +To update an AlloyDB instance you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBUpdateInstanceOperator`. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_update_instance] + :end-before: [END howto_operator_alloy_db_update_instance] + +.. _howto/operator:AlloyDBDeleteInstanceOperator: + +Delete instance +""""""""""""""" + +To delete an AlloyDB instance you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBDeleteInstanceOperator`. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_delete_instance] + :end-before: [END howto_operator_alloy_db_delete_instance] diff --git a/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py b/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py index 9ff4e6004bd3f..6da6e706de473 100644 --- a/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py +++ b/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py @@ -48,7 +48,8 @@ def get_alloy_db_admin_client(self) -> alloydb_v1.AlloyDBAdminClient: """Retrieve AlloyDB client.""" if not self._client: self._client = alloydb_v1.AlloyDBAdminClient( - credentials=self.get_credentials(), client_info=CLIENT_INFO + credentials=self.get_credentials(), + client_info=CLIENT_INFO, ) return self._client @@ -87,7 +88,15 @@ def create_cluster( https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Cluster :param location: Required. The ID of the Google Cloud region where the cluster is located. :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. - :param request_id: Optional. The ID of an existing request object. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). :param validate_only: Optional. If set, performs request validation, but does not actually execute the create request. :param retry: Optional. Designation of what errors, if any, should be retried. @@ -133,7 +142,15 @@ def create_secondary_cluster( https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Cluster :param location: Required. The ID of the Google Cloud region where the cluster is located. :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. - :param request_id: Optional. The ID of an existing request object. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). :param validate_only: Optional. If set, performs request validation, but does not actually execute the create request. :param retry: Optional. Designation of what errors, if any, should be retried. @@ -171,7 +188,7 @@ def get_cluster( For more details see API documentation: https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.GetClusterRequest - :param cluster_id: Required. ID of the cluster to create. + :param cluster_id: Required. ID of the cluster. :param location: Required. The ID of the Google Cloud region where the cluster is located. :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. :param retry: Optional. Designation of what errors, if any, should be retried. @@ -214,7 +231,15 @@ def update_cluster( :param location: Required. The ID of the Google Cloud region where the cluster is located. :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the Cluster resource by the update. - :param request_id: Optional. The ID of an existing request object. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). :param validate_only: Optional. If set, performs request validation, but does not actually execute the create request. :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. @@ -264,7 +289,15 @@ def delete_cluster( :param cluster_id: Required. ID of the cluster to delete. :param location: Required. The ID of the Google Cloud region where the cluster is located. :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. - :param request_id: Optional. The ID of an existing request object. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). :param etag: Optional. The current etag of the Cluster. If an etag is provided and does not match the current etag of the Cluster, deletion will be blocked and an ABORTED error will be returned. :param validate_only: Optional. If set, performs request validation, but does not actually execute @@ -287,3 +320,270 @@ def delete_cluster( timeout=timeout, metadata=metadata, ) + + @GoogleBaseHook.fallback_to_default_project_id + def create_instance( + self, + cluster_id: str, + instance_id: str, + instance: alloydb_v1.Instance | dict, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + request_id: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Create an instance in a given Alloy DB cluster. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.CreateInstanceRequest + + :param cluster_id: Required. ID of the cluster for creating an instance in. + :param instance_id: Required. ID of the instance to create. + :param instance: Required. Instance to create. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Instance + :param location: Required. The ID of the Google Cloud region where the cluster is located. + :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_only: Optional. If set, performs request validation, but does not actually execute + the create request. + :param retry: Optional. Designation of what errors, if any, should be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + """ + client = self.get_alloy_db_admin_client() + return client.create_instance( + request={ + "parent": client.cluster_path(project_id, location, cluster_id), + "instance_id": instance_id, + "instance": instance, + "request_id": request_id, + "validate_only": validate_only, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def create_secondary_instance( + self, + cluster_id: str, + instance_id: str, + instance: alloydb_v1.Instance | dict, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + request_id: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Create a secondary instance in a given Alloy DB cluster. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.CreateSecondaryInstanceRequest + + :param cluster_id: Required. ID of the cluster for creating an instance in. + :param instance_id: Required. ID of the instance to create. + :param instance: Required. Instance to create. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Instance + :param location: Required. The ID of the Google Cloud region where the cluster is located. + :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_only: Optional. If set, performs request validation, but does not actually execute + the create request. + :param retry: Optional. Designation of what errors, if any, should be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + """ + client = self.get_alloy_db_admin_client() + return client.create_secondary_instance( + request={ + "parent": client.cluster_path(project_id, location, cluster_id), + "instance_id": instance_id, + "instance": instance, + "request_id": request_id, + "validate_only": validate_only, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def get_instance( + self, + cluster_id: str, + instance_id: str, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> alloydb_v1.Instance: + """ + Retrieve an Alloy DB instance. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.GetInstanceRequest + + :param cluster_id: Required. ID of the cluster. + :param instance_id: Required. ID of the instance. + :param location: Required. The ID of the Google Cloud region where the cluster is located. + :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. + :param retry: Optional. Designation of what errors, if any, should be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + """ + client = self.get_alloy_db_admin_client() + return client.get_instance( + request={"name": client.instance_path(project_id, location, cluster_id, instance_id)}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def update_instance( + self, + cluster_id: str, + instance_id: str, + instance: alloydb_v1.Instance | dict, + location: str, + update_mask: FieldMask | dict | None = None, + project_id: str = PROVIDE_PROJECT_ID, + allow_missing: bool = False, + request_id: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Update an Alloy DB instance. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.UpdateInstanceRequest + + :param cluster_id: Required. ID of the cluster. + :param instance_id: Required. ID of the cluster to update. + :param instance: Required. Cluster to create. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Instance + :param location: Required. The ID of the Google Cloud region where the cluster is located. + :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the + Instance resource by the update. + :param request_id: Optional. The ID of an existing request object.:param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_only: Optional. If set, performs request validation, but does not actually execute + the create request. + :param project_id: Optional. The ID of the Google Cloud project where the cluster is located. + :param allow_missing: Optional. If set to true, update succeeds even if cluster is not found. + In that case, a new cluster is created and update_mask is ignored. + :param retry: Optional. Designation of what errors, if any, should be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + """ + client = self.get_alloy_db_admin_client() + _instance = ( + deepcopy(instance) if isinstance(instance, dict) else alloydb_v1.Instance.to_dict(instance) + ) + _instance["name"] = client.instance_path(project_id, location, cluster_id, instance_id) + return client.update_instance( + request={ + "update_mask": update_mask, + "instance": _instance, + "request_id": request_id, + "validate_only": validate_only, + "allow_missing": allow_missing, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_instance( + self, + instance_id: str, + cluster_id: str, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + request_id: str | None = None, + etag: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ): + """ + Delete an Alloy DB instance. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.DeleteInstanceRequest + + :param instance_id: Required. ID of the instance to delete. + :param cluster_id: Required. ID of the cluster. + :param location: Required. The ID of the Google Cloud region where the instance is located. + :param project_id: Optional. The ID of the Google Cloud project where the instance is located. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param etag: Optional. The current etag of the Instance. If an etag is provided and does not match the + current etag of the Instance, deletion will be blocked and an ABORTED error will be returned. + :param validate_only: Optional. If set, performs request validation, but does not actually execute + the delete request. + :param retry: Optional. Designation of what errors, if any, should be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + """ + client = self.get_alloy_db_admin_client() + return client.delete_instance( + request={ + "name": client.instance_path(project_id, location, cluster_id, instance_id), + "request_id": request_id, + "etag": etag, + "validate_only": validate_only, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) diff --git a/providers/src/airflow/providers/google/cloud/operators/alloy_db.py b/providers/src/airflow/providers/google/cloud/operators/alloy_db.py index ff5f664214167..bb7680b946f4a 100644 --- a/providers/src/airflow/providers/google/cloud/operators/alloy_db.py +++ b/providers/src/airflow/providers/google/cloud/operators/alloy_db.py @@ -21,9 +21,9 @@ from collections.abc import Sequence from functools import cached_property -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING -from google.api_core.exceptions import AlreadyExists, InvalidArgument +from google.api_core.exceptions import NotFound from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault from google.cloud import alloydb_v1 @@ -139,10 +139,10 @@ def get_operation_result(self, operation: Operation) -> proto.Message | None: If the `validate_request` parameter is set, then no operation is performed and thus nothing to wait. """ if self.validate_request: - self.log.info("The request validation has been passed successfully!") + # Validation requests are only validated and aren't executed, thus no operation result is expected + return None else: return self.hook.wait_for_operation(timeout=self.timeout, operation=operation) - return None class AlloyDBCreateClusterOperator(AlloyDBWriteBaseOperator): @@ -189,7 +189,8 @@ class AlloyDBCreateClusterOperator(AlloyDBWriteBaseOperator): """ template_fields: Sequence[str] = tuple( - {"cluster_id", "is_secondary"} | set(AlloyDBWriteBaseOperator.template_fields) + {"cluster_id", "cluster_configuration", "is_secondary"} + | set(AlloyDBWriteBaseOperator.template_fields) ) operator_extra_links = (AlloyDBClusterLink(),) @@ -206,13 +207,40 @@ def __init__( self.cluster_configuration = cluster_configuration self.is_secondary = is_secondary - def execute(self, context: Context) -> Any: - message = ( - "Validating a Create AlloyDB cluster request." - if self.validate_request - else "Creating an AlloyDB cluster." + def _get_cluster(self) -> proto.Message | None: + self.log.info("Checking if the cluster %s exists already...", self.cluster_id) + try: + cluster = self.hook.get_cluster( + cluster_id=self.cluster_id, + location=self.location, + project_id=self.project_id, + ) + except NotFound: + self.log.info("The cluster %s does not exist yet.", self.cluster_id) + except Exception as ex: + raise AirflowException(ex) from ex + else: + self.log.info("AlloyDB cluster %s already exists.", self.cluster_id) + result = alloydb_v1.Cluster.to_dict(cluster) + return result + return None + + def execute(self, context: Context) -> dict | None: + AlloyDBClusterLink.persist( + context=context, + task_instance=self, + location_id=self.location, + cluster_id=self.cluster_id, + project_id=self.project_id, ) - self.log.info(message) + + if cluster := self._get_cluster(): + return cluster + + if self.validate_request: + self.log.info("Validating a Create AlloyDB cluster request.") + else: + self.log.info("Creating an AlloyDB cluster.") try: create_method = ( @@ -229,40 +257,12 @@ def execute(self, context: Context) -> Any: timeout=self.timeout, metadata=self.metadata, ) - except AlreadyExists: - self.log.info("AlloyDB cluster %s already exists.", self.cluster_id) - result = self.hook.get_cluster( - cluster_id=self.cluster_id, - location=self.location, - project_id=self.project_id, - ) - result = alloydb_v1.Cluster.to_dict(result) - except InvalidArgument as ex: - if "cannot create more than one secondary cluster per primary cluster" in ex.message: - result = self.hook.get_cluster( - cluster_id=self.cluster_id, - location=self.location, - project_id=self.project_id, - ) - result = alloydb_v1.Cluster.to_dict(result) - self.log.info("AlloyDB cluster %s already exists.", result.get("name").split("/")[-1]) - else: - raise AirflowException(ex.message) except Exception as ex: raise AirflowException(ex) else: operation_result = self.get_operation_result(operation) result = alloydb_v1.Cluster.to_dict(operation_result) if operation_result else None - if result: - AlloyDBClusterLink.persist( - context=context, - task_instance=self, - location_id=self.location, - cluster_id=self.cluster_id, - project_id=self.project_id, - ) - return result @@ -274,7 +274,7 @@ class AlloyDBUpdateClusterOperator(AlloyDBWriteBaseOperator): For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AlloyDBUpdateClusterOperator` - :param cluster_id: Required. ID of the cluster to create. + :param cluster_id: Required. ID of the cluster to update. :param cluster_configuration: Required. Cluster to update. For more details please see API documentation: https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Cluster :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the @@ -311,7 +311,8 @@ class AlloyDBUpdateClusterOperator(AlloyDBWriteBaseOperator): """ template_fields: Sequence[str] = tuple( - {"cluster_id", "allow_missing"} | set(AlloyDBWriteBaseOperator.template_fields) + {"cluster_id", "cluster_configuration", "allow_missing"} + | set(AlloyDBWriteBaseOperator.template_fields) ) operator_extra_links = (AlloyDBClusterLink(),) @@ -330,13 +331,18 @@ def __init__( self.update_mask = update_mask self.allow_missing = allow_missing - def execute(self, context: Context) -> Any: - message = ( - "Validating an Update AlloyDB cluster request." - if self.validate_request - else "Updating an AlloyDB cluster." + def execute(self, context: Context) -> dict | None: + AlloyDBClusterLink.persist( + context=context, + task_instance=self, + location_id=self.location, + cluster_id=self.cluster_id, + project_id=self.project_id, ) - self.log.info(message) + if self.validate_request: + self.log.info("Validating an Update AlloyDB cluster request.") + else: + self.log.info("Updating an AlloyDB cluster.") try: operation = self.hook.update_cluster( @@ -358,14 +364,6 @@ def execute(self, context: Context) -> Any: operation_result = self.get_operation_result(operation) result = alloydb_v1.Cluster.to_dict(operation_result) if operation_result else None - AlloyDBClusterLink.persist( - context=context, - task_instance=self, - location_id=self.location, - cluster_id=self.cluster_id, - project_id=self.project_id, - ) - if not self.validate_request: self.log.info("AlloyDB cluster %s was successfully updated.", self.cluster_id) return result @@ -379,7 +377,7 @@ class AlloyDBDeleteClusterOperator(AlloyDBWriteBaseOperator): For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AlloyDBDeleteClusterOperator` - :param cluster_id: Required. ID of the cluster to create. + :param cluster_id: Required. ID of the cluster to delete. :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID so that if you must retry your request, the server ignores the request if it has already been completed. The server guarantees that for at least 60 minutes since the first request. @@ -429,13 +427,11 @@ def __init__( self.etag = etag self.force = force - def execute(self, context: Context) -> Any: - message = ( - "Validating a Delete AlloyDB cluster request." - if self.validate_request - else "Deleting an AlloyDB cluster." - ) - self.log.info(message) + def execute(self, context: Context) -> None: + if self.validate_request: + self.log.info("Validating a Delete AlloyDB cluster request.") + else: + self.log.info("Deleting an AlloyDB cluster.") try: operation = self.hook.delete_cluster( @@ -457,3 +453,324 @@ def execute(self, context: Context) -> Any: if not self.validate_request: self.log.info("AlloyDB cluster %s was successfully removed.", self.cluster_id) + + +class AlloyDBCreateInstanceOperator(AlloyDBWriteBaseOperator): + """ + Create an Instance in an Alloy DB cluster. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBCreateInstanceOperator` + + :param cluster_id: Required. ID of the cluster for creating an instance in. + :param instance_id: Required. ID of the instance to create. + :param instance_configuration: Required. Instance to create. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Instance + :param is_secondary: Required. Specifies if the Instance to be created is Primary or Secondary. + Please note, if set True, then specify the `instance_type` field in the instance. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = tuple( + {"cluster_id", "instance_id", "is_secondary", "instance_configuration"} + | set(AlloyDBWriteBaseOperator.template_fields) + ) + operator_extra_links = (AlloyDBClusterLink(),) + + def __init__( + self, + cluster_id: str, + instance_id: str, + instance_configuration: alloydb_v1.Instance | dict, + is_secondary: bool = False, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.cluster_id = cluster_id + self.instance_id = instance_id + self.instance_configuration = instance_configuration + self.is_secondary = is_secondary + + def _get_instance(self) -> proto.Message | None: + self.log.info("Checking if the instance %s exists already...", self.instance_id) + try: + instance = self.hook.get_instance( + cluster_id=self.cluster_id, + instance_id=self.instance_id, + location=self.location, + project_id=self.project_id, + ) + except NotFound: + self.log.info("The instance %s does not exist yet.", self.instance_id) + except Exception as ex: + raise AirflowException(ex) from ex + else: + self.log.info( + "AlloyDB instance %s already exists in the cluster %s.", + self.cluster_id, + self.instance_id, + ) + result = alloydb_v1.Instance.to_dict(instance) + return result + return None + + def execute(self, context: Context) -> dict | None: + AlloyDBClusterLink.persist( + context=context, + task_instance=self, + location_id=self.location, + cluster_id=self.cluster_id, + project_id=self.project_id, + ) + if instance := self._get_instance(): + return instance + + if self.validate_request: + self.log.info("Validating a Create AlloyDB instance request.") + else: + self.log.info("Creating an AlloyDB instance.") + + try: + create_method = ( + self.hook.create_secondary_instance if self.is_secondary else self.hook.create_instance + ) + operation = create_method( + cluster_id=self.cluster_id, + instance_id=self.instance_id, + instance=self.instance_configuration, + location=self.location, + project_id=self.project_id, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) + else: + operation_result = self.get_operation_result(operation) + result = alloydb_v1.Instance.to_dict(operation_result) if operation_result else None + + return result + + +class AlloyDBUpdateInstanceOperator(AlloyDBWriteBaseOperator): + """ + Update an Alloy DB instance. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBUpdateInstanceOperator` + + :param cluster_id: Required. ID of the cluster. + :param instance_id: Required. ID of the cluster to update. + :param instance_configuration: Required. Instance to update. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Instance + :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the + Instance resource by the update. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param allow_missing: Optional. If set to true, update succeeds even if instance is not found. + In that case, a new instance is created and update_mask is ignored. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = tuple( + {"cluster_id", "instance_id", "instance_configuration", "update_mask", "allow_missing"} + | set(AlloyDBWriteBaseOperator.template_fields) + ) + operator_extra_links = (AlloyDBClusterLink(),) + + def __init__( + self, + cluster_id: str, + instance_id: str, + instance_configuration: alloydb_v1.Instance | dict, + update_mask: FieldMask | dict | None = None, + allow_missing: bool = False, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.cluster_id = cluster_id + self.instance_id = instance_id + self.instance_configuration = instance_configuration + self.update_mask = update_mask + self.allow_missing = allow_missing + + def execute(self, context: Context) -> dict | None: + AlloyDBClusterLink.persist( + context=context, + task_instance=self, + location_id=self.location, + cluster_id=self.cluster_id, + project_id=self.project_id, + ) + if self.validate_request: + self.log.info("Validating an Update AlloyDB instance request.") + else: + self.log.info("Updating an AlloyDB instance.") + + try: + operation = self.hook.update_instance( + cluster_id=self.cluster_id, + instance_id=self.instance_id, + project_id=self.project_id, + location=self.location, + instance=self.instance_configuration, + update_mask=self.update_mask, + allow_missing=self.allow_missing, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) from ex + else: + operation_result = self.get_operation_result(operation) + result = alloydb_v1.Instance.to_dict(operation_result) if operation_result else None + + if not self.validate_request: + self.log.info("AlloyDB instance %s was successfully updated.", self.cluster_id) + return result + + +class AlloyDBDeleteInstanceOperator(AlloyDBWriteBaseOperator): + """ + Delete an Alloy DB instance. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBDeleteInstanceOperator` + + :param instance_id: Required. ID of the instance to delete. + :param cluster_id: Required. ID of the cluster. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param etag: Optional. The current etag of the Instance. If an etag is provided and does not match the + current etag of the Instance, deletion will be blocked and an ABORTED error will be returned. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = tuple( + {"instance_id", "cluster_id", "etag"} | set(AlloyDBWriteBaseOperator.template_fields) + ) + + def __init__( + self, + instance_id: str, + cluster_id: str, + etag: str | None = None, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.instance_id = instance_id + self.cluster_id = cluster_id + self.etag = etag + + def execute(self, context: Context) -> None: + if self.validate_request: + self.log.info("Validating a Delete AlloyDB instance request.") + else: + self.log.info("Deleting an AlloyDB instance.") + + try: + operation = self.hook.delete_instance( + instance_id=self.instance_id, + cluster_id=self.cluster_id, + project_id=self.project_id, + location=self.location, + etag=self.etag, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) from ex + else: + self.get_operation_result(operation) + + if not self.validate_request: + self.log.info("AlloyDB instance %s was successfully removed.", self.instance_id) diff --git a/providers/tests/google/cloud/hooks/test_alloy_db.py b/providers/tests/google/cloud/hooks/test_alloy_db.py index 5c48287b72348..ad4f4dfd597de 100644 --- a/providers/tests/google/cloud/hooks/test_alloy_db.py +++ b/providers/tests/google/cloud/hooks/test_alloy_db.py @@ -60,7 +60,7 @@ def setup_method(self): @mock.patch(HOOK_PATH.format("AlloyDbHook.get_credentials")) @mock.patch(HOOK_PATH.format("alloydb_v1.AlloyDBAdminClient")) - def test_gget_alloy_db_admin_client(self, mock_client, mock_get_credentials): + def test_get_alloy_db_admin_client(self, mock_client, mock_get_credentials): mock_credentials = mock_get_credentials.return_value expected_client = mock_client.return_value diff --git a/providers/tests/google/cloud/operators/test_alloy_db.py b/providers/tests/google/cloud/operators/test_alloy_db.py index e2c82fc4fbbcb..c2ed4a3ae8352 100644 --- a/providers/tests/google/cloud/operators/test_alloy_db.py +++ b/providers/tests/google/cloud/operators/test_alloy_db.py @@ -22,15 +22,18 @@ from unittest.mock import call import pytest -from google.api_core.exceptions import AlreadyExists, InvalidArgument +from google.api_core.exceptions import NotFound from google.api_core.gapic_v1.method import DEFAULT from airflow.exceptions import AirflowException from airflow.providers.google.cloud.operators.alloy_db import ( AlloyDBBaseOperator, AlloyDBCreateClusterOperator, + AlloyDBCreateInstanceOperator, AlloyDBDeleteClusterOperator, + AlloyDBDeleteInstanceOperator, AlloyDBUpdateClusterOperator, + AlloyDBUpdateInstanceOperator, AlloyDBWriteBaseOperator, ) @@ -55,7 +58,19 @@ TEST_ETAG = "test-etag" TEST_FORCE = False +TEST_INSTANCE_ID = "test_instance_id" +TEST_INSTANCE: dict[str, Any] = {} + OPERATOR_MODULE_PATH = "airflow.providers.google.cloud.operators.alloy_db.{}" +ALLOY_DB_HOOK_PATH = OPERATOR_MODULE_PATH.format("AlloyDbHook") +BASE_WRITE_CLUSTER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBWriteBaseOperator.{}") +CREATE_CLUSTER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.{}") +UPDATE_CLUSTER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBUpdateClusterOperator.{}") +DELETE_CLUSTER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBDeleteClusterOperator.{}") + +CREATE_INSTANCE_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBCreateInstanceOperator.{}") +UPDATE_INSTANCE_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBUpdateInstanceOperator.{}") +DELETE_INSTANCE_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBDeleteInstanceOperator.{}") class TestAlloyDBBaseOperator: @@ -84,7 +99,7 @@ def test_template_fields(self): expected_template_fields = {"project_id", "location", "gcp_conn_id"} assert set(AlloyDBBaseOperator.template_fields) == expected_template_fields - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook")) + @mock.patch(ALLOY_DB_HOOK_PATH) def test_hook(self, mock_hook): expected_hook = mock_hook.return_value @@ -123,8 +138,8 @@ def test_template_fields(self): ) assert set(AlloyDBWriteBaseOperator.template_fields) == expected_template_fields - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBWriteBaseOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook")) + @mock.patch(BASE_WRITE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH) def test_get_operation_result(self, mock_hook, mock_log): mock_operation = mock.MagicMock() mock_wait_for_operation = mock_hook.return_value.wait_for_operation @@ -136,8 +151,8 @@ def test_get_operation_result(self, mock_hook, mock_log): assert not mock_log.called mock_wait_for_operation.assert_called_once_with(timeout=TEST_TIMEOUT, operation=mock_operation) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBWriteBaseOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook")) + @mock.patch(BASE_WRITE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH) def test_get_operation_result_validate_result(self, mock_hook, mock_log): mock_operation = mock.MagicMock() mock_wait_for_operation = mock_hook.return_value.wait_for_operation @@ -146,7 +161,7 @@ def test_get_operation_result_validate_result(self, mock_hook, mock_log): result = self.operator.get_operation_result(mock_operation) assert result is None - mock_log.info.assert_called_once_with("The request validation has been passed successfully!") + assert not mock_log.info.called assert not mock_wait_for_operation.called @@ -174,29 +189,102 @@ def test_init(self): assert self.operator.is_secondary == TEST_IS_SECONDARY def test_template_fields(self): - expected_template_fields = {"cluster_id", "is_secondary"} | set( + expected_template_fields = {"cluster_id", "is_secondary", "cluster_configuration"} | set( AlloyDBWriteBaseOperator.template_fields ) assert set(AlloyDBCreateClusterOperator.template_fields) == expected_template_fields + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_cluster_not_found(self, mock_hook, mock_log): + mock_get_cluster = mock_hook.return_value.get_cluster + mock_get_cluster.side_effect = NotFound("Not found") + + result = self.operator._get_cluster() + + mock_get_cluster.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the cluster %s exists already...", TEST_CLUSTER_ID), + call("The cluster %s does not exist yet.", TEST_CLUSTER_ID), + ] + ) + assert result is None + + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_cluster_exception(self, mock_hook, mock_log): + mock_get_cluster = mock_hook.return_value.get_cluster + mock_get_cluster.side_effect = Exception() + + with pytest.raises(AirflowException): + self.operator._get_cluster() + + mock_get_cluster.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_called_once_with("Checking if the cluster %s exists already...", TEST_CLUSTER_ID) + + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_cluster(self, mock_hook, mock_log, mock_to_dict): + mock_get_cluster = mock_hook.return_value.get_cluster + mock_cluster = mock_get_cluster.return_value + expected_result = mock_to_dict.return_value + + result = self.operator._get_cluster() + + mock_get_cluster.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the cluster %s exists already...", TEST_CLUSTER_ID), + call("AlloyDB cluster %s already exists.", TEST_CLUSTER_ID), + ] + ) + mock_to_dict.assert_called_once_with(mock_cluster) + assert result == expected_result + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) - def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("_get_cluster")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_cluster, mock_to_dict, mock_link + ): + mock_get_cluster.return_value = None mock_create_cluster = mock_hook.return_value.create_cluster mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster mock_operation = mock_create_cluster.return_value mock_operation_result = mock_get_operation_result.return_value - expected_message = "Creating an AlloyDB cluster." expected_result = mock_to_dict.return_value mock_context = mock.MagicMock() result = self.operator.execute(context=mock_context) - mock_log.info.assert_called_once_with(expected_message) + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Creating an AlloyDB cluster.") + mock_get_cluster.assert_called_once() mock_create_cluster.assert_called_once_with( cluster_id=TEST_CLUSTER_ID, cluster=TEST_CLUSTER, @@ -211,36 +299,40 @@ def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_d assert not mock_create_secondary_cluster.called mock_to_dict.assert_called_once_with(mock_operation_result) mock_get_operation_result.assert_called_once_with(mock_operation) - mock_link.persist.assert_called_once_with( - context=mock_context, - task_instance=self.operator, - location_id=TEST_GCP_REGION, - cluster_id=TEST_CLUSTER_ID, - project_id=TEST_GCP_PROJECT, - ) + assert result == expected_result @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("_get_cluster")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_is_secondary( - self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link + self, mock_hook, mock_log, mock_get_operation_result, mock_get_cluster, mock_to_dict, mock_link ): + mock_get_cluster.return_value = None mock_create_cluster = mock_hook.return_value.create_cluster mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster mock_operation = mock_create_secondary_cluster.return_value mock_operation_result = mock_get_operation_result.return_value - expected_message = "Creating an AlloyDB cluster." expected_result = mock_to_dict.return_value mock_context = mock.MagicMock() self.operator.is_secondary = True result = self.operator.execute(context=mock_context) - mock_log.info.assert_called_once_with(expected_message) + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Creating an AlloyDB cluster.") + mock_get_cluster.assert_called_once() assert not mock_create_cluster.called mock_create_secondary_cluster.assert_called_once_with( cluster_id=TEST_CLUSTER_ID, @@ -255,35 +347,39 @@ def test_execute_is_secondary( ) mock_to_dict.assert_called_once_with(mock_operation_result) mock_get_operation_result.assert_called_once_with(mock_operation) - mock_link.persist.assert_called_once_with( - context=mock_context, - task_instance=self.operator, - location_id=TEST_GCP_REGION, - cluster_id=TEST_CLUSTER_ID, - project_id=TEST_GCP_PROJECT, - ) + assert result == expected_result @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("_get_cluster")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_validate_request( - self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link + self, mock_hook, mock_log, mock_get_operation_result, mock_get_cluster, mock_to_dict, mock_link ): + mock_get_cluster.return_value = None mock_create_cluster = mock_hook.return_value.create_cluster mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster mock_operation = mock_create_cluster.return_value mock_get_operation_result.return_value = None - expected_message = "Validating a Create AlloyDB cluster request." mock_context = mock.MagicMock() self.operator.validate_request = True result = self.operator.execute(context=mock_context) - mock_log.info.assert_called_once_with(expected_message) + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Validating a Create AlloyDB cluster request.") + mock_get_cluster.assert_called_once() mock_create_cluster.assert_called_once_with( cluster_id=TEST_CLUSTER_ID, cluster=TEST_CLUSTER, @@ -297,31 +393,40 @@ def test_execute_validate_request( ) assert not mock_create_secondary_cluster.called assert not mock_to_dict.called - assert not mock_link.persist.called mock_get_operation_result.assert_called_once_with(mock_operation) assert result is None @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("_get_cluster")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_validate_request_is_secondary( - self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link + self, mock_hook, mock_log, mock_get_operation_result, mock_get_cluster, mock_to_dict, mock_link ): + mock_get_cluster.return_value = None mock_create_cluster = mock_hook.return_value.create_cluster mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster mock_operation = mock_create_secondary_cluster.return_value mock_get_operation_result.return_value = None - expected_message = "Validating a Create AlloyDB cluster request." mock_context = mock.MagicMock() self.operator.validate_request = True self.operator.is_secondary = True result = self.operator.execute(context=mock_context) - mock_log.info.assert_called_once_with(expected_message) + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Validating a Create AlloyDB cluster request.") + mock_get_cluster.assert_called_once() mock_create_secondary_cluster.assert_called_once_with( cluster_id=TEST_CLUSTER_ID, cluster=TEST_CLUSTER, @@ -335,55 +440,25 @@ def test_execute_validate_request_is_secondary( ) assert not mock_create_cluster.called assert not mock_to_dict.called - assert not mock_link.persist.called mock_get_operation_result.assert_called_once_with(mock_operation) assert result is None @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) - @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("_get_cluster")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_already_exists( - self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link + self, mock_hook, mock_log, mock_get_operation_result, mock_get_cluster, mock_link ): + expected_result = mock_get_cluster.return_value mock_create_cluster = mock_hook.return_value.create_cluster - mock_create_cluster.side_effect = AlreadyExists("test-message") - mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster - mock_get_cluster = mock_hook.return_value.get_cluster - mock_get_cluster_result = mock_get_cluster.return_value - expected_result = mock_to_dict.return_value mock_context = mock.MagicMock() result = self.operator.execute(context=mock_context) - mock_log.info.assert_has_calls( - [ - call("Creating an AlloyDB cluster."), - call("AlloyDB cluster %s already exists.", TEST_CLUSTER_ID), - ] - ) - mock_create_cluster.assert_called_once_with( - cluster_id=TEST_CLUSTER_ID, - cluster=TEST_CLUSTER, - location=TEST_GCP_REGION, - project_id=TEST_GCP_PROJECT, - request_id=TEST_REQUEST_ID, - validate_only=TEST_VALIDATE_ONLY, - retry=TEST_RETRY, - timeout=TEST_TIMEOUT, - metadata=TEST_METADATA, - ) - assert not mock_create_secondary_cluster.called - mock_get_cluster.assert_called_once_with( - cluster_id=TEST_CLUSTER_ID, - location=TEST_GCP_REGION, - project_id=TEST_GCP_PROJECT, - ) - mock_to_dict.assert_called_once_with(mock_get_cluster_result) - assert not mock_get_operation_result.called mock_link.persist.assert_called_once_with( context=mock_context, task_instance=self.operator, @@ -391,121 +466,42 @@ def test_execute_already_exists( cluster_id=TEST_CLUSTER_ID, project_id=TEST_GCP_PROJECT, ) - assert result == expected_result - - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) - @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) - def test_execute_invalid_argument( - self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link - ): - mock_create_cluster = mock_hook.return_value.create_cluster - expected_error_message = "cannot create more than one secondary cluster per primary cluster" - mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster - mock_create_secondary_cluster.side_effect = InvalidArgument(message=expected_error_message) - - mock_get_cluster = mock_hook.return_value.get_cluster - mock_get_cluster_result = mock_get_cluster.return_value - - expected_result = mock_to_dict.return_value - expected_result.get.return_value = TEST_CLUSTER_NAME - mock_context = mock.MagicMock() - self.operator.is_secondary = True - result = self.operator.execute(context=mock_context) - - mock_log.info.assert_has_calls( - [ - call("Creating an AlloyDB cluster."), - call("AlloyDB cluster %s already exists.", TEST_CLUSTER_ID), - ] - ) - mock_create_secondary_cluster.assert_called_once_with( - cluster_id=TEST_CLUSTER_ID, - cluster=TEST_CLUSTER, - location=TEST_GCP_REGION, - project_id=TEST_GCP_PROJECT, - request_id=TEST_REQUEST_ID, - validate_only=TEST_VALIDATE_ONLY, - retry=TEST_RETRY, - timeout=TEST_TIMEOUT, - metadata=TEST_METADATA, - ) + assert not mock_log.info.called + mock_get_cluster.assert_called_once() assert not mock_create_cluster.called - mock_get_cluster.assert_called_once_with( - cluster_id=TEST_CLUSTER_ID, - location=TEST_GCP_REGION, - project_id=TEST_GCP_PROJECT, - ) - mock_to_dict.assert_called_once_with(mock_get_cluster_result) + assert not mock_create_secondary_cluster.called assert not mock_get_operation_result.called - mock_link.persist.assert_called_once_with( - context=mock_context, - task_instance=self.operator, - location_id=TEST_GCP_REGION, - cluster_id=TEST_CLUSTER_ID, - project_id=TEST_GCP_PROJECT, - ) assert result == expected_result @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) - def test_execute_invalid_argument_exception( - self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("_get_cluster")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_CLUSTER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_cluster, mock_to_dict, mock_link ): + mock_get_cluster.return_value = None mock_create_cluster = mock_hook.return_value.create_cluster mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster - mock_create_secondary_cluster.side_effect = InvalidArgument(message="Test error") - mock_get_cluster = mock_hook.return_value.get_cluster - expected_result = mock_to_dict.return_value - expected_result.get.return_value = TEST_CLUSTER_NAME + mock_create_cluster.side_effect = Exception() mock_context = mock.MagicMock() - self.operator.is_secondary = True with pytest.raises(AirflowException): self.operator.execute(context=mock_context) - mock_log.info.assert_called_once_with("Creating an AlloyDB cluster.") - mock_create_secondary_cluster.assert_called_once_with( + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, cluster_id=TEST_CLUSTER_ID, - cluster=TEST_CLUSTER, - location=TEST_GCP_REGION, project_id=TEST_GCP_PROJECT, - request_id=TEST_REQUEST_ID, - validate_only=TEST_VALIDATE_ONLY, - retry=TEST_RETRY, - timeout=TEST_TIMEOUT, - metadata=TEST_METADATA, ) - assert not mock_create_cluster.called - assert not mock_get_cluster.called - assert not mock_to_dict.called - assert not mock_get_operation_result.called - assert not mock_link.persist.called - - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) - @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.get_operation_result")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBCreateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) - def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): - mock_create_cluster = mock_hook.return_value.create_cluster - mock_create_secondary_cluster = mock_hook.return_value.create_secondary_cluster - mock_create_cluster.side_effect = Exception() - mock_get_cluster = mock_hook.return_value.get_cluster - expected_result = mock_to_dict.return_value - expected_result.get.return_value = TEST_CLUSTER_NAME - mock_context = mock.MagicMock() - - with pytest.raises(AirflowException): - self.operator.execute(context=mock_context) mock_log.info.assert_called_once_with("Creating an AlloyDB cluster.") + mock_get_cluster.assert_called_once() mock_create_cluster.assert_called_once_with( cluster_id=TEST_CLUSTER_ID, cluster=TEST_CLUSTER, @@ -518,10 +514,8 @@ def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result, metadata=TEST_METADATA, ) assert not mock_create_secondary_cluster.called - assert not mock_get_cluster.called assert not mock_to_dict.called assert not mock_get_operation_result.called - assert not mock_link.persist.called class TestAlloyDBUpdateClusterOperator: @@ -550,7 +544,7 @@ def test_init(self): assert self.operator.allow_missing == TEST_ALLOW_MISSING def test_template_fields(self): - expected_template_fields = {"cluster_id", "allow_missing"} | set( + expected_template_fields = {"cluster_id", "cluster_configuration", "allow_missing"} | set( AlloyDBWriteBaseOperator.template_fields ) assert set(AlloyDBUpdateClusterOperator.template_fields) == expected_template_fields @@ -559,7 +553,7 @@ def test_template_fields(self): @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUpdateClusterOperator.get_operation_result")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUpdateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): mock_update_cluster = mock_hook.return_value.update_cluster mock_operation = mock_update_cluster.return_value @@ -570,6 +564,13 @@ def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_d result = self.operator.execute(context=mock_context) + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) mock_update_cluster.assert_called_once_with( cluster_id=TEST_CLUSTER_ID, project_id=TEST_GCP_PROJECT, @@ -585,13 +586,6 @@ def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_d ) mock_get_operation_result.assert_called_once_with(mock_operation) mock_to_dict.assert_called_once_with(mock_operation_result) - mock_link.persist.assert_called_once_with( - context=mock_context, - task_instance=self.operator, - location_id=TEST_GCP_REGION, - cluster_id=TEST_CLUSTER_ID, - project_id=TEST_GCP_PROJECT, - ) assert result == expected_result mock_log.info.assert_has_calls( [ @@ -604,7 +598,7 @@ def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_d @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUpdateClusterOperator.get_operation_result")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUpdateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_validate_request( self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link ): @@ -647,7 +641,7 @@ def test_execute_validate_request( @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUpdateClusterOperator.get_operation_result")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUpdateClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): mock_update_cluster = mock_hook.return_value.update_cluster mock_update_cluster.side_effect = Exception @@ -657,6 +651,13 @@ def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result, with pytest.raises(AirflowException): self.operator.execute(context=mock_context) + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) mock_update_cluster.assert_called_once_with( cluster_id=TEST_CLUSTER_ID, project_id=TEST_GCP_PROJECT, @@ -672,7 +673,6 @@ def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result, ) assert not mock_get_operation_result.called assert not mock_to_dict.called - assert not mock_link.persist.called mock_log.info.assert_called_once_with("Updating an AlloyDB cluster.") @@ -707,7 +707,7 @@ def test_template_fields(self): @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBDeleteClusterOperator.get_operation_result")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBDeleteClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute(self, mock_hook, mock_log, mock_get_operation_result): mock_delete_cluster = mock_hook.return_value.delete_cluster mock_operation = mock_delete_cluster.return_value @@ -738,7 +738,7 @@ def test_execute(self, mock_hook, mock_log, mock_get_operation_result): @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBDeleteClusterOperator.get_operation_result")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBDeleteClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_validate_request(self, mock_hook, mock_log, mock_get_operation_result): mock_delete_cluster = mock_hook.return_value.delete_cluster mock_operation = mock_delete_cluster.return_value @@ -765,7 +765,7 @@ def test_execute_validate_request(self, mock_hook, mock_log, mock_get_operation_ @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBDeleteClusterOperator.get_operation_result")) @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBDeleteClusterOperator.log")) - @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDbHook"), new_callable=mock.PropertyMock) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result): mock_delete_cluster = mock_hook.return_value.delete_cluster mock_delete_cluster.side_effect = Exception @@ -788,3 +788,654 @@ def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result) ) assert not mock_get_operation_result.called mock_log.info.assert_called_once_with("Deleting an AlloyDB cluster.") + + +class TestAlloyDBCreateInstanceOperator: + def setup_method(self): + self.operator = AlloyDBCreateInstanceOperator( + task_id=TEST_TASK_ID, + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + instance_configuration=TEST_INSTANCE, + is_secondary=TEST_IS_SECONDARY, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.instance_id == TEST_INSTANCE_ID + assert self.operator.cluster_id == TEST_CLUSTER_ID + assert self.operator.instance_configuration == TEST_INSTANCE + assert self.operator.is_secondary == TEST_IS_SECONDARY + + def test_template_fields(self): + expected_template_fields = { + "cluster_id", + "instance_id", + "is_secondary", + "instance_configuration", + } | set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBCreateInstanceOperator.template_fields) == expected_template_fields + + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_instance_not_found(self, mock_hook, mock_log): + mock_get_instance = mock_hook.return_value.get_instance + mock_get_instance.side_effect = NotFound("Not found") + + result = self.operator._get_instance() + + mock_get_instance.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + instance_id=TEST_INSTANCE_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the instance %s exists already...", TEST_INSTANCE_ID), + call("The instance %s does not exist yet.", TEST_INSTANCE_ID), + ] + ) + assert result is None + + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_instance_exception(self, mock_hook, mock_log): + mock_get_instance = mock_hook.return_value.get_instance + mock_get_instance.side_effect = Exception("Test exception") + + with pytest.raises(AirflowException): + self.operator._get_instance() + + mock_get_instance.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + instance_id=TEST_INSTANCE_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_called_once_with( + "Checking if the instance %s exists already...", TEST_INSTANCE_ID + ) + + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Instance.to_dict")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_instance(self, mock_hook, mock_log, mock_to_dict): + mock_get_instance = mock_hook.return_value.get_instance + mock_instance = mock_get_instance.return_value + expected_result = mock_to_dict.return_value + + result = self.operator._get_instance() + + mock_get_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the instance %s exists already...", TEST_INSTANCE_ID), + call( + "AlloyDB instance %s already exists in the cluster %s.", TEST_CLUSTER_ID, TEST_INSTANCE_ID + ), + ] + ) + mock_to_dict.assert_called_once_with(mock_instance) + assert result == expected_result + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Instance.to_dict")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("_get_instance")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_instance, mock_to_dict, mock_link + ): + mock_get_instance.return_value = None + mock_create_instance = mock_hook.return_value.create_instance + mock_create_secondary_instance = mock_hook.return_value.create_secondary_instance + mock_operation = mock_create_instance.return_value + mock_operation_result = mock_get_operation_result.return_value + + expected_result = mock_to_dict.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Creating an AlloyDB instance.") + mock_get_instance.assert_called_once() + mock_create_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + instance=TEST_INSTANCE, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_create_secondary_instance.called + mock_to_dict.assert_called_once_with(mock_operation_result) + mock_get_operation_result.assert_called_once_with(mock_operation) + + assert result == expected_result + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Instance.to_dict")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("_get_instance")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_is_secondary( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_instance, mock_to_dict, mock_link + ): + mock_get_instance.return_value = None + mock_create_instance = mock_hook.return_value.create_instance + mock_create_secondary_instance = mock_hook.return_value.create_secondary_instance + mock_operation = mock_create_secondary_instance.return_value + mock_operation_result = mock_get_operation_result.return_value + + expected_result = mock_to_dict.return_value + mock_context = mock.MagicMock() + self.operator.is_secondary = True + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Creating an AlloyDB instance.") + mock_get_instance.assert_called_once() + assert not mock_create_instance.called + mock_create_secondary_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + instance=TEST_INSTANCE, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_to_dict.assert_called_once_with(mock_operation_result) + mock_get_operation_result.assert_called_once_with(mock_operation) + + assert result == expected_result + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Instance.to_dict")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("_get_instance")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_instance, mock_to_dict, mock_link + ): + mock_get_instance.return_value = None + mock_create_instance = mock_hook.return_value.create_instance + mock_create_secondary_instance = mock_hook.return_value.create_secondary_instance + mock_operation = mock_create_instance.return_value + mock_get_operation_result.return_value = None + + mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Validating a Create AlloyDB instance request.") + mock_get_instance.assert_called_once() + mock_create_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + instance=TEST_INSTANCE, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_create_secondary_instance.called + assert not mock_to_dict.called + mock_get_operation_result.assert_called_once_with(mock_operation) + assert result is None + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Instance.to_dict")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("_get_instance")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request_is_secondary( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_instance, mock_to_dict, mock_link + ): + mock_get_instance.return_value = None + mock_create_instance = mock_hook.return_value.create_instance + mock_create_secondary_instance = mock_hook.return_value.create_secondary_instance + mock_operation = mock_create_secondary_instance.return_value + mock_get_operation_result.return_value = None + + mock_context = mock.MagicMock() + self.operator.validate_request = True + self.operator.is_secondary = True + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Validating a Create AlloyDB instance request.") + mock_get_instance.assert_called_once() + mock_create_secondary_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + instance=TEST_INSTANCE, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_create_instance.called + assert not mock_to_dict.called + mock_get_operation_result.assert_called_once_with(mock_operation) + assert result is None + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("_get_instance")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_already_exists( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_instance, mock_link + ): + expected_result = mock_get_instance.return_value + mock_create_instance = mock_hook.return_value.create_instance + mock_create_secondary_instance = mock_hook.return_value.create_secondary_instance + + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + assert not mock_log.info.called + mock_get_instance.assert_called_once() + assert not mock_create_instance.called + assert not mock_create_secondary_instance.called + assert not mock_get_operation_result.called + assert result == expected_result + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Instance.to_dict")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("_get_instance")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_instance, mock_to_dict, mock_link + ): + mock_get_instance.return_value = None + mock_create_instance = mock_hook.return_value.create_instance + mock_create_secondary_instance = mock_hook.return_value.create_secondary_instance + mock_create_instance.side_effect = Exception() + mock_context = mock.MagicMock() + + with pytest.raises(AirflowException): + self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Creating an AlloyDB instance.") + mock_get_instance.assert_called_once() + mock_create_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + instance=TEST_INSTANCE, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_create_secondary_instance.called + assert not mock_to_dict.called + assert not mock_get_operation_result.called + + +class TestAlloyDBUpdateInstanceOperator: + def setup_method(self): + self.operator = AlloyDBUpdateInstanceOperator( + task_id=TEST_TASK_ID, + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + instance_configuration=TEST_INSTANCE, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.instance_id == TEST_INSTANCE_ID + assert self.operator.cluster_id == TEST_CLUSTER_ID + assert self.operator.instance_configuration == TEST_INSTANCE + assert self.operator.update_mask == TEST_UPDATE_MASK + assert self.operator.allow_missing == TEST_ALLOW_MISSING + + def test_template_fields(self): + expected_template_fields = { + "cluster_id", + "instance_id", + "instance_configuration", + "update_mask", + "allow_missing", + } | set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBUpdateInstanceOperator.template_fields) == expected_template_fields + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Instance.to_dict")) + @mock.patch(UPDATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(UPDATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): + mock_update_instance = mock_hook.return_value.update_instance + mock_operation = mock_update_instance.return_value + mock_operation_result = mock_get_operation_result.return_value + + expected_result = mock_to_dict.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + mock_update_instance.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + instance_id=TEST_INSTANCE_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + instance=TEST_INSTANCE, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + mock_to_dict.assert_called_once_with(mock_operation_result) + assert result == expected_result + mock_log.info.assert_has_calls( + [ + call("Updating an AlloyDB instance."), + call("AlloyDB instance %s was successfully updated.", TEST_CLUSTER_ID), + ] + ) + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) + @mock.patch(UPDATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(UPDATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request( + self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link + ): + mock_update_instance = mock_hook.return_value.update_instance + mock_operation = mock_update_instance.return_value + mock_get_operation_result.return_value = None + + expected_message = "Validating an Update AlloyDB instance request." + mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = self.operator.execute(context=mock_context) + + mock_log.info.assert_called_once_with(expected_message) + mock_update_instance.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + instance_id=TEST_INSTANCE_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + instance=TEST_INSTANCE, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + assert not mock_to_dict.called + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + assert result is None + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBClusterLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Cluster.to_dict")) + @mock.patch(UPDATE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(UPDATE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): + mock_update_instance = mock_hook.return_value.update_instance + mock_update_instance.side_effect = Exception + + mock_context = mock.MagicMock() + + with pytest.raises(AirflowException): + self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + mock_update_instance.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + instance_id=TEST_INSTANCE_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + instance=TEST_INSTANCE, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_get_operation_result.called + assert not mock_to_dict.called + mock_log.info.assert_called_once_with("Updating an AlloyDB instance.") + + +class TestAlloyDBDeleteInstanceOperator: + def setup_method(self): + self.operator = AlloyDBDeleteInstanceOperator( + task_id=TEST_TASK_ID, + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + etag=TEST_ETAG, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.cluster_id == TEST_CLUSTER_ID + assert self.operator.instance_id == TEST_INSTANCE_ID + assert self.operator.etag == TEST_ETAG + + def test_template_fields(self): + expected_template_fields = {"cluster_id", "instance_id", "etag"} | set( + AlloyDBWriteBaseOperator.template_fields + ) + assert set(AlloyDBDeleteInstanceOperator.template_fields) == expected_template_fields + + @mock.patch(DELETE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(DELETE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute(self, mock_hook, mock_log, mock_get_operation_result): + mock_delete_instance = mock_hook.return_value.delete_instance + mock_operation = mock_delete_instance.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_delete_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + etag=TEST_ETAG, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + assert result is None + mock_log.info.assert_has_calls( + [ + call("Deleting an AlloyDB instance."), + call("AlloyDB instance %s was successfully removed.", TEST_INSTANCE_ID), + ] + ) + + @mock.patch(DELETE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(DELETE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request(self, mock_hook, mock_log, mock_get_operation_result): + mock_delete_instance = mock_hook.return_value.delete_instance + mock_operation = mock_delete_instance.return_value + mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = self.operator.execute(context=mock_context) + + mock_delete_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + etag=TEST_ETAG, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + assert result is None + mock_log.info.assert_called_once_with("Validating a Delete AlloyDB instance request.") + + @mock.patch(DELETE_INSTANCE_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(DELETE_INSTANCE_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result): + mock_delete_instance = mock_hook.return_value.delete_instance + mock_delete_instance.side_effect = Exception + mock_context = mock.MagicMock() + + with pytest.raises(AirflowException): + _ = self.operator.execute(context=mock_context) + + mock_delete_instance.assert_called_once_with( + instance_id=TEST_INSTANCE_ID, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + etag=TEST_ETAG, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_get_operation_result.called + mock_log.info.assert_called_once_with("Deleting an AlloyDB instance.") diff --git a/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py b/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py index e8f40562f44b5..c9158d1e110bc 100644 --- a/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py +++ b/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py @@ -26,8 +26,11 @@ from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.alloy_db import ( AlloyDBCreateClusterOperator, + AlloyDBCreateInstanceOperator, AlloyDBDeleteClusterOperator, + AlloyDBDeleteInstanceOperator, AlloyDBUpdateClusterOperator, + AlloyDBUpdateInstanceOperator, ) from airflow.utils.trigger_rule import TriggerRule @@ -61,6 +64,16 @@ "primary_cluster_name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/clusters/{CLUSTER_ID}", }, } +INSTANCE_ID = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-") +INSTANCE = { + "instance_type": "PRIMARY", +} +INSTANCE_UPDATE = {"labels": {"label_test": "test_value"}} +INSTANCE_UPDATE_MASK = {"paths": ["labels"]} +SECONDARY_INSTANCE = { + "instance_type": "SECONDARY", +} +SECONDARY_INSTANCE_ID = f"instance-secondary-{DAG_ID}-{ENV_ID}".replace("_", "-") with DAG( DAG_ID, @@ -91,6 +104,30 @@ ) # [END howto_operator_alloy_db_update_cluster] + # [START howto_operator_alloy_db_create_instance] + create_instance = AlloyDBCreateInstanceOperator( + task_id="create_instance", + cluster_id=CLUSTER_ID, + instance_id=INSTANCE_ID, + instance_configuration=INSTANCE, + is_secondary=False, + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION, + ) + # [END howto_operator_alloy_db_create_instance] + + # [START howto_operator_alloy_db_update_instance] + update_instance = AlloyDBUpdateInstanceOperator( + task_id="update_instance", + cluster_id=CLUSTER_ID, + instance_id=INSTANCE_ID, + instance_configuration=INSTANCE_UPDATE, + update_mask=INSTANCE_UPDATE_MASK, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_alloy_db_update_instance] + create_secondary_cluster = AlloyDBCreateClusterOperator( task_id="create_secondary_cluster", cluster_id=SECONDARY_CLUSTER_ID, @@ -100,12 +137,34 @@ project_id=GCP_PROJECT_ID, ) + create_secondary_instance = AlloyDBCreateInstanceOperator( + task_id="create_secondary_instance", + cluster_id=SECONDARY_CLUSTER_ID, + instance_id=SECONDARY_INSTANCE_ID, + instance_configuration=SECONDARY_INSTANCE, + is_secondary=True, + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION_SECONDARY, + ) + + # [START howto_operator_alloy_db_delete_instance] + delete_instance = AlloyDBDeleteInstanceOperator( + task_id="delete_instance", + cluster_id=CLUSTER_ID, + instance_id=INSTANCE_ID, + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION, + ) + # [END howto_operator_alloy_db_delete_instance] + delete_instance.trigger_rule = TriggerRule.ALL_DONE + delete_secondary_cluster = AlloyDBDeleteClusterOperator( task_id="delete_secondary_cluster", project_id=GCP_PROJECT_ID, location=GCP_LOCATION_SECONDARY, cluster_id=SECONDARY_CLUSTER_ID, trigger_rule=TriggerRule.ALL_DONE, + force=True, ) # [START howto_operator_alloy_db_delete_cluster] @@ -116,10 +175,19 @@ cluster_id=CLUSTER_ID, ) # [END howto_operator_alloy_db_delete_cluster] - delete_cluster.trigger_rule = TriggerRule.ALL_DONE - create_cluster >> update_cluster >> create_secondary_cluster >> delete_secondary_cluster >> delete_cluster + ( + create_cluster + >> update_cluster + >> create_instance + >> update_instance + >> create_secondary_cluster + >> create_secondary_instance + >> delete_secondary_cluster + >> delete_instance + >> delete_cluster + ) from tests_common.test_utils.watcher import watcher