From a7d0b9facaf203f5342e135307eaf2a6583685f3 Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Wed, 24 Jul 2024 16:18:06 +0000 Subject: [PATCH] Small fixes for system tests --- .../cloud/cloud_batch/example_cloud_batch.py | 48 +++++++++---------- .../google/cloud/dataplex/example_dataplex.py | 13 ++--- .../google/cloud/gcs/example_gcs_acl.py | 4 +- .../example_kubernetes_engine.py | 13 +++-- .../example_kubernetes_engine_async.py | 9 ++-- .../vision/example_vision_annotate_image.py | 11 +++-- .../vision/example_vision_autogenerated.py | 13 +++-- .../cloud/vision/example_vision_explicit.py | 13 +++-- 8 files changed, 68 insertions(+), 56 deletions(-) diff --git a/tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py b/tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py index 7598b1c4239fd..58d852a286bc8 100644 --- a/tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py +++ b/tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py @@ -35,31 +35,27 @@ CloudBatchSubmitJobOperator, ) from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -DAG_ID = "example_cloud_batch" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +DAG_ID = "cloud_batch" +REGION = "us-central1" -region = "us-central1" -job_name_prefix = "batch-system-test-job" -job1_name = f"{job_name_prefix}1" -job2_name = f"{job_name_prefix}2" - -submit1_task_name = "submit-job1" -submit2_task_name = "submit-job2" - -delete1_task_name = "delete-job1" -delete2_task_name = "delete-job2" +job_name_prefix = "batch" +job1_name = f"{job_name_prefix}-{DAG_ID}-{ENV_ID}-1".replace("_", "-") +job2_name = f"{job_name_prefix}-{DAG_ID}-{ENV_ID}-2".replace("_", "-") list_jobs_task_name = "list-jobs" list_tasks_task_name = "list-tasks" -clean1_task_name = "clean-job1" -clean2_task_name = "clean-job2" - def _assert_jobs(ti): job_names = ti.xcom_pull(task_ids=[list_jobs_task_name], key="return_value") - job_names_str = job_names[0][0]["name"].split("/")[-1] + " " + job_names[0][1]["name"].split("/")[-1] + job_names_str = "" + if job_names and len(job_names) > 0: + for job in job_names[0]: + job_names_str += job["name"].split("/")[-1] + " " assert job1_name in job_names_str assert job2_name in job_names_str @@ -125,9 +121,9 @@ def _create_job(): ) as dag: # [START howto_operator_batch_submit_job] submit1 = CloudBatchSubmitJobOperator( - task_id=submit1_task_name, + task_id="submit-job1", project_id=PROJECT_ID, - region=region, + region=REGION, job_name=job1_name, job=_create_job(), dag=dag, @@ -137,9 +133,9 @@ def _create_job(): # [START howto_operator_batch_submit_job_deferrable_mode] submit2 = CloudBatchSubmitJobOperator( - task_id=submit2_task_name, + task_id="submit-job2", project_id=PROJECT_ID, - region=region, + region=REGION, job_name=job2_name, job=batch_v1.Job.to_dict(_create_job()), dag=dag, @@ -149,7 +145,7 @@ def _create_job(): # [START howto_operator_batch_list_tasks] list_tasks = CloudBatchListTasksOperator( - task_id=list_tasks_task_name, project_id=PROJECT_ID, region=region, job_name=job1_name, dag=dag + task_id=list_tasks_task_name, project_id=PROJECT_ID, region=REGION, job_name=job1_name, dag=dag ) # [END howto_operator_batch_list_tasks] @@ -159,9 +155,9 @@ def _create_job(): list_jobs = CloudBatchListJobsOperator( task_id=list_jobs_task_name, project_id=PROJECT_ID, - region=region, - limit=2, - filter=f"name:projects/{PROJECT_ID}/locations/{region}/jobs/{job_name_prefix}*", + region=REGION, + limit=10, + filter=f"name:projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name_prefix}*", dag=dag, ) # [END howto_operator_batch_list_jobs] @@ -172,7 +168,7 @@ def _create_job(): delete_job1 = CloudBatchDeleteJobOperator( task_id="delete-job1", project_id=PROJECT_ID, - region=region, + region=REGION, job_name=job1_name, dag=dag, trigger_rule=TriggerRule.ALL_DONE, @@ -182,7 +178,7 @@ def _create_job(): delete_job2 = CloudBatchDeleteJobOperator( task_id="delete-job2", project_id=PROJECT_ID, - region=region, + region=REGION, job_name=job2_name, dag=dag, trigger_rule=TriggerRule.ALL_DONE, diff --git a/tests/system/providers/google/cloud/dataplex/example_dataplex.py b/tests/system/providers/google/cloud/dataplex/example_dataplex.py index 0de2df886a0b0..bac7e56ed5b1d 100644 --- a/tests/system/providers/google/cloud/dataplex/example_dataplex.py +++ b/tests/system/providers/google/cloud/dataplex/example_dataplex.py @@ -40,24 +40,25 @@ ) from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") -PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "project_id") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -DAG_ID = "example_dataplex" +DAG_ID = "dataplex" -BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-") SPARK_FILE_NAME = "spark_example_pi.py" RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" -LAKE_ID = f"test-lake-dataplex-{ENV_ID}" +LAKE_ID = f"lake-{DAG_ID}-{ENV_ID}".replace("_", "-") REGION = "us-central1" SERVICE_ACC = f"{PROJECT_ID}@appspot.gserviceaccount.com" SPARK_FILE_FULL_PATH = f"gs://{BUCKET_NAME}/{SPARK_FILE_NAME}" -DATAPLEX_TASK_ID = f"test-task-{ENV_ID}" +DATAPLEX_TASK_ID = f"task-{DAG_ID}-{ENV_ID}".replace("_", "-") TRIGGER_SPEC_TYPE = "ON_DEMAND" # [START howto_dataplex_configuration] diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_acl.py b/tests/system/providers/google/cloud/gcs/example_gcs_acl.py index a6471bcfb6117..0b550b22ba07c 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_acl.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_acl.py @@ -35,8 +35,9 @@ from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID +GCS_ACL_ENTITY = os.environ.get("SYSTEM_TESTS_GCS_ACL_ENTITY", "allUsers") DAG_ID = "gcs_acl" @@ -45,7 +46,6 @@ FILE_NAME = "example_upload.txt" UPLOAD_FILE_PATH = f"gcs/{FILE_NAME}" -GCS_ACL_ENTITY = "allUsers" GCS_ACL_BUCKET_ROLE = "OWNER" GCS_ACL_OBJECT_ROLE = "OWNER" diff --git a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py index c416b3862481f..031f8326ee99c 100644 --- a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py +++ b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py @@ -32,16 +32,19 @@ GKEStartPodOperator, ) from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") DAG_ID = "kubernetes_engine" -GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -GCP_LOCATION = "europe-north1-a" -CLUSTER_NAME = f"gke-{ENV_ID}".replace("_", "-") +GCP_LOCATION = "europe-west1" +CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-") +CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-") +CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL # [START howto_operator_gcp_gke_create_cluster_definition] -CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1} +CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}} # [END howto_operator_gcp_gke_create_cluster_definition] diff --git a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py index 10e9c016a39ab..5e3f4ddbf7044 100644 --- a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py +++ b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py @@ -32,13 +32,16 @@ GKEStartPodOperator, ) from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") DAG_ID = "kubernetes_engine_async" -GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID GCP_LOCATION = "europe-north1-a" -CLUSTER_NAME = f"gke-async-{ENV_ID}".replace("_", "-") +CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-") +CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-") +CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1} diff --git a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py index 32d37f8518ff3..1d6167c6866ee 100644 --- a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py +++ b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py @@ -45,14 +45,14 @@ # [END howto_operator_vision_enums_import] -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -DAG_ID = "example_gcp_vision_annotate_image" +DAG_ID = "gcp_vision_annotate_image" LOCATION = "europe-west1" -BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}" +BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}".replace("_", "-") FILE_NAME = "image1.jpg" GCP_VISION_ANNOTATE_IMAGE_URL = f"gs://{BUCKET_NAME}/{FILE_NAME}" @@ -80,7 +80,7 @@ schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example", "vision"], + tags=["example", "vision", "annotate_image"], ) as dag: create_bucket = GCSCreateBucketOperator( task_id="create_bucket", project_id=PROJECT_ID, bucket_name=BUCKET_NAME @@ -173,8 +173,10 @@ ) chain( + # TEST SETUP create_bucket, copy_single_file, + # TEST BODY annotate_image, annotate_image_result, detect_text, @@ -185,6 +187,7 @@ detect_labels_result, detect_safe_search, detect_safe_search_result, + # TEST TEARDOWN delete_bucket, ) diff --git a/tests/system/providers/google/cloud/vision/example_vision_autogenerated.py b/tests/system/providers/google/cloud/vision/example_vision_autogenerated.py index 05522baf9cc08..907386ceb295c 100644 --- a/tests/system/providers/google/cloud/vision/example_vision_autogenerated.py +++ b/tests/system/providers/google/cloud/vision/example_vision_autogenerated.py @@ -63,14 +63,14 @@ # [END howto_operator_vision_enums_import] -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -DAG_ID = "example_gcp_vision_autogenerated_id" +DAG_ID = "gcp_vision_autogenerated_id" LOCATION = "europe-west1" -BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}" +BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}".replace("_", "-") FILE_NAME = "image1.jpg" GCP_VISION_PRODUCT_SET_ID = "product_set_explicit_id" @@ -112,7 +112,7 @@ schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example", "vision"], + tags=["example", "vision", "autogenerated"], ) as dag: create_bucket = GCSCreateBucketOperator( task_id="create_bucket", project_id=PROJECT_ID, bucket_name=BUCKET_NAME @@ -121,7 +121,7 @@ copy_single_file = GCSToGCSOperator( task_id="copy_single_gcs_file", source_bucket=BUCKET_NAME_SRC, - source_object=[PATH_SRC], + source_object=PATH_SRC, destination_bucket=BUCKET_NAME, destination_object=FILE_NAME, ) @@ -248,8 +248,10 @@ ) chain( + # TEST SETUP create_bucket, copy_single_file, + # TEST BODY product_create, product_get, product_update, @@ -258,6 +260,7 @@ product_set_update, reference_image_create, add_product_to_product_set, + # TEST TEARDOWN reference_image_delete, remove_product_from_product_set, product_set_delete, diff --git a/tests/system/providers/google/cloud/vision/example_vision_explicit.py b/tests/system/providers/google/cloud/vision/example_vision_explicit.py index 6d9151ef9e61a..663be91259683 100644 --- a/tests/system/providers/google/cloud/vision/example_vision_explicit.py +++ b/tests/system/providers/google/cloud/vision/example_vision_explicit.py @@ -59,14 +59,14 @@ # [END howto_operator_vision_reference_image_import_2] -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -DAG_ID = "example_gcp_vision_explicit_id" +DAG_ID = "gcp_vision_explicit_id" LOCATION = "europe-west1" -BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}" +BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}".replace("_", "-") FILE_NAME = "image1.jpg" GCP_VISION_PRODUCT_SET_ID = f"product_set_explicit_id_{ENV_ID}" @@ -98,7 +98,7 @@ schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example", "vision"], + tags=["example", "vision", "explicit"], ) as dag: create_bucket = GCSCreateBucketOperator( task_id="create_bucket", project_id=PROJECT_ID, bucket_name=BUCKET_NAME @@ -107,7 +107,7 @@ copy_single_file = GCSToGCSOperator( task_id="copy_single_gcs_file", source_bucket=BUCKET_NAME_SRC, - source_object=[PATH_SRC], + source_object=PATH_SRC, destination_bucket=BUCKET_NAME, destination_object=FILE_NAME, ) @@ -257,8 +257,10 @@ ) chain( + # TEST SETUP create_bucket, copy_single_file, + # TEST BODY product_set_create_2, product_set_get_2, product_set_update_2, @@ -271,6 +273,7 @@ add_product_to_product_set_2, remove_product_from_product_set_2, reference_image_delete_2, + # TEST TEARDOWN product_delete_2, product_set_delete_2, delete_bucket,