diff --git a/providers/src/airflow/providers/google/ads/hooks/ads.py b/providers/src/airflow/providers/google/ads/hooks/ads.py index f8703837cc4c9..c1d35a613a2f1 100644 --- a/providers/src/airflow/providers/google/ads/hooks/ads.py +++ b/providers/src/airflow/providers/google/ads/hooks/ads.py @@ -19,6 +19,7 @@ from __future__ import annotations +import warnings from functools import cached_property from tempfile import NamedTemporaryFile from typing import IO, TYPE_CHECKING, Any, Literal @@ -27,7 +28,7 @@ from google.ads.googleads.errors import GoogleAdsException from google.auth.exceptions import GoogleAuthError -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.hooks.base import BaseHook from airflow.providers.google.common.hooks.base_google import get_field @@ -116,7 +117,7 @@ def __init__( self.authentication_method: Literal["service_account", "developer_token"] = "service_account" def search( - self, client_ids: list[str], query: str, page_size: int = 10000, **kwargs + self, client_ids: list[str], query: str, page_size: int | None = None, **kwargs ) -> list[GoogleAdsRow]: """ Pull data from the Google Ads API. @@ -133,7 +134,8 @@ def search( :param client_ids: Google Ads client ID(s) to query the API for. :param query: Google Ads Query Language query. - :param page_size: Number of results to return per page. Max 10000. + :param page_size: Number of results to return per page. Max 10000 (for version 16 and 16.1) + This parameter deprecated. After February 05, 2025, it will be removed. :return: Google Ads API response, converted to Google Ads Row objects. """ data_proto_plus = self._search(client_ids, query, page_size, **kwargs) @@ -142,7 +144,7 @@ def search( return data_native_pb def search_proto_plus( - self, client_ids: list[str], query: str, page_size: int = 10000, **kwargs + self, client_ids: list[str], query: str, page_size: int | None = None, **kwargs ) -> list[GoogleAdsRow]: """ Pull data from the Google Ads API. @@ -152,7 +154,8 @@ def search_proto_plus( :param client_ids: Google Ads client ID(s) to query the API for. :param query: Google Ads Query Language query. - :param page_size: Number of results to return per page. Max 10000. + :param page_size: Number of results to return per page. Max 10000 (for version 16 and 16.1) + This parameter is deprecated. After February 05, 2025, it will be removed. :return: Google Ads API response, converted to Google Ads Row objects """ return self._search(client_ids, query, page_size, **kwargs) @@ -267,24 +270,36 @@ def _update_config_with_secret(self, secrets_temp: IO[str]) -> None: self.google_ads_config["json_key_file_path"] = secrets_temp.name def _search( - self, client_ids: list[str], query: str, page_size: int = 10000, **kwargs + self, client_ids: list[str], query: str, page_size: int | None = None, **kwargs ) -> list[GoogleAdsRow]: """ Pull data from the Google Ads API. :param client_ids: Google Ads client ID(s) to query the API for. :param query: Google Ads Query Language query. - :param page_size: Number of results to return per page. Max 10000. + :param page_size: Number of results to return per page. Max 10000 (for version 16 and 16.1) + This parameter is deprecated. After February 05, 2025, it will be removed. :return: Google Ads API response, converted to Google Ads Row objects """ service = self._get_service + extra_req_params = {} + if self.api_version == "v16": # TODO: remove this after deprecation removal for page_size parameter + extra_req_params["page_size"] = page_size or 10000 + else: + if page_size: + warnings.warn( + "page_size parameter for the GoogleAdsHook.search and " + "GoogleAdsHook.search_proto_plus method is deprecated and will be removed " + "after February 05, 2025.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + iterators = [] for client_id in client_ids: - iterator = service.search( - request={"customer_id": client_id, "query": query, "page_size": page_size} - ) + iterator = service.search(request={"customer_id": client_id, "query": query, **extra_req_params}) iterators.append(iterator) self.log.info("Fetched Google Ads Iterators") diff --git a/providers/src/airflow/providers/google/ads/transfers/ads_to_gcs.py b/providers/src/airflow/providers/google/ads/transfers/ads_to_gcs.py index b36e85854e752..51ed0662842c5 100644 --- a/providers/src/airflow/providers/google/ads/transfers/ads_to_gcs.py +++ b/providers/src/airflow/providers/google/ads/transfers/ads_to_gcs.py @@ -17,11 +17,13 @@ from __future__ import annotations import csv +import warnings from collections.abc import Sequence from operator import attrgetter from tempfile import NamedTemporaryFile from typing import TYPE_CHECKING +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import BaseOperator from airflow.providers.google.ads.hooks.ads import GoogleAdsHook from airflow.providers.google.cloud.hooks.gcs import GCSHook @@ -52,7 +54,8 @@ class GoogleAdsToGcsOperator(BaseOperator): :param obj: GCS path to save the object. Must be the full file path (ex. `path/to/file.txt`) :param gcp_conn_id: Airflow Google Cloud connection ID :param google_ads_conn_id: Airflow Google Ads connection ID - :param page_size: The number of results per API page request. Max 10,000 + :param page_size: The number of results per API page request. Max 10,000 (for version 16 and 16.1) + This parameter deprecated. After March 01, 2025, it will be removed. :param gzip: Option to compress local file or file data for upload :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token @@ -84,7 +87,7 @@ def __init__( obj: str, gcp_conn_id: str = "google_cloud_default", google_ads_conn_id: str = "google_ads_default", - page_size: int = 10000, + page_size: int | None = None, gzip: bool = False, impersonation_chain: str | Sequence[str] | None = None, api_version: str | None = None, @@ -98,7 +101,8 @@ def __init__( self.obj = obj self.gcp_conn_id = gcp_conn_id self.google_ads_conn_id = google_ads_conn_id - self.page_size = page_size + # TODO: remove this after deprecation removal for page_size parameter + self.page_size = page_size or 10000 if api_version == "v16" else None self.gzip = gzip self.impersonation_chain = impersonation_chain self.api_version = api_version @@ -109,7 +113,17 @@ def execute(self, context: Context) -> None: google_ads_conn_id=self.google_ads_conn_id, api_version=self.api_version, ) - rows = service.search(client_ids=self.client_ids, query=self.query, page_size=self.page_size) + + if self.api_version != "v16" and self.page_size: + warnings.warn( + "page_size parameter for the GoogleAdsToGcsOperator is deprecated and will be removed " + "after March 01, 2025.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + rows = service.search(client_ids=self.client_ids, query=self.query) + else: + rows = service.search(client_ids=self.client_ids, query=self.query, page_size=self.page_size) try: getter = attrgetter(*self.attributes) diff --git a/providers/tests/google/ads/hooks/test_ads.py b/providers/tests/google/ads/hooks/test_ads.py index 67b96c0007643..813a276da4e2b 100644 --- a/providers/tests/google/ads/hooks/test_ads.py +++ b/providers/tests/google/ads/hooks/test_ads.py @@ -52,6 +52,17 @@ def mock_hook(request): yield hook +@pytest.fixture( + params=[EXTRAS_DEVELOPER_TOKEN, EXTRAS_SERVICE_ACCOUNT], ids=["developer_token", "service_account"] +) +def mock_hook_v16(request): + # TODO: remove this after deprecation removal for page_size parameter + with mock.patch("airflow.hooks.base.BaseHook.get_connection") as conn: + hook = GoogleAdsHook(api_version="v16") + conn.return_value.extra_dejson = request.param + yield hook + + @pytest.fixture( params=[ {"input": EXTRAS_DEVELOPER_TOKEN, "expected_result": "developer_token"}, @@ -94,7 +105,27 @@ def test_search(self, mock_client, mock_hook): # avoid additional __iter__ calls mock_hook._extract_rows = list query = "QUERY" - mock_hook.search(client_ids=client_ids, query=query, page_size=2) + mock_hook.search(client_ids=client_ids, query=query) + for i, client_id in enumerate(client_ids): + name, args, kwargs = service.search.mock_calls[i] + assert kwargs["request"]["customer_id"] == client_id + assert kwargs["request"]["query"] == query + assert "page_size" not in kwargs["request"] + + # TODO: remove this after deprecation removal for page_size parameter + @mock.patch("airflow.providers.google.ads.hooks.ads.GoogleAdsClient") + def test_search_v16(self, mock_client, mock_hook_v16): + service = mock_client.load_from_dict.return_value.get_service.return_value + mock_client.load_from_dict.return_value.get_type.side_effect = [PropertyMock(), PropertyMock()] + client_ids = ["1", "2"] + rows = ["row1", "row2"] + service.search.side_effects = rows + + # Here we mock _extract_rows to assert calls and + # avoid additional __iter__ calls + mock_hook_v16._extract_rows = list + query = "QUERY" + mock_hook_v16.search(client_ids=client_ids, query=query, page_size=2) for i, client_id in enumerate(client_ids): name, args, kwargs = service.search.mock_calls[i] assert kwargs["request"]["customer_id"] == client_id diff --git a/providers/tests/google/ads/operators/test_ads.py b/providers/tests/google/ads/operators/test_ads.py index 9a4e7a9ad53ad..e3c1e525584ce 100644 --- a/providers/tests/google/ads/operators/test_ads.py +++ b/providers/tests/google/ads/operators/test_ads.py @@ -39,7 +39,7 @@ gcp_conn_id = "gcp_conn_id" google_ads_conn_id = "google_ads_conn_id" -api_version = "v10" +api_version = "v17" class TestGoogleAdsListAccountsOperator: diff --git a/providers/tests/google/ads/transfers/test_ads_to_gcs.py b/providers/tests/google/ads/transfers/test_ads_to_gcs.py index 3d9c428494493..2c17bcd173aeb 100644 --- a/providers/tests/google/ads/transfers/test_ads_to_gcs.py +++ b/providers/tests/google/ads/transfers/test_ads_to_gcs.py @@ -55,6 +55,38 @@ def test_execute(self, mock_gcs_hook, mock_ads_hook): google_ads_conn_id=google_ads_conn_id, api_version=api_version, ) + mock_ads_hook.return_value.search.assert_called_once_with( + client_ids=CLIENT_IDS, query=QUERY, page_size=None + ) + mock_gcs_hook.assert_called_once_with( + gcp_conn_id=gcp_conn_id, + impersonation_chain=IMPERSONATION_CHAIN, + ) + mock_gcs_hook.return_value.upload.assert_called_once_with( + bucket_name=BUCKET, object_name=GCS_OBJ_PATH, filename=mock.ANY, gzip=False + ) + + @mock.patch("airflow.providers.google.ads.transfers.ads_to_gcs.GoogleAdsHook") + @mock.patch("airflow.providers.google.ads.transfers.ads_to_gcs.GCSHook") + def test_execute_v16(self, mock_gcs_hook, mock_ads_hook): + op = GoogleAdsToGcsOperator( + gcp_conn_id=gcp_conn_id, + google_ads_conn_id=google_ads_conn_id, + client_ids=CLIENT_IDS, + query=QUERY, + attributes=FIELDS_TO_EXTRACT, + obj=GCS_OBJ_PATH, + bucket=BUCKET, + task_id="run_operator", + impersonation_chain=IMPERSONATION_CHAIN, + api_version="v16", + ) + op.execute({}) + mock_ads_hook.assert_called_once_with( + gcp_conn_id=gcp_conn_id, + google_ads_conn_id=google_ads_conn_id, + api_version="v16", + ) mock_ads_hook.return_value.search.assert_called_once_with( client_ids=CLIENT_IDS, query=QUERY, page_size=10000 )