diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index 10056cdff5d99..996d31fad5512 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -431,7 +431,7 @@ def execute(self, context: Context) -> dict:
         hook = DataprocMetastoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        self.log.info("Creating Dataproc Metastore service: %s", self.project_id)
+        self.log.info("Creating Dataproc Metastore service: %s", self.service_id)
         try:
             operation = hook.create_service(
                 region=self.region,
@@ -548,13 +548,24 @@ def execute(self, context: Context) -> None:
 class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
     """Delete a single service.
 
-    :param request: The request object. Request message for
-        [DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
     :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
     :param retry: Designation of what errors, if any, should be retried.
     :param timeout: The timeout for this request.
     :param metadata: Strings which should be sent along with the request as metadata.
-    :param gcp_conn_id:
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
     """
 
     template_fields: Sequence[str] = (
@@ -589,7 +600,7 @@ def execute(self, context: Context):
         hook = DataprocMetastoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        self.log.info("Deleting Dataproc Metastore service: %s", self.project_id)
+        self.log.info("Deleting Dataproc Metastore service: %s", self.service_id)
         operation = hook.delete_service(
             region=self.region,
             project_id=self.project_id,
@@ -599,7 +610,7 @@ def execute(self, context: Context):
             metadata=self.metadata,
         )
         hook.wait_for_operation(self.timeout, operation)
-        self.log.info("Service %s deleted successfully", self.project_id)
+        self.log.info("Service %s deleted successfully", self.service_id)
 
 
 class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
index 4058a5544f467..5c2be86bd8785 100644
--- a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
@@ -16,8 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that show how to check Hive partitions existence
-using Dataproc Metastore Sensor.
+Example Airflow DAG that shows how to check Hive partitions existence with Dataproc Metastore Sensor.
 
 Note that Metastore service must be configured to use gRPC endpoints.
 """
@@ -47,7 +46,7 @@
 DAG_ID = "hive_partition_sensor"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
-REGION = "us-central1"
+REGION = "europe-west1"
 NETWORK = "default"
 
 METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
@@ -60,7 +59,7 @@
     "network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
 }
 METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
-DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
+DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}-{ENV_ID}".replace("_", "-")
 DATAPROC_CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -133,7 +132,7 @@
 
     @task(task_id="get_hive_warehouse_bucket_task")
     def get_hive_warehouse_bucket(**kwargs):
-        """Returns Hive Metastore Warehouse GCS bucket name."""
+        """Return Hive Metastore Warehouse GCS bucket name."""
         ti = kwargs["ti"]
         metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
         config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
@@ -216,19 +215,16 @@ def get_hive_warehouse_bucket(**kwargs):
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    # TEST SETUP
     (
+        # TEST SETUP
         create_metastore_service
         >> create_cluster
        >> get_hive_warehouse_bucket_task
         >> copy_source_data
         >> create_external_table
         >> create_partitioned_table
-        >> partition_data
-    )
-    (
-        create_metastore_service
         # TEST BODY
+        >> partition_data
         >> hive_partition_sensor
         # TEST TEARDOWN
         >> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]