From bb2bbc826978c9e05a5f130bcc01c6b3783d6790 Mon Sep 17 00:00:00 2001 From: "M. Olcay Tercanli" Date: Tue, 26 Mar 2024 09:23:48 +0000 Subject: [PATCH] Add deprecation warnings and raise exception for already deprecated ones --- .../providers/google/cloud/hooks/automl.py | 34 ++ .../hooks/vertex_ai/prediction_service.py | 91 +++++ .../google/cloud/operators/automl.py | 255 ++++++++++-- airflow/providers/google/provider.yaml | 1 + .../operators/cloud/automl.rst | 112 ++++-- pyproject.toml | 2 + .../run_provider_yaml_files_check.py | 2 + tests/always/test_project_structure.py | 2 + .../google/cloud/hooks/test_automl.py | 8 + .../vertex_ai/test_prediction_service.py | 99 +++++ .../google/cloud/operators/test_automl.py | 376 +++++++++++++++--- .../cloud/automl/example_automl_dataset.py | 17 - .../cloud/automl/example_automl_model.py | 26 -- 13 files changed, 864 insertions(+), 161 deletions(-) create mode 100644 airflow/providers/google/cloud/hooks/vertex_ai/prediction_service.py create mode 100644 tests/providers/google/cloud/hooks/vertex_ai/test_prediction_service.py diff --git a/airflow/providers/google/cloud/hooks/automl.py b/airflow/providers/google/cloud/hooks/automl.py index d519aca42657d..3edd4ab1dbfd9 100644 --- a/airflow/providers/google/cloud/hooks/automl.py +++ b/airflow/providers/google/cloud/hooks/automl.py @@ -640,3 +640,37 @@ def delete_dataset( metadata=metadata, ) return result + + @GoogleBaseHook.fallback_to_default_project_id + def get_dataset( + self, + dataset_id: str, + location: str, + project_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Dataset: + """ + Retrieve the dataset for the given dataset_id. + + :param dataset_id: ID of dataset to be retrieved. + :param location: The location of the project. + :param project_id: ID of the Google Cloud project where dataset is located if None then + default project_id is used. + :param retry: A retry object used to retry requests. If `None` is specified, requests will not be + retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + + :return: `google.cloud.automl_v1beta1.types.dataset.Dataset` instance. + """ + client = self.get_conn() + name = f"projects/{project_id}/locations/{location}/datasets/{dataset_id}" + return client.get_dataset( + request={"name": name}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) diff --git a/airflow/providers/google/cloud/hooks/vertex_ai/prediction_service.py b/airflow/providers/google/cloud/hooks/vertex_ai/prediction_service.py new file mode 100644 index 0000000000000..26a647516bd46 --- /dev/null +++ b/airflow/providers/google/cloud/hooks/vertex_ai/prediction_service.py @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Sequence + +from google.api_core.client_options import ClientOptions +from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault +from google.cloud.aiplatform_v1 import PredictionServiceClient + +from airflow.providers.google.common.consts import CLIENT_INFO +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook + +if TYPE_CHECKING: + from google.api_core.retry import Retry + from google.cloud.aiplatform_v1.types import PredictResponse + + +class PredictionServiceHook(GoogleBaseHook): + """Hook for Google Cloud Vertex AI Prediction API.""" + + def get_prediction_service_client(self, region: str | None = None) -> PredictionServiceClient: + """ + Return PredictionServiceClient object. + + :param region: The ID of the Google Cloud region that the service belongs to. Default is None. + + :return: `google.cloud.aiplatform_v1.services.prediction_service.client.PredictionServiceClient` instance. + """ + if region and region != "global": + client_options = ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com:443") + else: + client_options = ClientOptions() + + return PredictionServiceClient( + credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options + ) + + @GoogleBaseHook.fallback_to_default_project_id + def predict( + self, + endpoint_id: str, + instances: list[str], + location: str, + project_id: str = PROVIDE_PROJECT_ID, + parameters: dict[str, str] | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> PredictResponse: + """ + Perform an online prediction and returns the prediction result in the response. + + :param endpoint_id: Name of the endpoint_id requested to serve the prediction. + :param instances: Required. The instances that are the input to the prediction call. A DeployedModel + may have an upper limit on the number of instances it supports per request, and when it is + exceeded the prediction call errors in case of AutoML Models, or, in case of customer created + Models, the behaviour is as documented by that Model. + :param parameters: Additional domain-specific parameters, any string must be up to 25000 characters long. + :param project_id: ID of the Google Cloud project where model is located if None then + default project_id is used. + :param location: The location of the project. + :param retry: A retry object used to retry requests. If `None` is specified, requests will not be + retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. 
+ """ + client = self.get_prediction_service_client(location) + endpoint = f"projects/{project_id}/locations/{location}/endpoints/{endpoint_id}" + return client.predict( + request={"endpoint": endpoint, "instances": instances, "parameters": parameters}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) diff --git a/airflow/providers/google/cloud/operators/automl.py b/airflow/providers/google/cloud/operators/automl.py index e1fc6b4cf6cb3..d5dbb3b920996 100644 --- a/airflow/providers/google/cloud/operators/automl.py +++ b/airflow/providers/google/cloud/operators/automl.py @@ -21,8 +21,10 @@ import ast import warnings +from functools import cached_property from typing import TYPE_CHECKING, Sequence, Tuple +from deprecated import deprecated from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault from google.cloud.automl_v1beta1 import ( BatchPredictResult, @@ -33,8 +35,9 @@ TableSpec, ) -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook +from airflow.providers.google.cloud.hooks.vertex_ai.prediction_service import PredictionServiceHook from airflow.providers.google.cloud.links.automl import ( AutoMLDatasetLink, AutoMLDatasetListLink, @@ -53,12 +56,36 @@ MetaData = Sequence[Tuple[str, str]] +def _raise_exception_for_deprecated_operator( + deprecated_class_name: str, alternative_class_names: str | list[str] +): + if isinstance(alternative_class_names, str): + alternative_class_name_str = alternative_class_names + elif len(alternative_class_names) == 1: + alternative_class_name_str = alternative_class_names[0] + else: + alternative_class_name_str = ", ".join(f"`{cls_name}`" for cls_name in alternative_class_names[:-1]) + alternative_class_name_str += f" or `{alternative_class_names[-1]}`" + + raise AirflowException( + f"{deprecated_class_name} for text, image, and video prediction has been " + f"deprecated and no longer available. All the functionality of " + f"legacy AutoML Natural Language, Vision, Video Intelligence and Tables " + f"and new features are available on the Vertex AI platform. " + f"Please use {alternative_class_name_str} from Vertex AI." + ) + + class AutoMLTrainModelOperator(GoogleCloudBaseOperator): """ Creates Google Cloud AutoML model. - AutoMLTrainModelOperator for text prediction is deprecated. Please use - :class:`airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTextTrainingJobOperator` + AutoMLTrainModelOperator for tables, video intelligence, vision and natural language has been deprecated + and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTabularTrainingJobOperator`, + :class:`airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator`, + :class:`airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLImageTrainingJobOperator`, + :class:`airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTextTrainingJobOperator`, instead. .. 
seealso:: @@ -120,17 +147,16 @@ def __init__( self.impersonation_chain = impersonation_chain def execute(self, context: Context): - # Output warning if running not AutoML Translation prediction job + # Raise exception if running not AutoML Translation prediction job if "translation_model_metadata" not in self.model: - warnings.warn( - "AutoMLTrainModelOperator for text, image and video prediction is deprecated. " - "All the functionality of legacy " - "AutoML Natural Language, Vision and Video Intelligence and new features are available " - "on the Vertex AI platform. " - "Please use `CreateAutoMLTextTrainingJobOperator`, `CreateAutoMLImageTrainingJobOperator` or" - " `CreateAutoMLVideoTrainingJobOperator` from VertexAI.", - AirflowProviderDeprecationWarning, - stacklevel=3, + _raise_exception_for_deprecated_operator( + self.__class__.__name__, + [ + "CreateAutoMLTabularTrainingJobOperator", + "CreateAutoMLVideoTrainingJobOperator", + "CreateAutoMLImageTrainingJobOperator", + "CreateAutoMLTextTrainingJobOperator", + ], ) hook = CloudAutoMLHook( gcp_conn_id=self.gcp_conn_id, @@ -174,7 +200,8 @@ class AutoMLPredictOperator(GoogleCloudBaseOperator): :ref:`howto/operator:AutoMLPredictOperator` :param model_id: Name of the model requested to serve the batch prediction. - :param payload: Name od the model used for the prediction. + :param endpoint_id: Name of the endpoint used for the prediction. + :param payload: Name of the model used for the prediction. :param project_id: ID of the Google Cloud project where model is located if None then default project_id is used. :param location: The location of the project. @@ -206,10 +233,12 @@ class AutoMLPredictOperator(GoogleCloudBaseOperator): def __init__( self, *, - model_id: str, + model_id: str | None = None, + endpoint_id: str | None = None, location: str, payload: dict, operation_params: dict[str, str] | None = None, + instances: list[str] | None = None, project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, @@ -221,7 +250,9 @@ def __init__( super().__init__(**kwargs) self.model_id = model_id + self.endpoint_id = endpoint_id self.operation_params = operation_params # type: ignore + self.instances = instances self.location = location self.project_id = project_id self.metadata = metadata @@ -231,23 +262,69 @@ def __init__( self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain - def execute(self, context: Context): - hook = CloudAutoMLHook( - gcp_conn_id=self.gcp_conn_id, - impersonation_chain=self.impersonation_chain, - ) - result = hook.predict( + @cached_property + def hook(self) -> CloudAutoMLHook | PredictionServiceHook: + if self.model_id: + return CloudAutoMLHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + else: # endpoint_id defined + return PredictionServiceHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + + def _check_model_type(self): + hook = self.hook + model = hook.get_model( model_id=self.model_id, - payload=self.payload, location=self.location, project_id=self.project_id, - params=self.operation_params, retry=self.retry, timeout=self.timeout, metadata=self.metadata, ) + if not hasattr(model, "translation_model_metadata"): + raise AirflowException( + "AutoMLPredictOperator for text, image, and video prediction has been deprecated. " + "Please use endpoint_id param instead of model_id param." 
+ ) + + def execute(self, context: Context): + if self.model_id is None and self.endpoint_id is None: + raise AirflowException("You must specify model_id or endpoint_id!") + + if self.model_id: + self._check_model_type() + + hook = self.hook + if self.model_id: + result = hook.predict( + model_id=self.model_id, + payload=self.payload, + location=self.location, + project_id=self.project_id, + params=self.operation_params, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + else: # self.endpoint_id is defined + result = hook.predict( + endpoint_id=self.endpoint_id, + instances=self.instances, + payload=self.payload, + location=self.location, + project_id=self.project_id, + parameters=self.operation_params, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + project_id = self.project_id or hook.project_id - if project_id: + if project_id and self.model_id: AutoMLModelPredictLink.persist( context=context, task_instance=self, @@ -261,6 +338,14 @@ class AutoMLBatchPredictOperator(GoogleCloudBaseOperator): """ Perform a batch prediction on Google Cloud AutoML. + AutoMLBatchPredictOperator for tables, video intelligence, vision and natural language has been deprecated + and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.CreateBatchPredictionJobOperator`, + :class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.GetBatchPredictionJobOperator`, + :class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.ListBatchPredictionJobsOperator`, + :class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.DeleteBatchPredictionJobOperator`, + instead. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLBatchPredictOperator` @@ -341,6 +426,25 @@ def execute(self, context: Context): gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, ) + model: Model = hook.get_model( + model_id=self.model_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + + if not hasattr(model, "translation_model_metadata"): + _raise_exception_for_deprecated_operator( + self.__class__.__name__, + [ + "CreateBatchPredictionJobOperator", + "GetBatchPredictionJobOperator", + "ListBatchPredictionJobsOperator", + "DeleteBatchPredictionJobOperator", + ], + ) self.log.info("Fetch batch prediction.") operation = hook.batch_predict( model_id=self.model_id, @@ -371,6 +475,10 @@ class AutoMLCreateDatasetOperator(GoogleCloudBaseOperator): """ Creates a Google Cloud AutoML dataset. + AutoMLCreateDatasetOperator for tables, video intelligence, vision and natural language has been + deprecated and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.dataset.CreateDatasetOperator` instead. + .. 
seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLCreateDatasetOperator` @@ -430,6 +538,8 @@ def __init__( self.impersonation_chain = impersonation_chain def execute(self, context: Context): + if "translation_dataset_metadata" not in self.dataset: + _raise_exception_for_deprecated_operator(self.__class__.__name__, "CreateDatasetOperator") hook = CloudAutoMLHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, @@ -463,6 +573,10 @@ class AutoMLImportDataOperator(GoogleCloudBaseOperator): """ Imports data to a Google Cloud AutoML dataset. + AutoMLImportDataOperator for tables, video intelligence, vision and natural language has been deprecated + and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.dataset.ImportDataOperator` instead. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLImportDataOperator` @@ -530,6 +644,16 @@ def execute(self, context: Context): gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, ) + dataset: Dataset = hook.get_dataset( + dataset_id=self.dataset_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + if not hasattr(dataset, "translation_dataset_metadata"): + _raise_exception_for_deprecated_operator(self.__class__.__name__, "ImportDataOperator") self.log.info("Importing data to dataset...") operation = hook.import_data( dataset_id=self.dataset_id, @@ -662,10 +786,22 @@ def execute(self, context: Context): return result +@deprecated( + reason=( + "Class `AutoMLTablesUpdateDatasetOperator` has been deprecated and no longer available. " + "Please use `UpdateDatasetOperator` instead" + ), + category=AirflowProviderDeprecationWarning, + action="error", +) class AutoMLTablesUpdateDatasetOperator(GoogleCloudBaseOperator): """ Updates a dataset. + AutoMLTablesUpdateDatasetOperator has been deprecated and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.dataset.UpdateDatasetOperator` + instead. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLTablesUpdateDatasetOperator` @@ -753,6 +889,10 @@ class AutoMLGetModelOperator(GoogleCloudBaseOperator): """ Get Google Cloud AutoML model. + AutoMLGetModelOperator for tables, video intelligence, vision and natural language has been deprecated + and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.model_service.GetModelOperator` instead. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLGetModelOperator` @@ -823,6 +963,8 @@ def execute(self, context: Context): timeout=self.timeout, metadata=self.metadata, ) + if not hasattr(result, "translation_model_metadata"): + _raise_exception_for_deprecated_operator(self.__class__.__name__, "GetModelOperator") model = Model.to_dict(result) project_id = self.project_id or hook.project_id if project_id: @@ -840,6 +982,10 @@ class AutoMLDeleteModelOperator(GoogleCloudBaseOperator): """ Delete Google Cloud AutoML model. + AutoMLDeleteModelOperator for tables, video intelligence, vision and natural language has been deprecated + and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.model_service.DeleteModelOperator` instead. + .. 
seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLDeleteModelOperator` @@ -901,6 +1047,16 @@ def execute(self, context: Context): gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, ) + model: Model = hook.get_model( + model_id=self.model_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + if not hasattr(model, "translation_model_metadata"): + _raise_exception_for_deprecated_operator(self.__class__.__name__, "DeleteModelOperator") operation = hook.delete_model( model_id=self.model_id, location=self.location, @@ -913,6 +1069,14 @@ def execute(self, context: Context): self.log.info("Deletion is completed") +@deprecated( + reason=( + "Class `AutoMLDeployModelOperator` has been deprecated and no longer available. Please use " + "`DeployModelOperator` instead" + ), + category=AirflowProviderDeprecationWarning, + action="error", +) class AutoMLDeployModelOperator(GoogleCloudBaseOperator): """ Deploys a model; if a model is already deployed, deploying it with the same parameters has no effect. @@ -923,6 +1087,10 @@ class AutoMLDeployModelOperator(GoogleCloudBaseOperator): Only applicable for Text Classification, Image Object Detection and Tables; all other domains manage deployment automatically. + AutoMLDeployModelOperator has been deprecated and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.DeployModelOperator` + instead. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLDeployModelOperator` @@ -989,6 +1157,16 @@ def execute(self, context: Context): gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, ) + model = hook.get_model( + model_id=self.model_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + if not hasattr(model, "translation_model_metadata"): + _raise_exception_for_deprecated_operator(self.__class__.__name__, "DeployModelOperator") self.log.info("Deploying model_id %s", self.model_id) operation = hook.deploy_model( @@ -1108,6 +1286,10 @@ class AutoMLListDatasetOperator(GoogleCloudBaseOperator): """ Lists AutoML Datasets in project. + AutoMLListDatasetOperator for tables, video intelligence, vision and natural language has been deprecated + and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.dataset.ListDatasetsOperator` instead. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLListDatasetOperator` @@ -1172,7 +1354,16 @@ def execute(self, context: Context): timeout=self.timeout, metadata=self.metadata, ) - result = [Dataset.to_dict(dataset) for dataset in page_iterator] + result = [] + for dataset in page_iterator: + if not hasattr(dataset, "translation_dataset_metadata"): + warnings.warn( + "Class `AutoMLListDatasetOperator` has been deprecated and no longer available. " + "Please use `ListDatasetsOperator` instead.", + stacklevel=2, + ) + else: + result.append(Dataset.to_dict(dataset)) self.log.info("Datasets obtained.") self.xcom_push( @@ -1190,6 +1381,10 @@ class AutoMLDeleteDatasetOperator(GoogleCloudBaseOperator): """ Deletes a dataset and all of its contents. 
+ AutoMLDeleteDatasetOperator for tables, video intelligence, vision and natural language has been + deprecated and no longer available. Please use + :class:`airflow.providers.google.cloud.operators.vertex_ai.dataset.DeleteDatasetOperator` instead. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AutoMLDeleteDatasetOperator` @@ -1260,6 +1455,16 @@ def execute(self, context: Context): gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, ) + dataset: Dataset = hook.get_dataset( + dataset_id=self.dataset_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + if not hasattr(dataset, "translation_dataset_metadata"): + _raise_exception_for_deprecated_operator(self.__class__.__name__, "DeleteDatasetOperator") dataset_id_list = self._parse_dataset_id(self.dataset_id) for dataset_id in dataset_id_list: self.log.info("Deleting dataset %s", dataset_id) diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index f7ff6f39e24fe..ea5cb100e01c8 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -944,6 +944,7 @@ hooks: - airflow.providers.google.cloud.hooks.vertex_ai.model_service - airflow.providers.google.cloud.hooks.vertex_ai.pipeline_job - airflow.providers.google.cloud.hooks.vertex_ai.generative_model + - airflow.providers.google.cloud.hooks.vertex_ai.prediction_service - integration-name: Google Looker python-modules: - airflow.providers.google.cloud.hooks.looker diff --git a/docs/apache-airflow-providers-google/operators/cloud/automl.rst b/docs/apache-airflow-providers-google/operators/cloud/automl.rst index 34324ea60d914..5858b3a7c4cfb 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/automl.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/automl.rst @@ -41,6 +41,11 @@ To create a Google AutoML dataset you can use :class:`~airflow.providers.google.cloud.operators.automl.AutoMLCreateDatasetOperator`. The operator returns dataset id in :ref:`XCom ` under ``dataset_id`` key. +This operator is deprecated when running for text, video and vision prediction and will be removed soon. +All the functionality of legacy AutoML Natural Language, Vision, Video Intelligence and new features are +available on the Vertex AI platform. Please use +:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.CreateDatasetOperator` + .. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_dataset.py :language: python :dedent: 4 @@ -59,11 +64,16 @@ After creating a dataset you can use it to import some data using To update dataset you can use :class:`~airflow.providers.google.cloud.operators.automl.AutoMLTablesUpdateDatasetOperator`. -.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_dataset.py +This operator is deprecated when running for text, video and vision prediction and will be removed soon. +All the functionality of legacy AutoML Natural Language, Vision, Video Intelligence and new features are +available on the Vertex AI platform. Please use +:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.UpdateDatasetOperator` + +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_automl_update_dataset]
-    :end-before: [END howto_operator_automl_update_dataset]
+    :start-after: [START how_to_cloud_vertex_ai_update_dataset_operator]
+    :end-before: [END how_to_cloud_vertex_ai_update_dataset_operator]
 
 .. _howto/operator:AutoMLTablesListTableSpecsOperator:
 .. _howto/operator:AutoMLTablesListColumnSpecsOperator:
@@ -74,20 +84,10 @@ Listing Table And Columns Specs
 To list table specs you can use
 :class:`~airflow.providers.google.cloud.operators.automl.AutoMLTablesListTableSpecsOperator`.
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_dataset.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_automl_specs]
-    :end-before: [END howto_operator_automl_specs]
-
 To list column specs you can use
 :class:`~airflow.providers.google.cloud.operators.automl.AutoMLTablesListColumnSpecsOperator`.
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_dataset.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_automl_column_specs]
-    :end-before: [END howto_operator_automl_column_specs]
+AutoML Tables operators are deprecated. Please use the related Vertex AI Tabular operators instead.
 
 .. _howto/operator:AutoMLTrainModelOperator:
 .. _howto/operator:AutoMLGetModelOperator:
@@ -102,7 +102,7 @@ To create a Google AutoML model you can use
 The operator will wait for the operation to complete. Additionally the operator
 returns the id of model in :ref:`XCom ` under ``model_id`` key.
 
-This Operator is deprecated when running for text, video and vision prediction and will be removed soon.
+This operator is deprecated when running for text, video and vision prediction and will be removed soon.
 All the functionality of legacy AutoML Natural Language, Vision, Video Intelligence and new features are
 available on the Vertex AI platform. Please use
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTextTrainingJobOperator`,
@@ -148,29 +148,44 @@ and
 To get existing model one can use
 :class:`~airflow.providers.google.cloud.operators.automl.AutoMLGetModelOperator`.
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_model.py
+This operator is deprecated for tables, video intelligence, vision and natural language
+and will be removed after 31.03.2024. Please use
+:class:`airflow.providers.google.cloud.operators.vertex_ai.model_service.GetModelOperator` instead.
+You can find an example of how to use Vertex AI operators here:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
     :language: python
    :dedent: 4
-    :start-after: [START howto_operator_get_model]
-    :end-before: [END howto_operator_get_model]
+    :start-after: [START how_to_cloud_vertex_ai_get_model_operator]
+    :end-before: [END how_to_cloud_vertex_ai_get_model_operator]
 
 Once a model is created it could be deployed using
 :class:`~airflow.providers.google.cloud.operators.automl.AutoMLDeployModelOperator`.
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_model.py
+This operator is deprecated for tables, video intelligence, vision and natural language
+and will be removed after 31.03.2024. Please use
+:class:`airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.DeployModelOperator` instead.
+You can find an example of how to use Vertex AI operators here:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_deploy_model]
-    :end-before: [END howto_operator_deploy_model]
+    :start-after: [START how_to_cloud_vertex_ai_deploy_model_operator]
+    :end-before: [END how_to_cloud_vertex_ai_deploy_model_operator]
 
 If you wish to delete a model you can use
 :class:`~airflow.providers.google.cloud.operators.automl.AutoMLDeleteModelOperator`.
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_model.py
+This operator is deprecated for tables, video intelligence, vision and natural language
+and will be removed after 31.03.2024. Please use
+:class:`airflow.providers.google.cloud.operators.vertex_ai.model_service.DeleteModelOperator` instead.
+You can find an example of how to use Vertex AI operators here:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_automl_delete_model]
-    :end-before: [END howto_operator_automl_delete_model]
+    :start-after: [START how_to_cloud_vertex_ai_delete_model_operator]
+    :end-before: [END how_to_cloud_vertex_ai_delete_model_operator]
 
 .. _howto/operator:AutoMLPredictOperator:
 .. _howto/operator:AutoMLBatchPredictOperator:
@@ -195,6 +210,33 @@ the model must be deployed.
     :start-after: [START howto_operator_batch_prediction]
     :end-before: [END howto_operator_batch_prediction]
 
+The :class:`~airflow.providers.google.cloud.operators.automl.AutoMLBatchPredictOperator` for tables,
+video intelligence, vision and natural language is deprecated and will be removed after 31.03.2024. Please use
+:class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.CreateBatchPredictionJobOperator`,
+:class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.GetBatchPredictionJobOperator`,
+:class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.ListBatchPredictionJobsOperator`,
+:class:`airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.DeleteBatchPredictionJobOperator`,
+instead.
+You can find examples of how to use Vertex AI operators here:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
+    :end-before: [END how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
+    :end-before: [END how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
+    :end-before: [END how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
+
 .. _howto/operator:AutoMLListDatasetOperator:
 .. _howto/operator:AutoMLDeleteDatasetOperator:
 
@@ -205,20 +247,30 @@ You can get a list of AutoML datasets using
 :class:`~airflow.providers.google.cloud.operators.automl.AutoMLListDatasetOperator`.
The operator returns list of datasets ids in :ref:`XCom ` under ``dataset_id_list`` key. -.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_dataset.py +This operator deprecated for tables, video intelligence, vision and natural language is deprecated +and will be removed after 31.03.2024. Please use +:class:`airflow.providers.google.cloud.operators.vertex_ai.dataset.ListDatasetsOperator` instead. +You can find example on how to use VertexAI operators here: + +.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py :language: python :dedent: 4 - :start-after: [START howto_operator_list_dataset] - :end-before: [END howto_operator_list_dataset] + :start-after: [START how_to_cloud_vertex_ai_list_dataset_operator] + :end-before: [END how_to_cloud_vertex_ai_list_dataset_operator] To delete a dataset you can use :class:`~airflow.providers.google.cloud.operators.automl.AutoMLDeleteDatasetOperator`. The delete operator allows also to pass list or coma separated string of datasets ids to be deleted. -.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_dataset.py +This operator deprecated for tables, video intelligence, vision and natural language is deprecated +and will be removed after 31.03.2024. Please use +:class:`airflow.providers.google.cloud.operators.vertex_ai.dataset.DeleteDatasetOperator` instead. +You can find example on how to use VertexAI operators here: + +.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py :language: python :dedent: 4 - :start-after: [START howto_operator_delete_dataset] - :end-before: [END howto_operator_delete_dataset] + :start-after: [START how_to_cloud_vertex_ai_delete_dataset_operator] + :end-before: [END how_to_cloud_vertex_ai_delete_dataset_operator] Reference ^^^^^^^^^ diff --git a/pyproject.toml b/pyproject.toml index 4e878884ffe91..118da25708ae2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -393,6 +393,8 @@ combine-as-imports = true "tests/providers/google/cloud/hooks/vertex_ai/test_generative_model.py" = ["E402"] "tests/providers/google/cloud/hooks/vertex_ai/test_model_service.py" = ["E402"] "tests/providers/google/cloud/hooks/vertex_ai/test_pipeline_job.py" = ["E402"] +"tests/providers/google/cloud/hooks/vertex_ai/test_prediction_service.py" = ["E402"] +"tests/providers/google/cloud/operators/test_automl.py"= ["E402"] "tests/providers/google/cloud/operators/test_vertex_ai.py" = ["E402"] "tests/providers/google/cloud/operators/vertex_ai/test_generative_model.py" = ["E402"] "tests/providers/google/cloud/triggers/test_vertex_ai.py" = ["E402"] diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py index 29ef9812aad8b..dd2a24eb7cb7d 100755 --- a/scripts/in_container/run_provider_yaml_files_check.py +++ b/scripts/in_container/run_provider_yaml_files_check.py @@ -54,6 +54,8 @@ KNOWN_DEPRECATED_CLASSES = [ "airflow.providers.google.cloud.links.dataproc.DataprocLink", + "airflow.providers.google.cloud.operators.automl.AutoMLTablesUpdateDatasetOperator", + "airflow.providers.google.cloud.operators.automl.AutoMLDeployModelOperator", ] try: diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 75d824732bdd8..13b149f4d132d 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -362,6 +362,8 @@ class 
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest ".CloudDataTransferServiceS3ToGCSOperator", "airflow.providers.google.cloud.operators.cloud_storage_transfer_service" ".CloudDataTransferServiceGCSToGCSOperator", + "airflow.providers.google.cloud.operators.automl.AutoMLTablesUpdateDatasetOperator", + "airflow.providers.google.cloud.operators.automl.AutoMLDeployModelOperator", "airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator", "airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator", "airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkJobOperator", diff --git a/tests/providers/google/cloud/hooks/test_automl.py b/tests/providers/google/cloud/hooks/test_automl.py index f79dd8b51b73d..47f956c6d1032 100644 --- a/tests/providers/google/cloud/hooks/test_automl.py +++ b/tests/providers/google/cloud/hooks/test_automl.py @@ -251,3 +251,11 @@ def test_delete_dataset(self, mock_delete_dataset): mock_delete_dataset.assert_called_once_with( request=dict(name=DATASET_PATH), retry=DEFAULT, timeout=None, metadata=() ) + + @mock.patch("airflow.providers.google.cloud.hooks.automl.AutoMlClient.get_dataset") + def test_get_dataset(self, mock_get_dataset): + self.hook.get_dataset(dataset_id=DATASET_ID, location=GCP_LOCATION, project_id=GCP_PROJECT_ID) + + mock_get_dataset.assert_called_once_with( + request=dict(name=DATASET_PATH), retry=DEFAULT, timeout=None, metadata=() + ) diff --git a/tests/providers/google/cloud/hooks/vertex_ai/test_prediction_service.py b/tests/providers/google/cloud/hooks/vertex_ai/test_prediction_service.py new file mode 100644 index 0000000000000..987578b7c11ea --- /dev/null +++ b/tests/providers/google/cloud/hooks/vertex_ai/test_prediction_service.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest import mock + +import pytest + +# For no Pydantic environment, we need to skip the tests +pytest.importorskip("google.cloud.aiplatform_v1") + +from google.api_core.gapic_v1.method import DEFAULT + +from airflow.providers.google.cloud.hooks.vertex_ai.prediction_service import ( + PredictionServiceHook, +) +from tests.providers.google.cloud.utils.base_gcp_mock import ( + mock_base_gcp_hook_default_project_id, + mock_base_gcp_hook_no_default_project_id, +) + +TEST_GCP_CONN_ID: str = "test-gcp-conn-id" +TEST_REGION: str = "test-region" +TEST_PROJECT_ID: str = "test-project-id" +TEST_ENDPOINT_ID: str = "test-endpoint-id" +TEST_OUTPUT_CONFIG: dict = {} + +BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}" +PREDICTION_SERVICE_STRING = "airflow.providers.google.cloud.hooks.vertex_ai.prediction_service.{}" + + +class TestPredictionServiceWithDefaultProjectIdHook: + def setup_method(self): + with mock.patch( + BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_base_gcp_hook_default_project_id + ): + self.hook = PredictionServiceHook(gcp_conn_id=TEST_GCP_CONN_ID) + + @mock.patch(PREDICTION_SERVICE_STRING.format("PredictionServiceHook.get_prediction_service_client")) + def test_predict(self, mock_client): + self.hook.predict( + endpoint_id=TEST_ENDPOINT_ID, + instances=["instance1", "instance2"], + project_id=TEST_PROJECT_ID, + location=TEST_REGION, + ) + mock_client.assert_called_once_with(TEST_REGION) + mock_client.return_value.predict.assert_called_once_with( + request=dict( + endpoint=f"projects/{TEST_PROJECT_ID}/locations/{TEST_REGION}/endpoints/{TEST_ENDPOINT_ID}", + instances=["instance1", "instance2"], + parameters=None, + ), + metadata=(), + retry=DEFAULT, + timeout=None, + ) + + +class TestPredictionServiceWithoutDefaultProjectIdHook: + def setup_method(self): + with mock.patch( + BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_base_gcp_hook_no_default_project_id + ): + self.hook = PredictionServiceHook(gcp_conn_id=TEST_GCP_CONN_ID) + + @mock.patch(PREDICTION_SERVICE_STRING.format("PredictionServiceHook.get_prediction_service_client")) + def test_predict(self, mock_client): + self.hook.predict( + endpoint_id=TEST_ENDPOINT_ID, + instances=["instance1", "instance2"], + project_id=TEST_PROJECT_ID, + location=TEST_REGION, + ) + mock_client.assert_called_once_with(TEST_REGION) + mock_client.return_value.predict.assert_called_once_with( + request=dict( + endpoint=f"projects/{TEST_PROJECT_ID}/locations/{TEST_REGION}/endpoints/{TEST_ENDPOINT_ID}", + instances=["instance1", "instance2"], + parameters=None, + ), + metadata=(), + retry=DEFAULT, + timeout=None, + ) diff --git a/tests/providers/google/cloud/operators/test_automl.py b/tests/providers/google/cloud/operators/test_automl.py index 4f00f76a2dbef..cecda2bf23a21 100644 --- a/tests/providers/google/cloud/operators/test_automl.py +++ b/tests/providers/google/cloud/operators/test_automl.py @@ -21,10 +21,16 @@ from unittest import mock import pytest + +# For no Pydantic environment, we need to skip the tests +pytest.importorskip("google.cloud.aiplatform_v1") + from google.api_core.gapic_v1.method import DEFAULT from google.cloud.automl_v1beta1 import BatchPredictResult, Dataset, Model, PredictResponse +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook +from airflow.providers.google.cloud.hooks.vertex_ai.prediction_service import PredictionServiceHook from 
airflow.providers.google.cloud.operators.automl import ( AutoMLBatchPredictOperator, AutoMLCreateDatasetOperator, @@ -50,6 +56,11 @@ MODEL_ID = "TBL9195602771183665152" DATASET_ID = "TBL123456789" MODEL = { + "display_name": MODEL_NAME, + "dataset_id": DATASET_ID, + "translation_model_metadata": {"train_budget_milli_node_hours": 1000}, +} +MODEL_DEPRECATED = { "display_name": MODEL_NAME, "dataset_id": DATASET_ID, "tables_model_metadata": {"train_budget_milli_node_hours": 1000}, @@ -62,7 +73,8 @@ INPUT_CONFIG = {"input": "value"} OUTPUT_CONFIG = {"output": "value"} PAYLOAD = {"test": "payload"} -DATASET = {"dataset_id": "data"} +DATASET = {"dataset_id": "data", "translation_dataset_metadata": "data"} +DATASET_DEPRECATED = {"tables_model_metadata": "data"} MASK = {"field": "mask"} extract_object_id = CloudAutoMLHook.extract_object_id @@ -90,6 +102,22 @@ def test_execute(self, mock_hook): metadata=(), ) + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecated(self, mock_hook): + op = AutoMLTrainModelOperator( + model=MODEL_DEPRECATED, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + task_id=TASK_ID, + ) + expected_exception_str = ( + "AutoMLTrainModelOperator for text, image, and video prediction has been " + "deprecated and no longer available" + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.assert_not_called() + @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( @@ -139,6 +167,38 @@ def test_execute(self, mock_hook): timeout=None, ) + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecated(self, mock_hook): + returned_model = mock.MagicMock() + del returned_model.translation_model_metadata + mock_hook.return_value.get_model.return_value = returned_model + mock_hook.return_value.extract_object_id = extract_object_id + + op = AutoMLBatchPredictOperator( + model_id=MODEL_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + input_config=INPUT_CONFIG, + output_config=OUTPUT_CONFIG, + task_id=TASK_ID, + prediction_params={}, + ) + expected_exception_str = ( + "AutoMLBatchPredictOperator for text, image, and video prediction has been " + "deprecated and no longer available" + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.return_value.get_model.assert_called_once_with( + location=GCP_LOCATION, + model_id=MODEL_ID, + project_id=GCP_PROJECT_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + mock_hook.return_value.batch_predict.assert_not_called() + @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( @@ -212,6 +272,52 @@ def test_templating(self, create_task_instance_of_operator): assert task.location == "location" assert task.impersonation_chain == "impersonation-chain" + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecation(self, mock_hook): + returned_model = mock.MagicMock(**MODEL_DEPRECATED) + del returned_model.translation_model_metadata + mock_hook.return_value.get_model.return_value = returned_model + + mock_hook.return_value.predict.return_value = PredictResponse() + + op = AutoMLPredictOperator( + model_id=MODEL_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + payload=PAYLOAD, + task_id=TASK_ID, + 
operation_params={"TEST_KEY": "TEST_VALUE"}, + ) + expected_exception_str = ( + "AutoMLPredictOperator for text, image, and video prediction has been " + "deprecated. Please use endpoint_id param instead of model_id param." + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.return_value.predict.assert_not_called() + + @pytest.mark.db_test + def test_hook_type(self): + op = AutoMLPredictOperator( + model_id=MODEL_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + payload=PAYLOAD, + task_id=TASK_ID, + operation_params={"TEST_KEY": "TEST_VALUE"}, + ) + assert isinstance(op.hook, CloudAutoMLHook) + + op = AutoMLPredictOperator( + endpoint_id="endpoint_id", + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + payload=PAYLOAD, + task_id=TASK_ID, + operation_params={"TEST_KEY": "TEST_VALUE"}, + ) + assert isinstance(op.hook, PredictionServiceHook) + class TestAutoMLCreateImportOperator: @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") @@ -235,6 +341,22 @@ def test_execute(self, mock_hook): timeout=None, ) + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecated(self, mock_hook): + op = AutoMLCreateDatasetOperator( + dataset=DATASET_DEPRECATED, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + task_id=TASK_ID, + ) + expected_exception_str = ( + "AutoMLCreateDatasetOperator for text, image, and video prediction has been " + "deprecated and no longer available" + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.return_value.create_dataset.assert_not_called() + @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( @@ -324,41 +446,38 @@ def test_execute(self, mock_hook): dataset = copy.deepcopy(DATASET) dataset["name"] = DATASET_ID - op = AutoMLTablesUpdateDatasetOperator( - dataset=dataset, - update_mask=MASK, - location=GCP_LOCATION, - task_id=TASK_ID, - ) - op.execute(context=mock.MagicMock()) - mock_hook.return_value.update_dataset.assert_called_once_with( - dataset=dataset, - metadata=(), - retry=DEFAULT, - timeout=None, - update_mask=MASK, + expected_exception_str = ( + r"Call to deprecated class AutoMLTablesUpdateDatasetOperator. 
\(Class " + r"`AutoMLTablesUpdateDatasetOperator` has been deprecated and no longer available" ) + with pytest.raises(AirflowProviderDeprecationWarning, match=expected_exception_str): + AutoMLTablesUpdateDatasetOperator( + dataset=dataset, + update_mask=MASK, + location=GCP_LOCATION, + task_id=TASK_ID, + ) + mock_hook.assert_not_called() @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): - ti = create_task_instance_of_operator( - AutoMLTablesUpdateDatasetOperator, - # Templated fields - dataset="{{ 'dataset' }}", - update_mask="{{ 'update-mask' }}", - location="{{ 'location' }}", - impersonation_chain="{{ 'impersonation-chain' }}", - # Other parameters - dag_id="test_template_body_templating_dag", - task_id="test_template_body_templating_task", - execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), + with pytest.raises(AirflowProviderDeprecationWarning) as err: + create_task_instance_of_operator( + AutoMLTablesUpdateDatasetOperator, + # Templated fields + dataset="{{ 'dataset' }}", + update_mask="{{ 'update-mask' }}", + location="{{ 'location' }}", + impersonation_chain="{{ 'impersonation-chain' }}", + # Other parameters + dag_id="test_template_body_templating_dag", + task_id="test_template_body_templating_task", + execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), + ) + assert str(err.value).startswith( + "Call to deprecated class AutoMLTablesUpdateDatasetOperator. " + "(Class `AutoMLTablesUpdateDatasetOperator` has been deprecated and no longer available" ) - ti.render_templates() - task: AutoMLTablesUpdateDatasetOperator = ti.task - assert task.dataset == "dataset" - assert task.update_mask == "update-mask" - assert task.location == "location" - assert task.impersonation_chain == "impersonation-chain" class TestAutoMLGetModelOperator: @@ -383,6 +502,33 @@ def test_execute(self, mock_hook): timeout=None, ) + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecated(self, mock_hook): + returned_model = mock.MagicMock(**MODEL_DEPRECATED) + del returned_model.translation_model_metadata + mock_hook.return_value.get_model.return_value = returned_model + + op = AutoMLGetModelOperator( + model_id=MODEL_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + task_id=TASK_ID, + ) + expected_exception_str = ( + "AutoMLGetModelOperator for text, image, and video prediction has been " + "deprecated and no longer available" + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.return_value.get_model.assert_called_once_with( + location=GCP_LOCATION, + metadata=(), + model_id=MODEL_ID, + project_id=GCP_PROJECT_ID, + retry=DEFAULT, + timeout=None, + ) + @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( @@ -424,42 +570,25 @@ def test_execute(self, mock_hook): timeout=None, ) - @pytest.mark.db_test - def test_templating(self, create_task_instance_of_operator): - ti = create_task_instance_of_operator( - AutoMLDeleteModelOperator, - # Templated fields - model_id="{{ 'model-id' }}", - location="{{ 'location' }}", - project_id="{{ 'project-id' }}", - impersonation_chain="{{ 'impersonation-chain' }}", - # Other parameters - dag_id="test_template_body_templating_dag", - task_id="test_template_body_templating_task", - execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), - ) - ti.render_templates() - task: AutoMLDeleteModelOperator = ti.task - 
assert task.model_id == "model-id" - assert task.location == "location" - assert task.project_id == "project-id" - assert task.impersonation_chain == "impersonation-chain" - - -class TestAutoMLDeployModelOperator: @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") - def test_execute(self, mock_hook): - image_detection_metadata = {} - op = AutoMLDeployModelOperator( + def test_execute_deprecated(self, mock_hook): + returned_model = mock.MagicMock(**MODEL_DEPRECATED) + del returned_model.translation_model_metadata + mock_hook.return_value.get_model.return_value = returned_model + + op = AutoMLDeleteModelOperator( model_id=MODEL_ID, - image_detection_metadata=image_detection_metadata, location=GCP_LOCATION, project_id=GCP_PROJECT_ID, task_id=TASK_ID, ) - op.execute(context=None) - mock_hook.return_value.deploy_model.assert_called_once_with( - image_detection_metadata={}, + expected_exception_str = ( + "AutoMLDeleteModelOperator for text, image, and video prediction has been " + "deprecated and no longer available" + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.return_value.get_model.assert_called_once_with( location=GCP_LOCATION, metadata=(), model_id=MODEL_ID, @@ -467,11 +596,12 @@ def test_execute(self, mock_hook): retry=DEFAULT, timeout=None, ) + mock_hook.return_value.delete_model.assert_not_called() @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( - AutoMLDeployModelOperator, + AutoMLDeleteModelOperator, # Templated fields model_id="{{ 'model-id' }}", location="{{ 'location' }}", @@ -483,13 +613,55 @@ def test_templating(self, create_task_instance_of_operator): execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), ) ti.render_templates() - task: AutoMLDeployModelOperator = ti.task + task: AutoMLDeleteModelOperator = ti.task assert task.model_id == "model-id" assert task.location == "location" assert task.project_id == "project-id" assert task.impersonation_chain == "impersonation-chain" +class TestAutoMLDeployModelOperator: + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute(self, mock_hook): + image_detection_metadata = {} + + expected_exception_str = ( + r"Call to deprecated class AutoMLDeployModelOperator. \(Class `AutoMLDeployModelOperator` has " + r"been deprecated and no longer available" + ) + with pytest.raises(AirflowProviderDeprecationWarning, match=expected_exception_str): + AutoMLDeployModelOperator( + model_id=MODEL_ID, + image_detection_metadata=image_detection_metadata, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + task_id=TASK_ID, + ) + + mock_hook.assert_not_called() + + @pytest.mark.db_test + def test_templating(self, create_task_instance_of_operator): + with pytest.raises(AirflowProviderDeprecationWarning) as err: + create_task_instance_of_operator( + AutoMLDeployModelOperator, + # Templated fields + model_id="{{ 'model-id' }}", + location="{{ 'location' }}", + project_id="{{ 'project-id' }}", + impersonation_chain="{{ 'impersonation-chain' }}", + # Other parameters + dag_id="test_template_body_templating_dag", + task_id="test_template_body_templating_task", + execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), + ) + + assert str(err.value).startswith( + "Call to deprecated class AutoMLDeployModelOperator. 
" + "(Class `AutoMLDeployModelOperator` has been deprecated and no longer available" + ) + + class TestAutoMLDatasetImportOperator: @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") def test_execute(self, mock_hook): @@ -511,6 +683,35 @@ def test_execute(self, mock_hook): timeout=None, ) + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecated(self, mock_hook): + returned_dataset = mock.MagicMock() + del returned_dataset.translation_dataset_metadata + mock_hook.return_value.get_dataset.return_value = returned_dataset + + op = AutoMLImportDataOperator( + dataset_id=DATASET_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + input_config=INPUT_CONFIG, + task_id=TASK_ID, + ) + expected_exception_str = ( + "AutoMLImportDataOperator for text, image, and video prediction has been " + "deprecated and no longer available" + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.return_value.get_dataset.assert_called_once_with( + dataset_id=DATASET_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + metadata=(), + retry=DEFAULT, + timeout=None, + ) + mock_hook.return_value.import_data.assert_not_called() + @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( @@ -598,6 +799,27 @@ def test_execute(self, mock_hook): timeout=None, ) + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecated(self, mock_hook): + not_valid_dataset = mock.MagicMock() + del not_valid_dataset.translation_dataset_metadata + mock_hook.return_value.list_datasets.return_value = [DATASET, not_valid_dataset] + op = AutoMLListDatasetOperator(location=GCP_LOCATION, project_id=GCP_PROJECT_ID, task_id=TASK_ID) + expected_warning_str = ( + "Class `AutoMLListDatasetOperator` has been deprecated and no longer available. 
" + "Please use `ListDatasetsOperator` instead" + ) + with pytest.warns(UserWarning, match=expected_warning_str): + op.execute(context=mock.MagicMock()) + + mock_hook.return_value.list_datasets.assert_called_once_with( + location=GCP_LOCATION, + metadata=(), + project_id=GCP_PROJECT_ID, + retry=DEFAULT, + timeout=None, + ) + @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( @@ -637,6 +859,34 @@ def test_execute(self, mock_hook): timeout=None, ) + @mock.patch("airflow.providers.google.cloud.operators.automl.CloudAutoMLHook") + def test_execute_deprecated(self, mock_hook): + returned_dataset = mock.MagicMock() + del returned_dataset.translation_dataset_metadata + mock_hook.return_value.get_dataset.return_value = returned_dataset + + op = AutoMLDeleteDatasetOperator( + dataset_id=DATASET_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + task_id=TASK_ID, + ) + expected_exception_str = ( + "AutoMLDeleteDatasetOperator for text, image, and video prediction has been " + "deprecated and no longer available" + ) + with pytest.raises(AirflowException, match=expected_exception_str): + op.execute(context=mock.MagicMock()) + mock_hook.return_value.get_dataset.assert_called_once_with( + dataset_id=DATASET_ID, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + metadata=(), + retry=DEFAULT, + timeout=None, + ) + mock_hook.return_value.delete_dataset.assert_not_called() + @pytest.mark.db_test def test_templating(self, create_task_instance_of_operator): ti = create_task_instance_of_operator( diff --git a/tests/system/providers/google/cloud/automl/example_automl_dataset.py b/tests/system/providers/google/cloud/automl/example_automl_dataset.py index 1c2691657e19b..50950d761473d 100644 --- a/tests/system/providers/google/cloud/automl/example_automl_dataset.py +++ b/tests/system/providers/google/cloud/automl/example_automl_dataset.py @@ -23,7 +23,6 @@ from __future__ import annotations import os -from copy import deepcopy from datetime import datetime from airflow.models.dag import DAG @@ -35,7 +34,6 @@ AutoMLListDatasetOperator, AutoMLTablesListColumnSpecsOperator, AutoMLTablesListTableSpecsOperator, - AutoMLTablesUpdateDatasetOperator, ) from airflow.providers.google.cloud.operators.gcs import ( GCSCreateBucketOperator, @@ -139,20 +137,6 @@ def get_target_column_spec(columns_specs: list[dict], column_name: str) -> str: ) # [END howto_operator_automl_column_specs] - # [START howto_operator_automl_update_dataset] - update = deepcopy(DATASET) - update["name"] = '{{ task_instance.xcom_pull("create_dataset")["name"] }}' - update["tables_dataset_metadata"][ # type: ignore - "target_column_spec_id" - ] = "{{ get_target_column_spec(task_instance.xcom_pull('list_columns_spec_task'), target) }}" - - update_dataset = AutoMLTablesUpdateDatasetOperator( - task_id="update_dataset", - dataset=update, - location=GCP_AUTOML_LOCATION, - ) - # [END howto_operator_automl_update_dataset] - # [START howto_operator_list_dataset] list_datasets = AutoMLListDatasetOperator( task_id="list_datasets", @@ -181,7 +165,6 @@ def get_target_column_spec(columns_specs: list[dict], column_name: str) -> str: >> import_dataset >> list_tables_spec >> list_columns_spec - >> update_dataset >> list_datasets # TEST TEARDOWN >> delete_dataset diff --git a/tests/system/providers/google/cloud/automl/example_automl_model.py b/tests/system/providers/google/cloud/automl/example_automl_model.py index 59ec91c8790d5..d6c3ee9598c11 100644 --- 
a/tests/system/providers/google/cloud/automl/example_automl_model.py +++ b/tests/system/providers/google/cloud/automl/example_automl_model.py @@ -23,7 +23,6 @@ from __future__ import annotations import os -from copy import deepcopy from datetime import datetime from google.protobuf.struct_pb2 import Value @@ -35,13 +34,11 @@ AutoMLCreateDatasetOperator, AutoMLDeleteDatasetOperator, AutoMLDeleteModelOperator, - AutoMLDeployModelOperator, AutoMLGetModelOperator, AutoMLImportDataOperator, AutoMLPredictOperator, AutoMLTablesListColumnSpecsOperator, AutoMLTablesListTableSpecsOperator, - AutoMLTablesUpdateDatasetOperator, AutoMLTrainModelOperator, ) from airflow.providers.google.cloud.operators.gcs import ( @@ -170,18 +167,6 @@ def get_target_column_spec(columns_specs: list[dict], column_name: str) -> str: project_id=GCP_PROJECT_ID, ) - update = deepcopy(DATASET) - update["name"] = '{{ task_instance.xcom_pull("create_dataset")["name"] }}' - update["tables_dataset_metadata"][ # type: ignore - "target_column_spec_id" - ] = "{{ get_target_column_spec(task_instance.xcom_pull('list_columns_spec'), target) }}" - - update_dataset = AutoMLTablesUpdateDatasetOperator( - task_id="update_dataset", - dataset=update, - location=GCP_AUTOML_LOCATION, - ) - # [START howto_operator_automl_create_model] create_model = AutoMLTrainModelOperator( task_id="create_model", @@ -201,15 +186,6 @@ def get_target_column_spec(columns_specs: list[dict], column_name: str) -> str: ) # [END howto_operator_get_model] - # [START howto_operator_deploy_model] - deploy_model = AutoMLDeployModelOperator( - task_id="deploy_model", - model_id=model_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, - ) - # [END howto_operator_deploy_model] - # [START howto_operator_prediction] predict_task = AutoMLPredictOperator( task_id="predict_task", @@ -262,11 +238,9 @@ def get_target_column_spec(columns_specs: list[dict], column_name: str) -> str: >> import_dataset >> list_tables_spec >> list_columns_spec - >> update_dataset # TEST BODY >> create_model >> get_model - >> deploy_model >> predict_task >> batch_predict_task # TEST TEARDOWN