Fix hive_partition_sensor system test (apache#40023)
e-galan authored Jun 10, 2024
1 parent fc4fbb3 commit f7708ac
Showing 2 changed files with 23 additions and 16 deletions.
23 changes: 17 additions & 6 deletions airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -431,7 +431,7 @@ def execute(self, context: Context) -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Creating Dataproc Metastore service: %s", self.project_id)
self.log.info("Creating Dataproc Metastore service: %s", self.service_id)
try:
operation = hook.create_service(
region=self.region,
@@ -548,13 +548,24 @@ def execute(self, context: Context) -> None:
class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
"""Delete a single service.
:param request: The request object. Request message for
[DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
:param gcp_conn_id:
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""

template_fields: Sequence[str] = (
@@ -589,7 +600,7 @@ def execute(self, context: Context):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Deleting Dataproc Metastore service: %s", self.project_id)
self.log.info("Deleting Dataproc Metastore service: %s", self.service_id)
operation = hook.delete_service(
region=self.region,
project_id=self.project_id,
@@ -599,7 +610,7 @@
metadata=self.metadata,
)
hook.wait_for_operation(self.timeout, operation)
self.log.info("Service %s deleted successfully", self.project_id)
self.log.info("Service %s deleted successfully", self.service_id)


class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
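For context on the parameters documented in the new docstring above, here is a minimal, hypothetical sketch (not part of this commit) of how DataprocMetastoreDeleteServiceOperator might be wired into a DAG. The DAG id, project, region, and service_id values are placeholders.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreDeleteServiceOperator,
)

with DAG(
    dag_id="example_delete_metastore_service",  # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # Deletes the metastore service and waits for the long-running operation to finish.
    delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
        task_id="delete_metastore_service",
        project_id="my-gcp-project",        # placeholder project
        region="europe-west1",              # placeholder region
        service_id="my-metastore-service",  # placeholder; 2-63 chars, must begin with a letter
        timeout=1200,
        gcp_conn_id="google_cloud_default",
    )

Per the diff above, the operator now logs the service_id being deleted, calls hook.delete_service, and blocks on the operation until the service is removed.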
16 changes: 6 additions & 10 deletions (second changed file: the hive_partition_sensor system test DAG)
@@ -16,8 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that show how to check Hive partitions existence
using Dataproc Metastore Sensor.
Example Airflow DAG that shows how to check Hive partitions existence with Dataproc Metastore Sensor.
Note that Metastore service must be configured to use gRPC endpoints.
"""
@@ -47,7 +46,7 @@
DAG_ID = "hive_partition_sensor"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
REGION = "us-central1"
REGION = "europe-west1"
NETWORK = "default"

METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
@@ -60,7 +59,7 @@
"network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
}
METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}-{ENV_ID}".replace("_", "-")
DATAPROC_CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
@@ -133,7 +132,7 @@

@task(task_id="get_hive_warehouse_bucket_task")
def get_hive_warehouse_bucket(**kwargs):
"""Returns Hive Metastore Warehouse GCS bucket name."""
"""Return Hive Metastore Warehouse GCS bucket name."""
ti = kwargs["ti"]
metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
@@ -216,19 +215,16 @@ def get_hive_warehouse_bucket(**kwargs):
trigger_rule=TriggerRule.ALL_DONE,
)

# TEST SETUP
(
# TEST SETUP
create_metastore_service
>> create_cluster
>> get_hive_warehouse_bucket_task
>> copy_source_data
>> create_external_table
>> create_partitioned_table
>> partition_data
)
(
create_metastore_service
# TEST BODY
>> partition_data
>> hive_partition_sensor
# TEST TEARDOWN
>> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]
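For context on the gRPC requirement stated in the DAG docstring and on the sensor exercised in the TEST BODY, here is a minimal, hypothetical sketch (not part of this commit). It assumes the provider's MetastoreHivePartitionSensor accepts service_id, region, table, and partitions arguments and that hive_metastore_config.endpoint_protocol accepts "GRPC"; the project, table, and partition values are placeholders.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreCreateServiceOperator,
)
from airflow.providers.google.cloud.sensors.dataproc_metastore import (
    MetastoreHivePartitionSensor,
)

PROJECT_ID = "my-gcp-project"        # placeholder
REGION = "europe-west1"
SERVICE_ID = "my-metastore-service"  # placeholder

# Assumption: the sensor needs the metastore service to expose gRPC endpoints,
# configured through hive_metastore_config.endpoint_protocol.
METASTORE_SERVICE = {"hive_metastore_config": {"endpoint_protocol": "GRPC"}}

with DAG(
    dag_id="hive_partition_sensor_sketch",  # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    create_metastore_service = DataprocMetastoreCreateServiceOperator(
        task_id="create_metastore_service",
        project_id=PROJECT_ID,
        region=REGION,
        service_id=SERVICE_ID,
        service=METASTORE_SERVICE,
    )

    # Waits until the given partition appears in the Hive table registered in the metastore.
    hive_partition_sensor = MetastoreHivePartitionSensor(
        task_id="hive_partition_sensor",
        service_id=SERVICE_ID,
        region=REGION,
        table="transactions",            # placeholder table name
        partitions=["date=2024-01-01"],  # placeholder partition spec
    )

    create_metastore_service >> hive_partition_sensor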
