diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
index eb2fecedb4e4c..03634b58f645b 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
@@ -31,11 +31,6 @@
 from google.protobuf.struct_pb2 import Value
 
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLForecastingTrainingJobOperator,
     DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
 REGION = "us-central1"
 FORECASTING_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
 
 RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-FORECAST_GCS_BUCKET_NAME = f"bucket_forecast_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
 FORECAST_DATASET = {
     "display_name": f"forecast-dataset-{ENV_ID}",
@@ -62,7 +56,9 @@
     "metadata": ParseDict(
         {
             "input_config": {
-                "gcs_source": {"uri": [f"gs://{FORECAST_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}
+                "gcs_source": {
+                    "uri": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/forecast-dataset.csv"]
+                }
             }
         },
         Value(),
@@ -89,22 +85,6 @@
     catchup=False,
     tags=["example", "vertex_ai", "auto_ml"],
 ) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=FORECAST_GCS_BUCKET_NAME,
-        storage_class="REGIONAL",
-        location=REGION,
-    )
-
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=FORECAST_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     create_forecast_dataset = CreateDatasetOperator(
         task_id="forecast_dataset",
         dataset=FORECAST_DATASET,
@@ -157,23 +137,24 @@
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket", bucket_name=FORECAST_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
-    )
 
     (
         # TEST SETUP
-        create_bucket
-        >> move_dataset_file
-        >> create_forecast_dataset
+        create_forecast_dataset
         # TEST BODY
         >> create_auto_ml_forecasting_training_job
         # TEST TEARDOWN
         >> delete_auto_ml_forecasting_training_job
         >> delete_forecast_dataset
-        >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
index 4de1f43dd8740..c26ea94325e76 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
@@ -30,11 +30,6 @@
 from google.protobuf.struct_pb2 import Value
 
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLImageTrainingJobOperator,
     DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
 REGION = "us-central1"
 IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
 
 RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-IMAGE_GCS_BUCKET_NAME = f"bucket_image_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
 IMAGE_DATASET = {
     "display_name": f"image-dataset-{ENV_ID}",
@@ -64,7 +58,7 @@
 IMAGE_DATA_CONFIG = [
     {
         "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
-        "gcs_source": {"uris": [f"gs://{IMAGE_GCS_BUCKET_NAME}/vertex-ai/image-dataset.csv"]},
+        "gcs_source": {"uris": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/flowers-dataset.csv"]},
     },
 ]
 
@@ -76,22 +70,6 @@
     catchup=False,
     tags=["example", "vertex_ai", "auto_ml"],
 ) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=IMAGE_GCS_BUCKET_NAME,
-        storage_class="REGIONAL",
-        location=REGION,
-    )
-
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=IMAGE_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     create_image_dataset = CreateDatasetOperator(
         task_id="image_dataset",
         dataset=IMAGE_DATASET,
@@ -143,27 +121,25 @@
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=IMAGE_GCS_BUCKET_NAME,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
 
     (
         # TEST SETUP
-        [
-            create_bucket >> move_dataset_file,
-            create_image_dataset,
-        ]
+        create_image_dataset
         >> import_image_dataset
         # TEST BODY
         >> create_auto_ml_image_training_job
         # TEST TEARDOWN
         >> delete_auto_ml_image_training_job
         >> delete_image_dataset
-        >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
index fcf67c5210fbc..618182b94edc2 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
@@ -50,6 +50,13 @@
     )
     # [END how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
index 8c828bd4f21ad..91260eccdea57 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
@@ -31,11 +31,6 @@
 from google.protobuf.struct_pb2 import Value
 
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLTabularTrainingJobOperator,
     DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
 REGION = "us-central1"
 TABULAR_DISPLAY_NAME = f"auto-ml-tabular-{ENV_ID}"
 MODEL_DISPLAY_NAME = "adopted-prediction-model"
 
 RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-TABULAR_GCS_BUCKET_NAME = f"bucket_tabular_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
 TABULAR_DATASET = {
     "display_name": f"tabular-dataset-{ENV_ID}",
@@ -62,7 +56,7 @@
     "metadata": ParseDict(
         {
             "input_config": {
-                "gcs_source": {"uri": [f"gs://{TABULAR_GCS_BUCKET_NAME}/vertex-ai/tabular-dataset.csv"]}
+                "gcs_source": {"uri": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/tabular-dataset.csv"]}
             }
         },
         Value(),
@@ -91,22 +85,6 @@
     catchup=False,
     tags=["example", "vertex_ai", "auto_ml"],
 ) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=TABULAR_GCS_BUCKET_NAME,
-        storage_class="REGIONAL",
-        location=REGION,
-    )
-
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=TABULAR_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     create_tabular_dataset = CreateDatasetOperator(
         task_id="tabular_dataset",
         dataset=TABULAR_DATASET,
@@ -150,25 +128,23 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=TABULAR_GCS_BUCKET_NAME,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
     (
         # TEST SETUP
-        create_bucket
-        >> move_dataset_file
-        >> create_tabular_dataset
+        create_tabular_dataset
         # TEST BODY
         >> create_auto_ml_tabular_training_job
         # TEST TEARDOWN
         >> delete_auto_ml_tabular_training_job
         >> delete_tabular_dataset
-        >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
index 9a7e3e95ccdee..b91a8cd96975e 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
@@ -30,11 +30,6 @@
 from google.protobuf.struct_pb2 import Value
 
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLTextTrainingJobOperator,
     DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
 REGION = "us-central1"
 TEXT_DISPLAY_NAME = f"auto-ml-text-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"auto-ml-text-model-{ENV_ID}"
 
 RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-TEXT_GCS_BUCKET_NAME = f"bucket_text_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
 TEXT_DATASET = {
     "display_name": f"text-dataset-{ENV_ID}",
@@ -64,7 +58,7 @@
 TEXT_DATA_CONFIG = [
     {
         "import_schema_uri": schema.dataset.ioformat.text.single_label_classification,
-        "gcs_source": {"uris": [f"gs://{TEXT_GCS_BUCKET_NAME}/vertex-ai/text-dataset.csv"]},
+        "gcs_source": {"uris": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/text-dataset.csv"]},
     },
 ]
 
@@ -75,22 +69,6 @@
     catchup=False,
     tags=["example", "vertex_ai", "auto_ml"],
 ) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=TEXT_GCS_BUCKET_NAME,
-        storage_class="REGIONAL",
-        location=REGION,
-    )
-
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=TEXT_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     create_text_dataset = CreateDatasetOperator(
         task_id="text_dataset",
         dataset=TEXT_DATASET,
@@ -140,27 +118,24 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=TEXT_GCS_BUCKET_NAME,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
     (
         # TEST SETUP
-        [
-            create_bucket >> move_dataset_file,
-            create_text_dataset,
-        ]
+        create_text_dataset
         >> import_text_dataset
         # TEST BODY
         >> create_auto_ml_text_training_job
         # TEST TEARDOWN
         >> delete_auto_ml_text_training_job
         >> delete_text_dataset
-        >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
index 1cbf9a250b7b1..cde6bb183e777 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
@@ -30,11 +30,6 @@
 from google.protobuf.struct_pb2 import Value
 
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLVideoTrainingJobOperator,
     DeleteAutoMLTrainingJobOperator,
@@ -48,7 +43,7 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
 REGION = "us-central1"
 VIDEO_DISPLAY_NAME = f"auto-ml-video-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"auto-ml-video-model-{ENV_ID}"
@@ -64,7 +59,7 @@
 VIDEO_DATA_CONFIG = [
     {
         "import_schema_uri": schema.dataset.ioformat.video.classification,
-        "gcs_source": {"uris": [f"gs://{VIDEO_GCS_BUCKET_NAME}/vertex-ai/video-dataset.csv"]},
+        "gcs_source": {"uris": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/video-dataset.csv"]},
     },
 ]
 
@@ -75,22 +70,6 @@
     catchup=False,
     tags=["example", "vertex_ai", "auto_ml"],
 ) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=VIDEO_GCS_BUCKET_NAME,
-        storage_class="REGIONAL",
-        location=REGION,
-    )
-
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=VIDEO_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     create_video_dataset = CreateDatasetOperator(
         task_id="video_dataset",
         dataset=VIDEO_DATASET,
@@ -152,18 +131,9 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=VIDEO_GCS_BUCKET_NAME,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
     (
         # TEST SETUP
-        [
-            create_bucket >> move_dataset_file,
-            create_video_dataset,
-        ]
+        create_video_dataset
         >> import_video_dataset
         # TEST BODY
         >> create_auto_ml_video_training_job
@@ -171,9 +141,15 @@
         # TEST TEARDOWN
         >> delete_auto_ml_video_training_job
         >> delete_video_dataset
-        >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
index 1b8c2e182ccdf..3f2dfc60ec0c0 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
@@ -34,7 +34,6 @@
 from airflow.providers.google.cloud.operators.gcs import (
     GCSCreateBucketOperator,
     GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
 )
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLForecastingTrainingJobOperator,
@@ -53,7 +52,7 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_batch_prediction_operations"
+DAG_ID = "vertex_ai_batch_prediction_operations"
 REGION = "us-central1"
 
 FORECAST_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
@@ -62,7 +61,7 @@
 JOB_DISPLAY_NAME = f"batch_prediction_job_test_{ENV_ID}"
 RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
-DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/forecast-dataset.csv"
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/datasets/forecast-dataset.csv"
 
 FORECAST_DATASET = {
     "display_name": f"forecast-dataset-{ENV_ID}",
@@ -70,7 +69,7 @@
     "metadata": ParseDict(
         {
             "input_config": {
-                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
+                "gcs_source": {"uri": [f"gs://{RESOURCE_DATA_BUCKET}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
             }
         },
         Value(),
@@ -108,15 +107,6 @@
         location=REGION,
     )
 
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     create_forecast_dataset = CreateDatasetOperator(
         task_id="forecast_dataset",
         dataset=FORECAST_DATASET,
@@ -227,7 +217,6 @@
     (
         # TEST SETUP
         create_bucket
-        >> move_dataset_file
         >> create_forecast_dataset
         >> create_auto_ml_forecasting_training_job
         # TEST BODY
@@ -240,6 +229,13 @@
         >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
index ff642ae953333..f039877f7134a 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
@@ -207,6 +207,13 @@ def TABULAR_DATASET(bucket_name):
         >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
index c90c1aac231ee..9b67a3480bec2 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
@@ -72,7 +72,12 @@ def TABULAR_DATASET(bucket_name):
 
 MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
 REPLICA_COUNT = 1
-LOCAL_TRAINING_SCRIPT_PATH = "california_housing_training_script.py"
+# VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH should be set when Airflow runs on a distributed system.
+# For example, in Composer the correct path is `gcs/data/california_housing_training_script.py`,
+# because `gcs/data/` is a folder shared across Airflow workers.
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get(
+    "VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH", "california_housing_training_script.py"
+)
 
 
 with DAG(
@@ -244,6 +249,14 @@ def TABULAR_DATASET(bucket_name):
         )
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
 from tests.system.utils import get_test_run  # noqa: E402
 
 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
index c9fd96aee95b0..33105d273f159 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
@@ -209,6 +209,13 @@ def TABULAR_DATASET(bucket_name):
         >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
index 8ad202f292a81..77b69081a4734 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
@@ -34,7 +34,6 @@
 from airflow.providers.google.cloud.operators.gcs import (
     GCSCreateBucketOperator,
     GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
 )
 from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
     CreateDatasetOperator,
@@ -49,7 +48,7 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_dataset_operations"
+DAG_ID = "vertex_ai_dataset_operations"
 REGION = "us-central1"
 
 RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
@@ -61,7 +60,9 @@
     "metadata": ParseDict(
         {
             "input_config": {
-                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}
+                "gcs_source": {
+                    "uri": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/forecast-dataset.csv"]
+                }
             }
         },
         Value(),
@@ -78,7 +79,7 @@
     "metadata": ParseDict(
         {
             "input_config": {
-                "gcs_source": {"uri": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/tabular-dataset.csv"]}
+                "gcs_source": {"uri": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/tabular-dataset.csv"]}
             }
         },
         Value(),
@@ -100,8 +101,8 @@
         "data_item_labels": {
             "test-labels-name": "test-labels-value",
         },
-        "import_schema_uri": "image_classification_single_label_io_format_1.0.0.yaml",
-        "gcs_source": {"uris": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset-flowers.csv"]},
+        "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
+        "gcs_source": {"uris": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/image-dataset-flowers.csv"]},
     },
 ]
 DATASET_TO_UPDATE = {"display_name": "test-name"}
@@ -122,15 +123,6 @@
         location=REGION,
     )
 
-    move_datasets_files = GCSSynchronizeBucketsOperator(
-        task_id="move_datasets_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     # [START how_to_cloud_vertex_ai_create_dataset_operator]
     create_image_dataset_job = CreateDatasetOperator(
         task_id="image_dataset",
@@ -262,7 +254,6 @@
     (
         # TEST SETUP
         create_bucket
-        >> move_datasets_files
         # TEST BODY
         >> [
             create_time_series_dataset_job >> delete_time_series_dataset_job,
@@ -276,6 +267,13 @@
         >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
index 297470be68db4..8fa802b51744d 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
@@ -30,11 +30,6 @@
 from google.protobuf.struct_pb2 import Value
 
 from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLImageTrainingJobOperator,
     DeleteAutoMLTrainingJobOperator,
@@ -55,13 +50,12 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_endpoint_service_operations"
+DAG_ID = "vertex_ai_endpoint_service_operations"
 REGION = "us-central1"
 IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
 
 RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
 IMAGE_DATASET = {
     "display_name": f"image-dataset-{ENV_ID}",
@@ -71,7 +65,7 @@
 IMAGE_DATA_CONFIG = [
     {
         "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
-        "gcs_source": {"uris": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset.csv"]},
+        "gcs_source": {"uris": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/flowers-dataset.csv"]},
     },
 ]
 
@@ -88,22 +82,6 @@
     render_template_as_native_obj=True,
     tags=["example", "vertex_ai", "endpoint_service"],
 ) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
-        storage_class="REGIONAL",
-        location=REGION,
-    )
-
-    move_dataset_file = GCSSynchronizeBucketsOperator(
-        task_id="move_dataset_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="vertex-ai/datasets",
-        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
-        destination_object="vertex-ai",
-        recursive=True,
-    )
-
     create_image_dataset = CreateDatasetOperator(
         task_id="image_dataset",
         dataset=IMAGE_DATASET,
@@ -209,18 +187,10 @@
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
 
     (
         # TEST SETUP
-        [
-            create_bucket >> move_dataset_file,
-            create_image_dataset,
-        ]
+        create_image_dataset
         >> import_image_dataset
         >> create_auto_ml_image_training_job
         # TEST BODY
@@ -232,9 +202,15 @@
         # TEST TEARDOWN
         >> delete_auto_ml_image_training_job
         >> delete_image_dataset
-        >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
index 524330f4dcf64..fdacfef2be915 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
@@ -35,7 +35,7 @@
 )
 
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_generative_model_dag"
+DAG_ID = "vertex_ai_generative_model_dag"
 REGION = "us-central1"
 PROMPT = "In 10 words or less, why is Apache Airflow amazing?"
 CONTENTS = [PROMPT]
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
index d0cb1458430c3..913fff2b4e09b 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
@@ -40,7 +40,7 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_hyperparameter_tuning_job_operations"
+DAG_ID = "vertex_ai_hyperparameter_tuning_job_operations"
 REGION = "us-central1"
 DISPLAY_NAME = f"hyperparameter-tuning-job-{ENV_ID}"
 
@@ -179,6 +179,13 @@
         >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
index f3f761586cc2f..b4f8522a579e5 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
@@ -49,6 +49,13 @@
     )
     # [END how_to_cloud_vertex_ai_list_custom_training_job_operator]
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
index 003f7fe70d822..4560d9f54f9f4 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
@@ -61,7 +61,7 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_model_service_operations"
+DAG_ID = "vertex_ai_model_service_operations"
 REGION = "us-central1"
 TRAIN_DISPLAY_NAME = f"train-housing-custom-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"custom-housing-model-{ENV_ID}"
@@ -87,7 +87,12 @@
 
 CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
 
-LOCAL_TRAINING_SCRIPT_PATH = "california_housing_training_script.py"
+# VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH should be set when Airflow runs on a distributed system.
+# For example, in Composer the correct path is `gcs/data/california_housing_training_script.py`,
+# because `gcs/data/` is a folder shared across Airflow workers.
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get(
+    "VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH", "california_housing_training_script.py"
+)
 
 MODEL_OUTPUT_CONFIG = {
     "artifact_destination": {
@@ -323,6 +328,13 @@
         >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py
index baf5b498b7cf4..130effbb86c39 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py
@@ -44,7 +44,7 @@
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_pipeline_job_operations"
+DAG_ID = "vertex_ai_pipeline_job_operations"
 REGION = "us-central1"
 DISPLAY_NAME = f"pipeline-job-{ENV_ID}"
 
@@ -159,6 +159,13 @@
         >> delete_bucket
     )
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
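
For reference, the watcher wiring appended to every example DAG above follows the standard Airflow system-test pattern: the teardown tasks run with trigger_rule=ALL_DONE, so on their own they would let a DAG run finish "successful" even when the test body failed; making a watcher task downstream of every task propagates that failure to the run. The sketch below is a minimal, self-contained illustration of the pattern and is not part of this change: the DAG id and the empty setup/body/teardown tasks are placeholders, and it assumes the file lives under tests/system/ in the Airflow source tree so that tests.system.utils and tests.system.utils.watcher are importable.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    "example_watcher_pattern",  # placeholder DAG id, not one of the DAGs touched by this change
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    setup = EmptyOperator(task_id="setup")
    body = EmptyOperator(task_id="body")
    # Teardown runs regardless of upstream state; without the watcher a failed
    # "body" task would still leave the DAG run marked as successful.
    teardown = EmptyOperator(task_id="teardown", trigger_rule=TriggerRule.ALL_DONE)

    setup >> body >> teardown

    # ### Everything below this line is not part of example ###
    # ### Just for system tests purpose ###
    from tests.system.utils.watcher import watcher

    # The watcher sits downstream of every task and fails the DAG run
    # whenever any task, including teardown, fails.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

As in the files touched here, the trailing get_test_run(dag) call is what lets pytest discover and execute the DAG as a system test.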