From 91f564437a8b37ccd19953d384d5f2f5542a857c Mon Sep 17 00:00:00 2001 From: Oleg Kachur Date: Tue, 17 Dec 2024 15:52:25 +0000 Subject: [PATCH] remove deprecated CreateDataPipelineOperator, RunDataPipelineOperator --- .../operators/cloud/datapipeline.rst | 95 ----------- .../airflow/providers/google/CHANGELOG.rst | 3 + .../google/cloud/operators/datapipeline.py | 63 -------- .../airflow/providers/google/provider.yaml | 11 -- .../cloud/operators/test_datapipeline.py | 4 +- .../google/cloud/datapipelines/__init__.py | 16 -- .../datapipelines/example_datapipeline.py | 151 ------------------ tests/always/test_project_structure.py | 2 - 8 files changed, 5 insertions(+), 340 deletions(-) delete mode 100644 docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst delete mode 100644 providers/src/airflow/providers/google/cloud/operators/datapipeline.py delete mode 100644 providers/tests/system/google/cloud/datapipelines/__init__.py delete mode 100644 providers/tests/system/google/cloud/datapipelines/example_datapipeline.py diff --git a/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst b/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst deleted file mode 100644 index 4996afaf7c323..0000000000000 --- a/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst +++ /dev/null @@ -1,95 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - -Google Cloud Data Pipelines Operators -===================================== - -Data Pipelines is a Dataflow feature that allows customers to create -and schedule recurring jobs, view aggregated job metrics, and define -and manage job SLOs. A pipeline consists of a collection of jobs -including ways to manage them. A pipeline may be associated with a -Dataflow Template (classic/flex) and include all jobs launched with -the associated template. - -Prerequisite Tasks -^^^^^^^^^^^^^^^^^^ - -.. include:: /operators/_partials/prerequisite_tasks.rst - -Creating a Data Pipeline -^^^^^^^^^^^^^^^^^^^^^^^^ - -.. warning:: - This operator is deprecated. Please use :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePipelineOperator`. - -To create a new Data Pipelines instance using a request body and parent name, use :class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator`. -The operator accesses Google Cloud's Data Pipelines API and calls upon the -`create method `__ -to run the given pipeline. - -:class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator` accepts four parameters: - **body**: instance of the Pipeline, - **project_id**: id of the GCP project that owns the job, - **location**: destination for the Pipeline, - **gcp_conn_id**: id to connect to Google Cloud. 
- -The request body and project id need to be passed each time, while the GCP connection id and location have default values. -The project id and location will be used to build the parent name needed to create the operator. - -Here is an example of how you can create a Data Pipelines instance by running the above parameters with CreateDataPipelineOperator: - -.. exampleinclude:: /../../providers/tests/system/google/cloud/datapipelines/example_datapipeline.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_create_data_pipeline] - :end-before: [END howto_operator_create_data_pipeline] - -Running a Data Pipeline -^^^^^^^^^^^^^^^^^^^^^^^ - -.. warning:: - This operator is deprecated. Please use :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowRunPipelineOperator`. - -To run a Data Pipelines instance, use :class:`~airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator`. -The operator accesses Google Cloud's Data Pipelines API and calls upon the -`run method `__ -to run the given pipeline. - -:class:`~airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator` can take in four parameters: - -- ``data_pipeline_name``: the name of the Data Pipelines instance -- ``project_id``: the ID of the GCP project that owns the job -- ``location``: the location of the Data Pipelines instance -- ``gcp_conn_id``: the connection ID to connect to the Google Cloud Platform - -Only the Data Pipeline name and Project ID are required parameters, as the Location and GCP Connection ID have default values. -The Project ID and Location will be used to build the parent name, which is where the given Data Pipeline should be located. - -You can run a Data Pipelines instance by running the above parameters with RunDataPipelineOperator: - -.. exampleinclude:: /../../providers/tests/system/google/cloud/datapipelines/example_datapipeline.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_run_data_pipeline] - :end-before: [END howto_operator_run_data_pipeline] - -Once called, the RunDataPipelineOperator will return the Google Cloud `Dataflow Job `__ -created by running the given pipeline. - -For further information regarding the API usage, see -`Data Pipelines API REST Resource `__ -in the Google Cloud documentation. diff --git a/providers/src/airflow/providers/google/CHANGELOG.rst b/providers/src/airflow/providers/google/CHANGELOG.rst index 16e0a6d1c946e..44ec6a03614be 100644 --- a/providers/src/airflow/providers/google/CHANGELOG.rst +++ b/providers/src/airflow/providers/google/CHANGELOG.rst @@ -80,6 +80,8 @@ Breaking changes * Removed ``CreateHyperparameterTuningJobOperator.sync``. This parameter is not in actual use * Removed ``CustomTrainingJobBaseOperator.sync``. This parameter is not in actual use * Removed ``GKEStartPodOperator.get_gke_config_file()``. Please use ``GKEStartPodOperator.fetch_cluster_info()`` instead + * Removed ``CreateDataPipelineOperator``. Please use the ``DataflowCreatePipelineOperator`` instead + * Removed ``RunDataPipelineOperator``. Please use the ``DataflowRunPipelineOperator`` instead * Triggers @@ -141,6 +143,7 @@ Breaking changes * Removed ``BigQueryHook.run_query()``. Please use ``BigQueryHook.insert_job()`` instead * Removed ``BigQueryHook.create_external_table()``. Please use ``BigQueryHook.create_empty_table()`` instead * Removed ``BigQueryHook.get_service()``. Please use ``BigQueryHook.get_client()`` instead + * Removed ``DataPipelineHook``. 
Please use the DataflowHook instead * Backends diff --git a/providers/src/airflow/providers/google/cloud/operators/datapipeline.py b/providers/src/airflow/providers/google/cloud/operators/datapipeline.py deleted file mode 100644 index d9188d1658531..0000000000000 --- a/providers/src/airflow/providers/google/cloud/operators/datapipeline.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""This module contains Google Data Pipelines operators.""" - -from __future__ import annotations - -from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.providers.google.cloud.hooks.dataflow import DEFAULT_DATAFLOW_LOCATION -from airflow.providers.google.cloud.operators.dataflow import ( - DataflowCreatePipelineOperator, - DataflowRunPipelineOperator, -) -from airflow.providers.google.common.deprecated import deprecated -from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID - - -@deprecated( - planned_removal_date="December 01, 2024", - use_instead="DataflowCreatePipelineOperator", - category=AirflowProviderDeprecationWarning, -) -class CreateDataPipelineOperator(DataflowCreatePipelineOperator): - """Creates a new Data Pipelines instance from the Data Pipelines API.""" - - -@deprecated( - planned_removal_date="December 01, 2024", - use_instead="DataflowRunPipelineOperator", - category=AirflowProviderDeprecationWarning, -) -class RunDataPipelineOperator(DataflowRunPipelineOperator): - """Runs a Data Pipelines Instance using the Data Pipelines API.""" - - def __init__( - self, - data_pipeline_name: str, - project_id: str = PROVIDE_PROJECT_ID, - location: str = DEFAULT_DATAFLOW_LOCATION, - gcp_conn_id: str = "google_cloud_default", - **kwargs, - ) -> None: - super().__init__( - pipeline_name=data_pipeline_name, - project_id=project_id, - location=location, - gcp_conn_id=gcp_conn_id, - **kwargs, - ) diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml index cb3bb5082e77d..d29a4d8ce941f 100644 --- a/providers/src/airflow/providers/google/provider.yaml +++ b/providers/src/airflow/providers/google/provider.yaml @@ -404,11 +404,6 @@ integrations: - /docs/apache-airflow-providers-google/operators/cloud/dataflow.rst logo: /integration-logos/gcp/Cloud-Dataflow.png tags: [gcp] - - integration-name: Google Data Pipelines - external-doc-url: https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest - how-to-guide: - - /docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst - tags: [gcp] - integration-name: Google Data Fusion external-doc-url: https://cloud.google.com/data-fusion/ how-to-guide: @@ -587,9 +582,6 @@ operators: - integration-name: Google Dataflow python-modules: - 
airflow.providers.google.cloud.operators.dataflow - - integration-name: Google Data Pipelines - python-modules: - - airflow.providers.google.cloud.operators.datapipeline - integration-name: Google Data Fusion python-modules: - airflow.providers.google.cloud.operators.datafusion @@ -838,9 +830,6 @@ hooks: - integration-name: Google Dataflow python-modules: - airflow.providers.google.cloud.hooks.dataflow - - integration-name: Google Data Pipelines - python-modules: - - airflow.providers.google.cloud.hooks.datapipeline - integration-name: Google Data Fusion python-modules: - airflow.providers.google.cloud.hooks.datafusion diff --git a/providers/tests/google/cloud/operators/test_datapipeline.py b/providers/tests/google/cloud/operators/test_datapipeline.py index 130b58d409a27..c2963d184e1d1 100644 --- a/providers/tests/google/cloud/operators/test_datapipeline.py +++ b/providers/tests/google/cloud/operators/test_datapipeline.py @@ -52,7 +52,7 @@ TEST_DATA_PIPELINE_NAME = "test_data_pipeline_name" -class TestCreateDataPipelineOperator: +class TestDataflowCreatePipelineOperator: @pytest.fixture def create_operator(self): """ @@ -85,7 +85,7 @@ def test_execute(self, mock_hook, create_operator): @pytest.mark.db_test -class TestRunDataPipelineOperator: +class TestDataflowRunPipelineOperator: @pytest.fixture def run_operator(self): """ diff --git a/providers/tests/system/google/cloud/datapipelines/__init__.py b/providers/tests/system/google/cloud/datapipelines/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/providers/tests/system/google/cloud/datapipelines/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py b/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py deleted file mode 100644 index 0c0c430eae150..0000000000000 --- a/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py +++ /dev/null @@ -1,151 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -Example Airflow DAG for testing Google DataPipelines Create Data Pipeline Operator. -""" - -from __future__ import annotations - -import os -from datetime import datetime - -from airflow.models.dag import DAG -from airflow.providers.google.cloud.operators.dataflow import DataflowDeletePipelineOperator -from airflow.providers.google.cloud.operators.datapipeline import ( - CreateDataPipelineOperator, - RunDataPipelineOperator, -) -from airflow.providers.google.cloud.operators.gcs import ( - GCSCreateBucketOperator, - GCSDeleteBucketOperator, - GCSSynchronizeBucketsOperator, -) -from airflow.utils.trigger_rule import TriggerRule - -from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID - -DAG_ID = "datapipeline" -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") -GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID -GCP_LOCATION = "us-central1" - -PIPELINE_NAME = f"{DAG_ID}-{ENV_ID}".replace("_", "-") -PIPELINE_JOB_NAME = f"{DAG_ID}-{ENV_ID}-job".replace("_", "-") -PIPELINE_TYPE = "PIPELINE_TYPE_BATCH" - -RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" -BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("-", "_") - -FILE_NAME = "kinglear.txt" -TEMPLATE_FILE = "word-count.json" -TEMP_LOCATION = f"gs://{BUCKET_NAME}/temp" - -GCS_PATH = f"gs://{BUCKET_NAME}/dataflow/{TEMPLATE_FILE}" -INPUT_FILE = f"gs://{BUCKET_NAME}/dataflow/{FILE_NAME}" -OUTPUT = f"gs://{BUCKET_NAME}/results/hello" - -with DAG( - DAG_ID, - schedule="@once", - start_date=datetime(2021, 1, 1), - catchup=False, - tags=["example", "datapipeline"], -) as dag: - create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME) - - move_files_to_bucket = GCSSynchronizeBucketsOperator( - task_id="move_files_to_bucket", - source_bucket=RESOURCE_DATA_BUCKET, - source_object="dataflow/pipelines", - destination_bucket=BUCKET_NAME, - destination_object="dataflow", - recursive=True, - ) - - # [START howto_operator_create_data_pipeline] - create_data_pipeline = CreateDataPipelineOperator( - task_id="create_data_pipeline", - project_id=GCP_PROJECT_ID, - location=GCP_LOCATION, - body={ - "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}", - "type": PIPELINE_TYPE, - "workload": { - "dataflowFlexTemplateRequest": { - "launchParameter": { - "containerSpecGcsPath": GCS_PATH, - "jobName": PIPELINE_JOB_NAME, - "environment": {"tempLocation": TEMP_LOCATION}, - "parameters": { - "inputFile": INPUT_FILE, - "output": OUTPUT, - }, - }, - "projectId": GCP_PROJECT_ID, - "location": GCP_LOCATION, - } - }, - }, - ) - # [END howto_operator_create_data_pipeline] - - # [START howto_operator_run_data_pipeline] - run_data_pipeline = RunDataPipelineOperator( - task_id="run_data_pipeline", - data_pipeline_name=PIPELINE_NAME, - project_id=GCP_PROJECT_ID, - ) - # [END howto_operator_run_data_pipeline] - - # [START howto_operator_delete_dataflow_pipeline] - delete_pipeline = DataflowDeletePipelineOperator( - task_id="delete_data_pipeline", - pipeline_name=PIPELINE_NAME, - project_id=GCP_PROJECT_ID, - trigger_rule=TriggerRule.ALL_DONE, - ) - # [END howto_operator_delete_dataflow_pipeline] - - delete_bucket = GCSDeleteBucketOperator( - task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE - ) - - ( - # TEST SETUP - create_bucket - >> move_files_to_bucket - # TEST BODY - >> 
create_data_pipeline - >> run_data_pipeline - # TEST TEARDOWN - >> delete_pipeline - >> delete_bucket - ) - - from tests_common.test_utils.watcher import watcher - - # This test needs watcher in order to properly mark success/failure - # when "teardown" task with trigger rule is part of the DAG - list(dag.tasks) >> watcher() - - -from tests_common.test_utils.system_tests import get_test_run # noqa: E402 - -# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) -test_run = get_test_run(dag) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index ad24f34e0c32b..2743f044e753a 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -354,8 +354,6 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest "airflow.providers.google.cloud.operators.automl.AutoMLTablesUpdateDatasetOperator", "airflow.providers.google.cloud.operators.automl.AutoMLDeployModelOperator", "airflow.providers.google.cloud.operators.automl.AutoMLBatchPredictOperator", - "airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator", - "airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator", "airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator", "airflow.providers.google.cloud.operators.mlengine.MLEngineManageModelOperator", "airflow.providers.google.cloud.operators.mlengine.MLEngineManageVersionOperator",
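
Migration note (not part of the patch itself): for DAGs that still import the removed classes, the change is a class rename plus one parameter rename. ``CreateDataPipelineOperator`` becomes ``DataflowCreatePipelineOperator`` with identical arguments, and ``RunDataPipelineOperator(data_pipeline_name=...)`` becomes ``DataflowRunPipelineOperator(pipeline_name=...)``, since the removed shim classes simply forwarded to these Dataflow operators. A minimal migration sketch follows; the project, location, pipeline name and Flex Template path are placeholder values modelled on the deleted example_datapipeline.py system test, not values defined by this patch:

    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.providers.google.cloud.operators.dataflow import (
        DataflowCreatePipelineOperator,
        DataflowRunPipelineOperator,
    )

    GCP_PROJECT_ID = "my-project"  # placeholder
    GCP_LOCATION = "us-central1"
    PIPELINE_NAME = "example-pipeline"  # placeholder
    GCS_PATH = "gs://my-bucket/dataflow/word-count.json"  # placeholder Flex Template spec

    with DAG(
        "datapipeline_migration",
        schedule="@once",
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        # Was: CreateDataPipelineOperator(...) -- the arguments are unchanged,
        # only the class name differs.
        create_pipeline = DataflowCreatePipelineOperator(
            task_id="create_data_pipeline",
            project_id=GCP_PROJECT_ID,
            location=GCP_LOCATION,
            body={
                "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
                "type": "PIPELINE_TYPE_BATCH",
                "workload": {
                    "dataflowFlexTemplateRequest": {
                        "launchParameter": {
                            "containerSpecGcsPath": GCS_PATH,
                            "jobName": f"{PIPELINE_NAME}-job",
                        },
                        "projectId": GCP_PROJECT_ID,
                        "location": GCP_LOCATION,
                    }
                },
            },
        )

        # Was: RunDataPipelineOperator(data_pipeline_name=PIPELINE_NAME, ...) --
        # the removed shim forwarded ``data_pipeline_name`` to ``pipeline_name``.
        run_pipeline = DataflowRunPipelineOperator(
            task_id="run_data_pipeline",
            pipeline_name=PIPELINE_NAME,
            project_id=GCP_PROJECT_ID,
        )

        create_pipeline >> run_pipeline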