From 90acbfbba1a3e6535b87376aeaf089805b7d3303 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 19 Apr 2024 14:22:30 +0200 Subject: [PATCH] Apply PROVIDE_PROJECT_ID mypy workaround across Google provider (#39129) There is a simple workaround implemented several years ago for the Google provider `project_id` default value being PROVIDE_PROJECT_ID that satisfies mypy checks for project_id being set. The way `fallback_to_default_project_id` works is that across all the providers the project_id is actually set, even if technically its default value is set to None. This is a similar typing workaround to the one we use for NEW_SESSION in the core of Airflow. The workaround has not been applied consistently across all the google provider code and occasionally it causes MyPy to complain when a newer version of a google library introduces more strict type checking and expects the project_id to be set. This PR applies the workaround across all the Google provider code. This is - generally speaking - a no-op operation. Nothing changes, except MyPy being aware that the project_id is actually going to be set even if it is technically set to None. 
--- .../providers/google/cloud/hooks/automl.py | 2 +- .../providers/google/cloud/hooks/bigquery.py | 65 ++++++++++--------- .../providers/google/cloud/hooks/cloud_sql.py | 9 ++- .../hooks/cloud_storage_transfer_service.py | 8 ++- .../google/cloud/hooks/compute_ssh.py | 3 +- .../providers/google/cloud/hooks/dataplex.py | 8 ++- airflow/providers/google/cloud/hooks/dlp.py | 28 ++++---- airflow/providers/google/cloud/hooks/gcs.py | 8 ++- airflow/providers/google/cloud/hooks/gdm.py | 4 +- .../google/cloud/hooks/kubernetes_engine.py | 4 +- .../providers/google/cloud/hooks/mlengine.py | 12 ++-- .../providers/google/cloud/hooks/pubsub.py | 2 +- .../google/cloud/hooks/secret_manager.py | 4 +- .../google/cloud/log/gcs_task_handler.py | 3 +- .../google/cloud/operators/automl.py | 25 +++---- .../google/cloud/operators/bigquery.py | 27 ++++---- .../google/cloud/operators/bigquery_dts.py | 7 +- .../google/cloud/operators/bigtable.py | 13 ++-- .../google/cloud/operators/cloud_build.py | 23 +++---- .../cloud/operators/cloud_memorystore.py | 33 +++++----- .../google/cloud/operators/cloud_sql.py | 20 +++--- .../cloud_storage_transfer_service.py | 15 +++-- .../google/cloud/operators/compute.py | 23 +++---- .../google/cloud/operators/datacatalog.py | 41 ++++++------ .../google/cloud/operators/dataflow.py | 15 +++-- .../google/cloud/operators/datafusion.py | 21 +++--- .../google/cloud/operators/datapipeline.py | 5 +- .../google/cloud/operators/dataprep.py | 9 +-- .../google/cloud/operators/dataproc.py | 33 +++++----- .../google/cloud/operators/datastore.py | 15 +++-- .../providers/google/cloud/operators/dlp.py | 61 ++++++++--------- .../google/cloud/operators/functions.py | 7 +- .../providers/google/cloud/operators/gcs.py | 4 +- .../cloud/operators/kubernetes_engine.py | 25 +++---- .../google/cloud/operators/life_sciences.py | 3 +- .../google/cloud/operators/mlengine.py | 21 +++--- .../google/cloud/operators/pubsub.py | 11 ++-- .../google/cloud/operators/spanner.py | 13 ++-- 
.../google/cloud/operators/speech_to_text.py | 3 +- .../google/cloud/operators/stackdriver.py | 21 +++--- .../providers/google/cloud/operators/tasks.py | 27 ++++---- .../google/cloud/operators/text_to_speech.py | 3 +- .../cloud/operators/translate_speech.py | 3 +- .../google/cloud/operators/vision.py | 25 +++---- .../google/cloud/operators/workflows.py | 19 +++--- .../google/cloud/secrets/secret_manager.py | 3 +- .../google/cloud/sensors/bigquery_dts.py | 3 +- .../google/cloud/sensors/bigtable.py | 3 +- .../sensors/cloud_storage_transfer_service.py | 3 +- .../google/cloud/sensors/dataflow.py | 9 +-- .../google/cloud/sensors/datafusion.py | 3 +- .../google/cloud/sensors/dataproc.py | 5 +- .../providers/google/cloud/sensors/tasks.py | 3 +- .../google/cloud/sensors/workflows.py | 3 +- .../google/cloud/transfers/bigquery_to_gcs.py | 3 +- .../google/cloud/transfers/gcs_to_bigquery.py | 3 +- .../google/cloud/triggers/bigquery.py | 6 +- .../google/cloud/triggers/cloud_sql.py | 3 +- .../cloud_storage_transfer_service.py | 3 +- .../google/cloud/triggers/dataproc.py | 3 +- .../google/cloud/triggers/mlengine.py | 3 +- .../google/common/hooks/base_google.py | 4 +- .../google/firebase/hooks/firestore.py | 4 +- .../google/firebase/operators/firestore.py | 3 +- 64 files changed, 439 insertions(+), 362 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/automl.py b/airflow/providers/google/cloud/hooks/automl.py index 6dae1e36d6fab..d519aca42657d 100644 --- a/airflow/providers/google/cloud/hooks/automl.py +++ b/airflow/providers/google/cloud/hooks/automl.py @@ -529,7 +529,7 @@ def list_table_specs( self, dataset_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, filter_: str | None = None, page_size: int | None = None, retry: Retry | _MethodDefault = DEFAULT, diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index f270e256fbf75..7482be89fb419 100644 --- 
a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -59,7 +59,12 @@ from airflow.providers.google.cloud.utils.bigquery import bq_cast from airflow.providers.google.cloud.utils.credentials_provider import _get_scopes from airflow.providers.google.common.consts import CLIENT_INFO -from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook, get_field +from airflow.providers.google.common.hooks.base_google import ( + PROVIDE_PROJECT_ID, + GoogleBaseAsyncHook, + GoogleBaseHook, + get_field, +) try: from airflow.utils.hashlib_wrapper import md5 @@ -198,7 +203,7 @@ def get_service(self) -> Resource: http_authorized = self._authorize() return build("bigquery", "v2", http=http_authorized, cache_discovery=False) - def get_client(self, project_id: str | None = None, location: str | None = None) -> Client: + def get_client(self, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None) -> Client: """Get an authenticated BigQuery Client. :param project_id: Project ID for the project which the client acts on behalf of. 
@@ -250,7 +255,7 @@ def get_records(self, sql, parameters=None): @staticmethod def _resolve_table_reference( table_resource: dict[str, Any], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, dataset_id: str | None = None, table_id: str | None = None, ) -> dict[str, Any]: @@ -360,7 +365,7 @@ def table_partition_exists( @GoogleBaseHook.fallback_to_default_project_id def create_empty_table( self, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, dataset_id: str | None = None, table_id: str | None = None, table_resource: dict[str, Any] | None = None, @@ -474,7 +479,7 @@ def create_empty_table( def create_empty_dataset( self, dataset_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, dataset_reference: dict[str, Any] | None = None, exists_ok: bool = True, @@ -536,7 +541,7 @@ def create_empty_dataset( def get_dataset_tables( self, dataset_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, max_results: int | None = None, retry: Retry = DEFAULT_RETRY, ) -> list[dict[str, Any]]: @@ -565,7 +570,7 @@ def get_dataset_tables( def delete_dataset( self, dataset_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, delete_contents: bool = False, retry: Retry = DEFAULT_RETRY, ) -> None: @@ -614,7 +619,7 @@ def create_external_table( description: str | None = None, encryption_configuration: dict | None = None, location: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ) -> Table: """Create an external table in the dataset with data from Google Cloud Storage. @@ -750,7 +755,7 @@ def update_table( fields: list[str] | None = None, dataset_id: str | None = None, table_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ) -> dict[str, Any]: """Change some fields of a table. 
@@ -796,7 +801,7 @@ def patch_table( self, dataset_id: str, table_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, description: str | None = None, expiration_time: int | None = None, external_data_configuration: dict | None = None, @@ -953,7 +958,7 @@ def update_dataset( fields: Sequence[str], dataset_resource: dict[str, Any], dataset_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry = DEFAULT_RETRY, ) -> Dataset: """Change some fields of a dataset. @@ -999,7 +1004,9 @@ def update_dataset( ), category=AirflowProviderDeprecationWarning, ) - def patch_dataset(self, dataset_id: str, dataset_resource: dict, project_id: str | None = None) -> dict: + def patch_dataset( + self, dataset_id: str, dataset_resource: dict, project_id: str = PROVIDE_PROJECT_ID + ) -> dict: """Patches information in an existing dataset. It only replaces fields that are provided in the submitted dataset resource. @@ -1047,7 +1054,7 @@ def patch_dataset(self, dataset_id: str, dataset_resource: dict, project_id: str def get_dataset_tables_list( self, dataset_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, table_prefix: str | None = None, max_results: int | None = None, ) -> list[dict[str, Any]]: @@ -1084,7 +1091,7 @@ def get_dataset_tables_list( @GoogleBaseHook.fallback_to_default_project_id def get_datasets_list( self, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, include_all: bool = False, filter_: str | None = None, max_results: int | None = None, @@ -1134,7 +1141,7 @@ def get_datasets_list( return datasets_list @GoogleBaseHook.fallback_to_default_project_id - def get_dataset(self, dataset_id: str, project_id: str | None = None) -> Dataset: + def get_dataset(self, dataset_id: str, project_id: str = PROVIDE_PROJECT_ID) -> Dataset: """Fetch the dataset referenced by *dataset_id*. 
:param dataset_id: The BigQuery Dataset ID @@ -1158,7 +1165,7 @@ def run_grant_dataset_view_access( view_dataset: str, view_table: str, view_project: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ) -> dict[str, Any]: """Grant authorized view access of a dataset to a view table. @@ -1210,7 +1217,7 @@ def run_grant_dataset_view_access( @GoogleBaseHook.fallback_to_default_project_id def run_table_upsert( - self, dataset_id: str, table_resource: dict[str, Any], project_id: str | None = None + self, dataset_id: str, table_resource: dict[str, Any], project_id: str = PROVIDE_PROJECT_ID ) -> dict[str, Any]: """Update a table if it exists, otherwise create a new one. @@ -1267,7 +1274,7 @@ def delete_table( self, table_id: str, not_found_ok: bool = True, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ) -> None: """Delete an existing table from the dataset. @@ -1334,7 +1341,7 @@ def list_rows( selected_fields: list[str] | str | None = None, page_token: str | None = None, start_index: int | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, retry: Retry = DEFAULT_RETRY, return_iterator: bool = False, @@ -1387,7 +1394,7 @@ def list_rows( return list(iterator) @GoogleBaseHook.fallback_to_default_project_id - def get_schema(self, dataset_id: str, table_id: str, project_id: str | None = None) -> dict: + def get_schema(self, dataset_id: str, table_id: str, project_id: str = PROVIDE_PROJECT_ID) -> dict: """Get the schema for a given dataset and table. .. seealso:: https://cloud.google.com/bigquery/docs/reference/v2/tables#resource @@ -1409,7 +1416,7 @@ def update_table_schema( include_policy_tags: bool, dataset_id: str, table_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ) -> dict[str, Any]: """Update fields within a schema for a given dataset and table. 
@@ -1502,7 +1509,7 @@ def _remove_policy_tags(schema: list[dict[str, Any]]): def poll_job_complete( self, job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, retry: Retry = DEFAULT_RETRY, ) -> bool: @@ -1532,7 +1539,7 @@ def cancel_query(self) -> None: def cancel_job( self, job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, ) -> None: """Cancel a job and wait for cancellation to complete. @@ -1576,7 +1583,7 @@ def cancel_job( def get_job( self, job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, ) -> CopyJob | QueryJob | LoadJob | ExtractJob | UnknownJob: """Retrieve a BigQuery job. @@ -1607,7 +1614,7 @@ def insert_job( self, configuration: dict, job_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, nowait: bool = False, retry: Retry = DEFAULT_RETRY, @@ -3304,7 +3311,7 @@ async def get_job_instance( ) async def _get_job( - self, job_id: str | None, project_id: str | None = None, location: str | None = None + self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None ) -> CopyJob | QueryJob | LoadJob | ExtractJob | UnknownJob: """ Get BigQuery job by its ID, project ID and location. 
@@ -3347,7 +3354,7 @@ def _get_job_sync(self, job_id, project_id, location): return hook.get_job(job_id=job_id, project_id=project_id, location=location) async def get_job_status( - self, job_id: str | None, project_id: str | None = None, location: str | None = None + self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None ) -> dict[str, str]: job = await self._get_job(job_id=job_id, project_id=project_id, location=location) if job.state == "DONE": @@ -3359,7 +3366,7 @@ async def get_job_status( async def get_job_output( self, job_id: str | None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ) -> dict[str, Any]: """Get the BigQuery job output for a given job ID asynchronously.""" async with ClientSession() as session: @@ -3372,7 +3379,7 @@ async def create_job_for_partition_get( self, dataset_id: str | None, table_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ): """Create a new job and get the job_id using gcloud-aio.""" async with ClientSession() as session: diff --git a/airflow/providers/google/cloud/hooks/cloud_sql.py b/airflow/providers/google/cloud/hooks/cloud_sql.py index 615afde834153..ee6796df9c649 100644 --- a/airflow/providers/google/cloud/hooks/cloud_sql.py +++ b/airflow/providers/google/cloud/hooks/cloud_sql.py @@ -53,7 +53,12 @@ from airflow.providers.google.cloud.hooks.secret_manager import ( GoogleCloudSecretManagerHook, ) -from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook, get_field +from airflow.providers.google.common.hooks.base_google import ( + PROVIDE_PROJECT_ID, + GoogleBaseAsyncHook, + GoogleBaseHook, + get_field, +) from airflow.providers.mysql.hooks.mysql import MySqlHook from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.log.logging_mixin import LoggingMixin @@ -510,7 +515,7 @@ def __init__( path_prefix: str, instance_specification: str, 
gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, sql_proxy_version: str | None = None, sql_proxy_binary_path: str | None = None, ) -> None: diff --git a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py index 966735e9c21f1..1c3ffd2ad8831 100644 --- a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py @@ -46,7 +46,11 @@ from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.google.common.consts import CLIENT_INFO -from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import ( + PROVIDE_PROJECT_ID, + GoogleBaseAsyncHook, + GoogleBaseHook, +) if TYPE_CHECKING: from google.cloud.storage_transfer_v1.services.storage_transfer_service.pagers import ( @@ -504,7 +508,7 @@ def operations_contain_expected_statuses( class CloudDataTransferServiceAsyncHook(GoogleBaseAsyncHook): """Asynchronous hook for Google Storage Transfer Service.""" - def __init__(self, project_id: str | None = None, **kwargs: Any) -> None: + def __init__(self, project_id: str = PROVIDE_PROJECT_ID, **kwargs: Any) -> None: super().__init__(**kwargs) self.project_id = project_id self._client: StorageTransferServiceAsyncClient | None = None diff --git a/airflow/providers/google/cloud/hooks/compute_ssh.py b/airflow/providers/google/cloud/hooks/compute_ssh.py index 97df5c5525061..fc01944296059 100644 --- a/airflow/providers/google/cloud/hooks/compute_ssh.py +++ b/airflow/providers/google/cloud/hooks/compute_ssh.py @@ -29,6 +29,7 @@ from airflow.exceptions import AirflowException from airflow.providers.google.cloud.hooks.compute import ComputeEngineHook from airflow.providers.google.cloud.hooks.os_login import 
OSLoginHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.utils.types import NOTSET, ArgNotSet @@ -109,7 +110,7 @@ def __init__( instance_name: str | None = None, zone: str | None = None, user: str | None = "root", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, hostname: str | None = None, use_internal_ip: bool = False, use_iap_tunnel: bool = False, diff --git a/airflow/providers/google/cloud/hooks/dataplex.py b/airflow/providers/google/cloud/hooks/dataplex.py index 77d175f7d7573..ff8cf56a5c14f 100644 --- a/airflow/providers/google/cloud/hooks/dataplex.py +++ b/airflow/providers/google/cloud/hooks/dataplex.py @@ -36,7 +36,11 @@ from airflow.exceptions import AirflowException from airflow.providers.google.common.consts import CLIENT_INFO -from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import ( + PROVIDE_PROJECT_ID, + GoogleBaseAsyncHook, + GoogleBaseHook, +) if TYPE_CHECKING: from google.api_core.operation import Operation @@ -665,7 +669,7 @@ def wait_for_data_scan_job( self, data_scan_id: str, job_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, region: str | None = None, wait_time: int = 10, result_timeout: float | None = None, diff --git a/airflow/providers/google/cloud/hooks/dlp.py b/airflow/providers/google/cloud/hooks/dlp.py index 6d0ad9bb2d696..f9e51e1afd279 100644 --- a/airflow/providers/google/cloud/hooks/dlp.py +++ b/airflow/providers/google/cloud/hooks/dlp.py @@ -162,7 +162,7 @@ def cancel_dlp_job( def create_deidentify_template( self, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, deidentify_template: dict | DeidentifyTemplate | None = None, template_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ 
-287,7 +287,7 @@ def create_dlp_job( def create_inspect_template( self, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, inspect_template: InspectTemplate | None = None, template_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -376,7 +376,7 @@ def create_job_trigger( def create_stored_info_type( self, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, config: dict | StoredInfoTypeConfig | None = None, stored_info_type_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -565,7 +565,7 @@ def delete_inspect_template( self, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -652,7 +652,7 @@ def delete_stored_info_type( self, stored_info_type_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -701,7 +701,7 @@ def get_deidentify_template( self, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -788,7 +788,7 @@ def get_inspect_template( self, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -875,7 +875,7 @@ def get_stored_info_type( self, stored_info_type_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, 
retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -967,7 +967,7 @@ def inspect_content( def list_deidentify_templates( self, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, order_by: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1103,7 +1103,7 @@ def list_info_types( def list_inspect_templates( self, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, order_by: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1201,7 +1201,7 @@ def list_job_triggers( def list_stored_info_types( self, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, order_by: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1356,7 +1356,7 @@ def update_deidentify_template( self, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, deidentify_template: dict | DeidentifyTemplate | None = None, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1411,7 +1411,7 @@ def update_inspect_template( self, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, inspect_template: dict | InspectTemplate | None = None, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1513,7 +1513,7 @@ def update_stored_info_type( self, stored_info_type_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, config: dict | StoredInfoTypeConfig | None = None, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, diff --git 
a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py index d2d318cd05c5d..fad566246875f 100644 --- a/airflow/providers/google/cloud/hooks/gcs.py +++ b/airflow/providers/google/cloud/hooks/gcs.py @@ -45,7 +45,11 @@ from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.google.cloud.utils.helpers import normalize_directory_path from airflow.providers.google.common.consts import CLIENT_INFO -from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import ( + PROVIDE_PROJECT_ID, + GoogleBaseAsyncHook, + GoogleBaseHook, +) from airflow.typing_compat import ParamSpec from airflow.utils import timezone from airflow.version import version @@ -1013,7 +1017,7 @@ def create_bucket( resource: dict | None = None, storage_class: str = "MULTI_REGIONAL", location: str = "US", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, labels: dict | None = None, ) -> str: """ diff --git a/airflow/providers/google/cloud/hooks/gdm.py b/airflow/providers/google/cloud/hooks/gdm.py index dc54f9615bc64..e8311bba95e1f 100644 --- a/airflow/providers/google/cloud/hooks/gdm.py +++ b/airflow/providers/google/cloud/hooks/gdm.py @@ -22,7 +22,7 @@ from googleapiclient.discovery import Resource, build from airflow.exceptions import AirflowException -from airflow.providers.google.common.hooks.base_google import GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook class GoogleDeploymentManagerHook(GoogleBaseHook): @@ -56,7 +56,7 @@ def get_conn(self) -> Resource: @GoogleBaseHook.fallback_to_default_project_id def list_deployments( self, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, deployment_filter: str | None = None, order_by: str | None = None, ) -> list[dict[str, Any]]: diff --git 
a/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/airflow/providers/google/cloud/hooks/kubernetes_engine.py index 9e0d4fbfbc1f4..d3e72b8574334 100644 --- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py +++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py @@ -156,7 +156,7 @@ def get_conn(self) -> container_v1.ClusterManagerClient: def get_client(self) -> ClusterManagerClient: return self.get_conn() - def wait_for_operation(self, operation: Operation, project_id: str | None = None) -> Operation: + def wait_for_operation(self, operation: Operation, project_id: str = PROVIDE_PROJECT_ID) -> Operation: """Continuously fetch the status from Google Cloud. This is done until the given operation completes, or raises an error. @@ -176,7 +176,7 @@ def wait_for_operation(self, operation: Operation, project_id: str | None = None operation = self.get_operation(operation.name, project_id=project_id or self.project_id) return operation - def get_operation(self, operation_name: str, project_id: str | None = None) -> Operation: + def get_operation(self, operation_name: str, project_id: str = PROVIDE_PROJECT_ID) -> Operation: """Get an operation from Google Cloud. 
:param operation_name: Name of operation to fetch diff --git a/airflow/providers/google/cloud/hooks/mlengine.py b/airflow/providers/google/cloud/hooks/mlengine.py index ac43c570c9228..35aa2cb5cd6a6 100644 --- a/airflow/providers/google/cloud/hooks/mlengine.py +++ b/airflow/providers/google/cloud/hooks/mlengine.py @@ -31,7 +31,11 @@ from googleapiclient.errors import HttpError from airflow.exceptions import AirflowException -from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import ( + PROVIDE_PROJECT_ID, + GoogleBaseAsyncHook, + GoogleBaseHook, +) from airflow.version import version as airflow_version if TYPE_CHECKING: @@ -550,7 +554,7 @@ class MLEngineAsyncHook(GoogleBaseAsyncHook): def _check_fileds( self, job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ): if not project_id: raise AirflowException("Google Cloud project id is required.") @@ -569,7 +573,7 @@ async def _get_link(self, url: str, session: Session): return job - async def get_job(self, job_id: str, session: Session, project_id: str | None = None): + async def get_job(self, job_id: str, session: Session, project_id: str = PROVIDE_PROJECT_ID): """Get the specified job resource by job ID and project ID.""" self._check_fileds(project_id=project_id, job_id=job_id) @@ -579,7 +583,7 @@ async def get_job(self, job_id: str, session: Session, project_id: str | None = async def get_job_status( self, job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, ) -> str | None: """ Poll for job status asynchronously using gcloud-aio. 
diff --git a/airflow/providers/google/cloud/hooks/pubsub.py b/airflow/providers/google/cloud/hooks/pubsub.py index 3bccaa16df219..ea0bb819bbcae 100644 --- a/airflow/providers/google/cloud/hooks/pubsub.py +++ b/airflow/providers/google/cloud/hooks/pubsub.py @@ -590,7 +590,7 @@ class PubSubAsyncHook(GoogleBaseAsyncHook): sync_hook_class = PubSubHook - def __init__(self, project_id: str | None = None, **kwargs: Any): + def __init__(self, project_id: str = PROVIDE_PROJECT_ID, **kwargs: Any): super().__init__(**kwargs) self.project_id = project_id self._client: SubscriberAsyncClient | None = None diff --git a/airflow/providers/google/cloud/hooks/secret_manager.py b/airflow/providers/google/cloud/hooks/secret_manager.py index 5fd303445cbc1..5b87e7cae2d2f 100644 --- a/airflow/providers/google/cloud/hooks/secret_manager.py +++ b/airflow/providers/google/cloud/hooks/secret_manager.py @@ -35,7 +35,7 @@ from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.google.cloud._internal_client.secret_manager_client import _SecretManagerClient from airflow.providers.google.common.consts import CLIENT_INFO -from airflow.providers.google.common.hooks.base_google import GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook if TYPE_CHECKING: from google.api_core.retry import Retry @@ -94,7 +94,7 @@ def get_conn(self) -> _SecretManagerClient: @GoogleBaseHook.fallback_to_default_project_id def get_secret( - self, secret_id: str, secret_version: str = "latest", project_id: str | None = None + self, secret_id: str, secret_version: str = "latest", project_id: str = PROVIDE_PROJECT_ID ) -> str | None: """ Get secret value from the Secret Manager. 
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index abc2bc8845b9a..1f097f05be625 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -32,6 +32,7 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id from airflow.providers.google.common.consts import CLIENT_INFO +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -82,7 +83,7 @@ def __init__( gcp_key_path: str | None = None, gcp_keyfile_dict: dict | None = None, gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, **kwargs, ): super().__init__(base_log_folder, filename_template) diff --git a/airflow/providers/google/cloud/operators/automl.py b/airflow/providers/google/cloud/operators/automl.py index 54d31025c0ca2..e1fc6b4cf6cb3 100644 --- a/airflow/providers/google/cloud/operators/automl.py +++ b/airflow/providers/google/cloud/operators/automl.py @@ -43,6 +43,7 @@ AutoMLModelTrainLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -100,7 +101,7 @@ def __init__( *, model: dict, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -209,7 +210,7 @@ def __init__( location: str, payload: dict, operation_params: dict[str, str] | None = None, - project_id: str | None = None, + project_id: str = 
PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -312,7 +313,7 @@ def __init__( input_config: dict, output_config: dict, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, prediction_params: dict[str, str] | None = None, metadata: MetaData = (), timeout: float | None = None, @@ -409,7 +410,7 @@ def __init__( *, dataset: dict, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -504,7 +505,7 @@ def __init__( dataset_id: str, location: str, input_config: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -608,7 +609,7 @@ def __init__( field_mask: dict | None = None, filter_: str | None = None, page_size: int | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -790,7 +791,7 @@ def __init__( *, model_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -876,7 +877,7 @@ def __init__( *, model_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -962,7 +963,7 @@ def __init__( *, model_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, image_detection_metadata: dict | None = None, metadata: Sequence[tuple[str, str]] = (), timeout: float | None = None, @@ -1053,7 +1054,7 @@ def __init__( location: str, page_size: int | None = None, filter_: str | None = 
None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1141,7 +1142,7 @@ def __init__( self, *, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1226,7 +1227,7 @@ def __init__( *, dataset_id: str | list[str], location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, metadata: MetaData = (), timeout: float | None = None, retry: Retry | _MethodDefault = DEFAULT, diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 8f094c0080f2b..64d3eb0f59ed3 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -56,6 +56,7 @@ BigQueryValueCheckTrigger, ) from airflow.providers.google.cloud.utils.bigquery import convert_job_id +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -554,7 +555,7 @@ def __init__( labels: dict | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), poll_interval: float = 4.0, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, **kwargs, ) -> None: super().__init__( @@ -958,7 +959,7 @@ def __init__( table_id: str, table_project_id: str | None = None, job_project_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, max_results: int = 100, selected_fields: str | None = None, gcp_conn_id: str = "google_cloud_default", @@ -1459,7 +1460,7 @@ def __init__( dataset_id: str, table_id: str, table_resource: dict[str, Any] | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, schema_fields: list | None = 
None, gcs_schema_object: str | None = None, time_partitioning: dict | None = None, @@ -1919,7 +1920,7 @@ def __init__( self, *, dataset_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, delete_contents: bool = False, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -1999,7 +2000,7 @@ def __init__( self, *, dataset_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, dataset_reference: dict | None = None, location: str | None = None, gcp_conn_id: str = "google_cloud_default", @@ -2103,7 +2104,7 @@ def __init__( self, *, dataset_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -2166,7 +2167,7 @@ def __init__( self, *, dataset_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, max_results: int | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -2235,7 +2236,7 @@ def __init__( *, dataset_id: str, dataset_resource: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -2308,7 +2309,7 @@ def __init__( fields: list[str] | None = None, dataset_id: str | None = None, table_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -2392,7 +2393,7 @@ def __init__( dataset_resource: dict[str, Any], fields: list[str] | None = None, dataset_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, 
**kwargs, @@ -2528,7 +2529,7 @@ def __init__( *, dataset_id: str, table_resource: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", location: str | None = None, impersonation_chain: str | Sequence[str] | None = None, @@ -2635,7 +2636,7 @@ def __init__( dataset_id: str, table_id: str, include_policy_tags: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -2744,7 +2745,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix def __init__( self, configuration: dict[str, Any], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, job_id: str | None = None, force_rerun: bool = True, diff --git a/airflow/providers/google/cloud/operators/bigquery_dts.py b/airflow/providers/google/cloud/operators/bigquery_dts.py index 32e05b6504a35..9619aa3addc9d 100644 --- a/airflow/providers/google/cloud/operators/bigquery_dts.py +++ b/airflow/providers/google/cloud/operators/bigquery_dts.py @@ -37,6 +37,7 @@ from airflow.providers.google.cloud.links.bigquery_dts import BigQueryDataTransferConfigLink from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.bigquery_dts import BigQueryDataTransferRunTrigger +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -94,7 +95,7 @@ def __init__( self, *, transfer_config: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, authorization_code: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -187,7 +188,7 @@ def __init__( self, *, transfer_config_id: str, - project_id: str | None = None, + project_id: str = 
PROVIDE_PROJECT_ID, location: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -273,7 +274,7 @@ def __init__( self, *, transfer_config_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, requested_time_range: dict | None = None, requested_run_time: dict | None = None, diff --git a/airflow/providers/google/cloud/operators/bigtable.py b/airflow/providers/google/cloud/operators/bigtable.py index 7437fa79dec3a..ffd98a82b6994 100644 --- a/airflow/providers/google/cloud/operators/bigtable.py +++ b/airflow/providers/google/cloud/operators/bigtable.py @@ -31,6 +31,7 @@ BigtableTablesLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: import enum @@ -112,7 +113,7 @@ def __init__( instance_id: str, main_cluster_id: str, main_cluster_zone: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, replica_clusters: list[dict[str, str]] | None = None, instance_display_name: str | None = None, instance_type: enums.Instance.Type | None = None, @@ -218,7 +219,7 @@ def __init__( self, *, instance_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, instance_display_name: str | None = None, instance_type: enums.Instance.Type | enum.IntEnum | None = None, instance_labels: dict | None = None, @@ -298,7 +299,7 @@ def __init__( self, *, instance_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -374,7 +375,7 @@ def __init__( *, instance_id: str, table_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, initial_split_keys: list | None = None, column_families: dict[str, GarbageCollectionRule] | None = None, gcp_conn_id: str = 
"google_cloud_default", @@ -478,7 +479,7 @@ def __init__( *, instance_id: str, table_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, app_profile_id: str | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -559,7 +560,7 @@ def __init__( instance_id: str, cluster_id: str, nodes: int, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/operators/cloud_build.py b/airflow/providers/google/cloud/operators/cloud_build.py index b3af1316893a8..3625e8c427d36 100644 --- a/airflow/providers/google/cloud/operators/cloud_build.py +++ b/airflow/providers/google/cloud/operators/cloud_build.py @@ -40,6 +40,7 @@ from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.cloud_build import CloudBuildCreateBuildTrigger from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.utils import yaml from airflow.utils.helpers import exactly_one @@ -87,7 +88,7 @@ def __init__( self, *, id_: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -171,7 +172,7 @@ def __init__( self, *, build: dict | Build, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, wait: bool = True, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -316,7 +317,7 @@ def __init__( self, *, trigger: dict | BuildTrigger, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: 
Sequence[tuple[str, str]] = (), @@ -399,7 +400,7 @@ def __init__( self, *, trigger_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -473,7 +474,7 @@ def __init__( self, *, id_: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -549,7 +550,7 @@ def __init__( self, *, trigger_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -627,7 +628,7 @@ def __init__( self, *, location: str = "global", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, page_token: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -707,7 +708,7 @@ def __init__( self, *, location: str = "global", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, filter_: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -783,7 +784,7 @@ def __init__( self, *, id_: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, wait: bool = True, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -868,7 +869,7 @@ def __init__( *, trigger_id: str, source: dict | RepoSource, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, wait: bool = True, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -953,7 +954,7 @@ def __init__( *, trigger_id: str, trigger: dict | BuildTrigger, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), diff --git 
a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py index 0a71f7a0c006c..fc83e0f476051 100644 --- a/airflow/providers/google/cloud/operators/cloud_memorystore.py +++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py @@ -43,6 +43,7 @@ RedisInstanceListLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -112,7 +113,7 @@ def __init__( location: str, instance_id: str, instance: dict | Instance, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -198,7 +199,7 @@ def __init__( *, location: str, instance: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -283,7 +284,7 @@ def __init__( location: str, instance: str, output_config: dict | OutputConfig, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -376,7 +377,7 @@ def __init__( location: str, instance: str, data_protection_mode: FailoverInstanceRequest.DataProtectionMode, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -462,7 +463,7 @@ def __init__( *, location: str, instance: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -556,7 +557,7 @@ def __init__( 
location: str, instance: str, input_config: dict | InputConfig, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -646,7 +647,7 @@ def __init__( *, location: str, page_size: int, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -749,7 +750,7 @@ def __init__( instance: dict | Instance, location: str | None = None, instance_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -842,7 +843,7 @@ def __init__( memory_size_gb: int, location: str | None = None, instance_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -954,7 +955,7 @@ def __init__( instance_id: str, instance: dict | Instance, input_config: dict | InputConfig, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1061,7 +1062,7 @@ def __init__( location: str, instance: str, output_config: dict | OutputConfig, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1243,7 +1244,7 @@ def __init__( location: str, instance_id: str, instance: dict | cloud_memcache.Instance, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), 
@@ -1316,7 +1317,7 @@ def __init__( self, location: str, instance: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1390,7 +1391,7 @@ def __init__( *, location: str, instance: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1474,7 +1475,7 @@ def __init__( self, *, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1572,7 +1573,7 @@ def __init__( instance: dict | cloud_memcache.Instance, location: str | None = None, instance_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), diff --git a/airflow/providers/google/cloud/operators/cloud_sql.py b/airflow/providers/google/cloud/operators/cloud_sql.py index c8519e1e2634e..169c7fb2e18ad 100644 --- a/airflow/providers/google/cloud/operators/cloud_sql.py +++ b/airflow/providers/google/cloud/operators/cloud_sql.py @@ -32,7 +32,7 @@ from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.cloud_sql import CloudSQLExportTrigger from airflow.providers.google.cloud.utils.field_validator import GcpBodyFieldValidator -from airflow.providers.google.common.hooks.base_google import get_field +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, get_field from airflow.providers.google.common.links.storage import FileDetailsLink if TYPE_CHECKING: @@ -245,7 +245,7 @@ def __init__( self, *, instance: str, - project_id: str | None = None, + 
project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", impersonation_chain: str | Sequence[str] | None = None, @@ -338,7 +338,7 @@ def __init__( *, body: dict, instance: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", validate_body: bool = True, @@ -441,7 +441,7 @@ def __init__( *, body: dict, instance: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", impersonation_chain: str | Sequence[str] | None = None, @@ -573,7 +573,7 @@ def __init__( instance: str, destination_instance_name: str, clone_context: dict | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", impersonation_chain: str | Sequence[str] | None = None, @@ -664,7 +664,7 @@ def __init__( *, instance: str, body: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", validate_body: bool = True, @@ -772,7 +772,7 @@ def __init__( instance: str, database: str, body: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", validate_body: bool = True, @@ -868,7 +868,7 @@ def __init__( *, instance: str, database: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", impersonation_chain: str | Sequence[str] | None = None, @@ -958,7 +958,7 @@ def __init__( *, instance: str, body: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", validate_body: bool = True, @@ -1105,7 
+1105,7 @@ def __init__( *, instance: str, body: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1beta4", validate_body: bool = True, diff --git a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py index a0d48778ce53b..3bec97fe04fe0 100644 --- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py @@ -62,6 +62,7 @@ ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.utils.helpers import normalize_directory_path +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from airflow.utils.context import Context @@ -234,7 +235,7 @@ def __init__( aws_conn_id: str | None = "aws_default", gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, google_impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -324,7 +325,7 @@ def __init__( aws_conn_id: str | None = "aws_default", gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, google_impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -407,7 +408,7 @@ def __init__( job_name: str, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, google_impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -467,7 +468,7 @@ class CloudDataTransferServiceGetOperationOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, 
operation_name: str, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", @@ -541,7 +542,7 @@ class CloudDataTransferServiceListOperationsOperator(GoogleCloudBaseOperator): def __init__( self, request_filter: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", google_impersonation_chain: str | Sequence[str] | None = None, @@ -839,7 +840,7 @@ def __init__( gcs_bucket: str, s3_path: str | None = None, gcs_path: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, aws_conn_id: str | None = "aws_default", gcp_conn_id: str = "google_cloud_default", description: str | None = None, @@ -1007,7 +1008,7 @@ def __init__( destination_bucket: str, source_path: str | None = None, destination_path: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", description: str | None = None, schedule: dict | None = None, diff --git a/airflow/providers/google/cloud/operators/compute.py b/airflow/providers/google/cloud/operators/compute.py index fa4f1c3a8fe8e..7a3bbc0b19607 100644 --- a/airflow/providers/google/cloud/operators/compute.py +++ b/airflow/providers/google/cloud/operators/compute.py @@ -36,6 +36,7 @@ from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.utils.field_sanitizer import GcpBodyFieldSanitizer from airflow.providers.google.cloud.utils.field_validator import GcpBodyFieldValidator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -51,7 +52,7 @@ def __init__( *, zone: str, resource_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", impersonation_chain: str | Sequence[str] | 
None = None, @@ -138,7 +139,7 @@ def __init__( body: dict, zone: str, resource_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, request_id: str | None = None, retry: Retry | None = None, timeout: float | None = None, @@ -320,7 +321,7 @@ def __init__( body: dict, zone: str, resource_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, request_id: str | None = None, retry: Retry | None = None, timeout: float | None = None, @@ -477,7 +478,7 @@ def __init__( resource_id: str, zone: str, request_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | None = None, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -721,7 +722,7 @@ def __init__( zone: str, resource_id: str, body: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", validate_body: bool = True, @@ -882,7 +883,7 @@ def __init__( self, *, body: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, resource_id: str | None = None, request_id: str | None = None, retry: Retry | None = None, @@ -1044,7 +1045,7 @@ def __init__( *, resource_id: str, request_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | None = None, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1164,7 +1165,7 @@ def __init__( *, resource_id: str, body_patch: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, request_id: str | None = None, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", @@ -1323,7 +1324,7 @@ def __init__( zone: str, source_template: str, destination_template: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, update_policy: dict[str, Any] | None = None, request_id: str | None = None, 
gcp_conn_id: str = "google_cloud_default", @@ -1468,7 +1469,7 @@ def __init__( *, body: dict, zone: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, resource_id: str | None = None, request_id: str | None = None, gcp_conn_id: str = "google_cloud_default", @@ -1630,7 +1631,7 @@ def __init__( *, resource_id: str, zone: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, request_id: str | None = None, gcp_conn_id: str = "google_cloud_default", api_version="v1", diff --git a/airflow/providers/google/cloud/operators/datacatalog.py b/airflow/providers/google/cloud/operators/datacatalog.py index f3ec08441d16d..b725c66246db7 100644 --- a/airflow/providers/google/cloud/operators/datacatalog.py +++ b/airflow/providers/google/cloud/operators/datacatalog.py @@ -38,6 +38,7 @@ DataCatalogTagTemplateLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -104,7 +105,7 @@ def __init__( entry_group: str, entry_id: str, entry: dict | Entry, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -220,7 +221,7 @@ def __init__( location: str, entry_group_id: str, entry_group: dict | EntryGroup, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -336,7 +337,7 @@ def __init__( entry: str, tag: dict | Tag, template_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -467,7 +468,7 @@ def __init__( location: str, tag_template_id: str, 
tag_template: dict | TagTemplate, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -583,7 +584,7 @@ def __init__( tag_template: str, tag_template_field_id: str, tag_template_field: dict | TagTemplateField, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -690,7 +691,7 @@ def __init__( location: str, entry_group: str, entry: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -774,7 +775,7 @@ def __init__( *, location: str, entry_group: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -860,7 +861,7 @@ def __init__( entry_group: str, entry: str, tag: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -949,7 +950,7 @@ def __init__( location: str, tag_template: str, force: bool, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1037,7 +1038,7 @@ def __init__( tag_template: str, field: str, force: bool, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1125,7 +1126,7 @@ def __init__( location: str, entry_group: str, entry: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, 
retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1220,7 +1221,7 @@ def __init__( location: str, entry_group: str, read_mask: FieldMask, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1308,7 +1309,7 @@ def __init__( *, location: str, tag_template: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1403,7 +1404,7 @@ def __init__( entry_group: str, entry: str, page_size: int = 100, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1497,7 +1498,7 @@ def __init__( *, linked_resource: str | None = None, sql_resource: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1593,7 +1594,7 @@ def __init__( tag_template: str, field: str, new_tag_template_field_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1808,7 +1809,7 @@ def __init__( location: str | None = None, entry_group: str | None = None, entry_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1920,7 +1921,7 @@ def __init__( entry_group: str | None = None, entry: str | None = None, tag_id: str | None = None, - project_id: str | None = None, + project_id: str = 
PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -2034,7 +2035,7 @@ def __init__( update_mask: dict | FieldMask, location: str | None = None, tag_template_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -2151,7 +2152,7 @@ def __init__( location: str | None = None, tag_template: str | None = None, tag_template_field_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py index 99d85860f7d5c..4a6f197e14c8c 100644 --- a/airflow/providers/google/cloud/operators/dataflow.py +++ b/airflow/providers/google/cloud/operators/dataflow.py @@ -41,6 +41,7 @@ from airflow.providers.google.cloud.links.dataflow import DataflowJobLink from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.dataflow import TemplateJobStartTrigger +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.version import version if TYPE_CHECKING: @@ -142,7 +143,7 @@ def __init__( *, job_name: str = "{{task.task_id}}", append_job_name: bool = True, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", poll_sleep: int = 10, @@ -348,7 +349,7 @@ def __init__( job_name: str = "{{task.task_id}}", dataflow_default_options: dict | None = None, options: dict | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = 
DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", poll_sleep: int = 10, @@ -606,7 +607,7 @@ def __init__( self, *, template: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, job_name: str = "{{task.task_id}}", options: dict[str, Any] | None = None, dataflow_default_options: dict[str, Any] | None = None, @@ -811,7 +812,7 @@ def __init__( self, body: dict, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", drain_pipeline: bool = False, cancel_timeout: int | None = 10 * 60, @@ -982,7 +983,7 @@ def __init__( query: str, options: dict[str, Any], location: str = DEFAULT_DATAFLOW_LOCATION, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", drain_pipeline: bool = False, impersonation_chain: str | Sequence[str] | None = None, @@ -1150,7 +1151,7 @@ def __init__( py_options: list[str] | None = None, py_requirements: list[str] | None = None, py_system_site_packages: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", poll_sleep: int = 10, @@ -1297,7 +1298,7 @@ def __init__( self, job_name_prefix: str | None = None, job_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", poll_sleep: int = 10, diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py index ce8fe6be17e14..6b27037075246 100644 --- a/airflow/providers/google/cloud/operators/datafusion.py +++ b/airflow/providers/google/cloud/operators/datafusion.py @@ -37,6 +37,7 @@ from airflow.providers.google.cloud.triggers.datafusion import DataFusionStartPipelineTrigger from airflow.providers.google.cloud.utils.datafusion 
import DataFusionPipelineType from airflow.providers.google.cloud.utils.helpers import resource_path_to_dict +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from airflow.utils.context import Context @@ -97,7 +98,7 @@ def __init__( *, instance_name: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -169,7 +170,7 @@ def __init__( *, instance_name: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -237,7 +238,7 @@ def __init__( instance_name: str, instance: dict[str, Any], location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -340,7 +341,7 @@ def __init__( instance: dict[str, Any], update_mask: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -417,7 +418,7 @@ def __init__( *, instance_name: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -499,7 +500,7 @@ def __init__( instance_name: str, location: str, namespace: str = "default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -587,7 +588,7 @@ def __init__( 
location: str, version_id: str | None = None, namespace: str = "default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -670,7 +671,7 @@ def __init__( artifact_name: str | None = None, artifact_version: str | None = None, namespace: str = "default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -777,7 +778,7 @@ def __init__( success_states: list[str] | None = None, namespace: str = "default", pipeline_timeout: int = 5 * 60, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -929,7 +930,7 @@ def __init__( instance_name: str, location: str, namespace: str = "default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/operators/datapipeline.py b/airflow/providers/google/cloud/operators/datapipeline.py index d95ef579b9bcd..a864d8794c696 100644 --- a/airflow/providers/google/cloud/operators/datapipeline.py +++ b/airflow/providers/google/cloud/operators/datapipeline.py @@ -24,6 +24,7 @@ from airflow.exceptions import AirflowException from airflow.providers.google.cloud.hooks.datapipeline import DEFAULT_DATAPIPELINE_LOCATION, DataPipelineHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from airflow.utils.context import Context @@ -58,7 +59,7 @@ def __init__( self, *, body: 
dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAPIPELINE_LOCATION, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -120,7 +121,7 @@ class RunDataPipelineOperator(GoogleCloudBaseOperator): def __init__( self, data_pipeline_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAPIPELINE_LOCATION, gcp_conn_id: str = "google_cloud_default", **kwargs, diff --git a/airflow/providers/google/cloud/operators/dataprep.py b/airflow/providers/google/cloud/operators/dataprep.py index 06b77db79177f..4636e8beb773d 100644 --- a/airflow/providers/google/cloud/operators/dataprep.py +++ b/airflow/providers/google/cloud/operators/dataprep.py @@ -24,6 +24,7 @@ from airflow.providers.google.cloud.hooks.dataprep import GoogleDataprepHook from airflow.providers.google.cloud.links.dataprep import DataprepFlowLink, DataprepJobGroupLink from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from airflow.utils.context import Context @@ -92,7 +93,7 @@ def __init__( self, *, dataprep_conn_id: str = "dataprep_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, job_group_id: int | str, embed: str, include_deleted: bool, @@ -149,7 +150,7 @@ class DataprepRunJobGroupOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, dataprep_conn_id: str = "dataprep_default", body_request: dict, **kwargs, @@ -198,7 +199,7 @@ class DataprepCopyFlowOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, dataprep_conn_id: str = "dataprep_default", flow_id: int | str, name: str = "", @@ -280,7 +281,7 @@ class 
DataprepRunFlowOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, flow_id: int | str, body_request: dict, dataprep_conn_id: str = "dataprep_default", diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index 6c3f7a3562bcd..edbfbd3f39b45 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -63,6 +63,7 @@ DataprocSubmitTrigger, ) from airflow.providers.google.cloud.utils.dataproc import DataprocOperationType +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.utils import timezone if TYPE_CHECKING: @@ -627,7 +628,7 @@ def __init__( *, cluster_name: str, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, cluster_config: dict | Cluster | None = None, virtual_cluster_config: dict | None = None, labels: dict | None = None, @@ -928,7 +929,7 @@ def __init__( self, *, cluster_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, region: str = "global", num_workers: int = 2, num_preemptible_workers: int = 0, @@ -1047,7 +1048,7 @@ def __init__( *, region: str, cluster_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, cluster_uuid: str | None = None, request_id: str | None = None, retry: AsyncRetry | _MethodDefault = DEFAULT, @@ -1173,7 +1174,7 @@ def __init__( *, cluster_name: str, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, cluster_uuid: str | None = None, request_id: str | None = None, retry: AsyncRetry | _MethodDefault = DEFAULT, @@ -1372,7 +1373,7 @@ def __init__( region: str, job_name: str = "{{task.task_id}}_{{ds_nodash}}", cluster_name: str = "cluster-1", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, dataproc_properties: dict | None = None, 
dataproc_jars: list[str] | None = None, gcp_conn_id: str = "google_cloud_default", @@ -2135,7 +2136,7 @@ def __init__( *, template: dict, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -2229,7 +2230,7 @@ def __init__( *, template_id: str, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, version: int | None = None, request_id: str | None = None, parameters: dict[str, str] | None = None, @@ -2376,7 +2377,7 @@ def __init__( *, template: dict, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, request_id: str | None = None, retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -2513,7 +2514,7 @@ def __init__( *, job: dict, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, request_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -2682,7 +2683,7 @@ def __init__( graceful_decommission_timeout: dict | Duration, region: str, request_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -2816,7 +2817,7 @@ def __init__( *, region: str, cluster_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, tarball_gcs_dir: str | None = None, diagnosis_interval: dict | Interval | None = None, jobs: MutableSequence[str] | None = None, @@ -2954,7 +2955,7 @@ def __init__( self, *, region: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, batch: dict | Batch, batch_id: str, request_id: str | None = None, @@ -3146,7 +3147,7 @@ def __init__( *, batch_id: str, region: str, - project_id: str | None = None, + project_id: str = 
PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -3210,7 +3211,7 @@ def __init__( *, batch_id: str, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -3285,7 +3286,7 @@ def __init__( self, *, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, page_token: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -3358,7 +3359,7 @@ def __init__( *, operation_name: str, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py index 8617426f79c41..f3ac9054f0b4c 100644 --- a/airflow/providers/google/cloud/operators/datastore.py +++ b/airflow/providers/google/cloud/operators/datastore.py @@ -29,6 +29,7 @@ CloudDatastoreImportExportLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers.google.common.links.storage import StorageLink if TYPE_CHECKING: @@ -90,7 +91,7 @@ def __init__( labels: dict | None = None, polling_interval_in_seconds: int = 10, overwrite_existing: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -194,7 +195,7 @@ def __init__( labels: dict | None = None, datastore_conn_id: str = "google_cloud_default", polling_interval_in_seconds: float = 10, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, 
impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -268,7 +269,7 @@ def __init__( self, *, partial_keys: list, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -326,7 +327,7 @@ def __init__( self, *, transaction_options: dict[str, Any], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -384,7 +385,7 @@ def __init__( self, *, body: dict[str, Any], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -442,7 +443,7 @@ def __init__( self, *, transaction: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -498,7 +499,7 @@ def __init__( self, *, body: dict[str, Any], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/operators/dlp.py b/airflow/providers/google/cloud/operators/dlp.py index db705b5cd5042..9941574ae5561 100644 --- a/airflow/providers/google/cloud/operators/dlp.py +++ b/airflow/providers/google/cloud/operators/dlp.py @@ -59,6 +59,7 @@ CloudDLPPossibleInfoTypesListLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -108,7 +109,7 @@ def __init__( self, *, dlp_job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, 
retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -195,7 +196,7 @@ def __init__( self, *, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, deidentify_template: dict | DeidentifyTemplate | None = None, template_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -304,7 +305,7 @@ class CloudDLPCreateDLPJobOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, inspect_job: dict | InspectJobConfig | None = None, risk_job: dict | RiskAnalysisJobConfig | None = None, job_id: str | None = None, @@ -416,7 +417,7 @@ def __init__( self, *, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, inspect_template: InspectTemplate | None = None, template_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -522,7 +523,7 @@ class CloudDLPCreateJobTriggerOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, job_trigger: dict | JobTrigger | None = None, trigger_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -631,7 +632,7 @@ def __init__( self, *, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, config: StoredInfoTypeConfig | None = None, stored_info_type_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -752,7 +753,7 @@ class CloudDLPDeidentifyContentOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, deidentify_config: dict | DeidentifyConfig | None = None, inspect_config: dict | InspectConfig | None = None, item: dict | ContentItem | None = None, @@ -842,7 +843,7 @@ def __init__( *, template_id: str, organization_id: str | None = None, - project_id: 
str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -929,7 +930,7 @@ def __init__( self, *, dlp_job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1017,7 +1018,7 @@ def __init__( *, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1103,7 +1104,7 @@ def __init__( self, *, job_trigger_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1191,7 +1192,7 @@ def __init__( *, stored_info_type_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1281,7 +1282,7 @@ def __init__( *, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1364,7 +1365,7 @@ def __init__( self, *, dlp_job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1452,7 +1453,7 @@ def __init__( *, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, 
metadata: Sequence[tuple[str, str]] = (), @@ -1538,7 +1539,7 @@ def __init__( self, *, job_trigger_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1626,7 +1627,7 @@ def __init__( *, stored_info_type_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -1716,7 +1717,7 @@ class CloudDLPInspectContentOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, inspect_config: dict | InspectConfig | None = None, item: dict | ContentItem | None = None, inspect_template_name: str | None = None, @@ -1802,7 +1803,7 @@ def __init__( self, *, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, order_by: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1894,7 +1895,7 @@ class CloudDLPListDLPJobsOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, results_filter: str | None = None, page_size: int | None = None, job_type: str | None = None, @@ -1986,7 +1987,7 @@ class CloudDLPListInfoTypesOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, language_code: str | None = None, results_filter: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -2077,7 +2078,7 @@ def __init__( self, *, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, order_by: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -2168,7 +2169,7 @@ class 
CloudDLPListJobTriggersOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, order_by: str | None = None, results_filter: str | None = None, @@ -2263,7 +2264,7 @@ def __init__( self, *, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, page_size: int | None = None, order_by: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -2359,7 +2360,7 @@ class CloudDLPRedactImageOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, inspect_config: dict | InspectConfig | None = None, image_redaction_configs: None | (list[dict] | list[RedactImageRequest.ImageRedactionConfig]) = None, include_findings: bool | None = None, @@ -2453,7 +2454,7 @@ class CloudDLPReidentifyContentOperator(GoogleCloudBaseOperator): def __init__( self, *, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, reidentify_config: dict | DeidentifyConfig | None = None, inspect_config: dict | InspectConfig | None = None, item: dict | ContentItem | None = None, @@ -2548,7 +2549,7 @@ def __init__( *, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, deidentify_template: dict | DeidentifyTemplate | None = None, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -2648,7 +2649,7 @@ def __init__( *, template_id: str, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, inspect_template: dict | InspectTemplate | None = None, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -2744,7 +2745,7 @@ def __init__( self, *, job_trigger_id, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, job_trigger: dict | JobTrigger | None 
= None, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -2843,7 +2844,7 @@ def __init__( *, stored_info_type_id, organization_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, config: dict | StoredInfoTypeConfig | None = None, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, diff --git a/airflow/providers/google/cloud/operators/functions.py b/airflow/providers/google/cloud/operators/functions.py index b7836160666f6..589459003dc54 100644 --- a/airflow/providers/google/cloud/operators/functions.py +++ b/airflow/providers/google/cloud/operators/functions.py @@ -35,6 +35,7 @@ GcpBodyFieldValidator, GcpFieldValidationException, ) +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.version import version if TYPE_CHECKING: @@ -155,7 +156,7 @@ def __init__( *, location: str, body: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", zip_path: str | None = None, @@ -363,7 +364,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", impersonation_chain: str | Sequence[str] | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, **kwargs, ) -> None: self.name = name @@ -447,7 +448,7 @@ def __init__( function_id: str, input_data: dict, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index f4c8ce932eb97..fd02a913e6e26 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -29,6 +29,8 @@ import pendulum +from 
airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID + if TYPE_CHECKING: from airflow.utils.context import Context @@ -119,7 +121,7 @@ def __init__( resource: dict | None = None, storage_class: str = "MULTI_REGIONAL", location: str = "US", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, labels: dict | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index f5d7a81f00fad..006e7b31b8b92 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -62,6 +62,7 @@ GKEOperationTrigger, GKEStartPodTrigger, ) +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers_manager import ProvidersManager from airflow.utils.timezone import utcnow @@ -177,7 +178,7 @@ def __init__( *, name: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v2", impersonation_chain: str | Sequence[str] | None = None, @@ -321,7 +322,7 @@ def __init__( *, location: str, body: dict | Cluster, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v2", impersonation_chain: str | Sequence[str] | None = None, @@ -507,7 +508,7 @@ def __init__( cluster_name: str, kueue_version: str, use_internal_ip: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -669,7 +670,7 @@ def __init__( location: str, cluster_name: str, use_internal_ip: bool = False, - project_id: str | None = None, + project_id: str = 
PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, regional: bool | None = None, @@ -867,7 +868,7 @@ def __init__( location: str, cluster_name: str, use_internal_ip: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), @@ -1004,7 +1005,7 @@ def __init__( location: str, namespace: str, cluster_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, use_internal_ip: bool = False, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -1105,7 +1106,7 @@ def __init__( location: str, cluster_name: str, namespace: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, use_internal_ip: bool = False, do_xcom_push: bool = True, gcp_conn_id: str = "google_cloud_default", @@ -1205,7 +1206,7 @@ def __init__( location: str, cluster_name: str, use_internal_ip: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -1307,7 +1308,7 @@ def __init__( location: str, cluster_name: str, use_internal_ip: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -1446,7 +1447,7 @@ def __init__( location: str, cluster_name: str, use_internal_ip: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -1551,7 +1552,7 @@ def __init__( location: str, namespace: str, cluster_name: 
str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, use_internal_ip: bool = False, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -1655,7 +1656,7 @@ def __init__( location: str, namespace: str, cluster_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, use_internal_ip: bool = False, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/operators/life_sciences.py b/airflow/providers/google/cloud/operators/life_sciences.py index ce8f36ccdeb1a..253f9bbf98c2e 100644 --- a/airflow/providers/google/cloud/operators/life_sciences.py +++ b/airflow/providers/google/cloud/operators/life_sciences.py @@ -27,6 +27,7 @@ from airflow.providers.google.cloud.hooks.life_sciences import LifeSciencesHook from airflow.providers.google.cloud.links.life_sciences import LifeSciencesLink from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from airflow.utils.context import Context @@ -82,7 +83,7 @@ def __init__( *, body: dict, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v2beta", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/operators/mlengine.py b/airflow/providers/google/cloud/operators/mlengine.py index d46a54c8b643a..5798653ed411a 100644 --- a/airflow/providers/google/cloud/operators/mlengine.py +++ b/airflow/providers/google/cloud/operators/mlengine.py @@ -39,6 +39,7 @@ ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.mlengine import MLEngineStartTrainingJobTrigger +from 
airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from airflow.utils.context import Context @@ -345,7 +346,7 @@ def __init__( *, model: dict, operation: str = "create", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -417,7 +418,7 @@ def __init__( self, *, model: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -493,7 +494,7 @@ def __init__( self, *, model_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -573,7 +574,7 @@ def __init__( *, model_name: str, delete_contents: bool = False, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -680,7 +681,7 @@ def __init__( version_name: str | None = None, version: dict | None = None, operation: str = "create", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -773,7 +774,7 @@ def __init__( *, model_name: str, version: dict, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -866,7 +867,7 @@ def __init__( *, model_name: str, version_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -956,7 +957,7 @@ def __init__( self, *, 
model_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -1045,7 +1046,7 @@ def __init__( *, model_name: str, version_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -1467,7 +1468,7 @@ def __init__( self, *, job_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py index f076cb9ec0aae..41e1a03830d34 100644 --- a/airflow/providers/google/cloud/operators/pubsub.py +++ b/airflow/providers/google/cloud/operators/pubsub.py @@ -42,6 +42,7 @@ from airflow.providers.google.cloud.hooks.pubsub import PubSubHook from airflow.providers.google.cloud.links.pubsub import PubSubSubscriptionLink, PubSubTopicLink from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -124,7 +125,7 @@ def __init__( self, *, topic: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, fail_if_exists: bool = False, gcp_conn_id: str = "google_cloud_default", labels: dict[str, str] | None = None, @@ -310,7 +311,7 @@ def __init__( self, *, topic: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, subscription: str | None = None, subscription_project_id: str | None = None, ack_deadline_secs: int = 10, @@ -452,7 +453,7 @@ def __init__( self, *, topic: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, fail_if_not_exists: 
bool = False, gcp_conn_id: str = "google_cloud_default", retry: Retry | _MethodDefault = DEFAULT, @@ -552,7 +553,7 @@ def __init__( self, *, subscription: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, fail_if_not_exists: bool = False, gcp_conn_id: str = "google_cloud_default", retry: Retry | _MethodDefault = DEFAULT, @@ -654,7 +655,7 @@ def __init__( *, topic: str, messages: list, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/operators/spanner.py b/airflow/providers/google/cloud/operators/spanner.py index 8e287625e24e1..b6e25a369f610 100644 --- a/airflow/providers/google/cloud/operators/spanner.py +++ b/airflow/providers/google/cloud/operators/spanner.py @@ -25,6 +25,7 @@ from airflow.providers.google.cloud.hooks.spanner import SpannerHook from airflow.providers.google.cloud.links.spanner import SpannerDatabaseLink, SpannerInstanceLink from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from airflow.utils.context import Context @@ -79,7 +80,7 @@ def __init__( configuration_name: str, node_count: int, display_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -161,7 +162,7 @@ def __init__( self, *, instance_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -240,7 +241,7 @@ def __init__( instance_id: str, database_id: str, query: str | list[str], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = 
"google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -350,7 +351,7 @@ def __init__( instance_id: str, database_id: str, ddl_statements: list[str], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, @@ -456,7 +457,7 @@ def __init__( instance_id: str, database_id: str, ddl_statements: list[str], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, operation_id: str | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -550,7 +551,7 @@ def __init__( *, instance_id: str, database_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/operators/speech_to_text.py b/airflow/providers/google/cloud/operators/speech_to_text.py index bac79959f87d6..de26a8ba8216f 100644 --- a/airflow/providers/google/cloud/operators/speech_to_text.py +++ b/airflow/providers/google/cloud/operators/speech_to_text.py @@ -27,6 +27,7 @@ from airflow.exceptions import AirflowException from airflow.providers.google.cloud.hooks.speech_to_text import CloudSpeechToTextHook, RecognitionAudio from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers.google.common.links.storage import FileDetailsLink if TYPE_CHECKING: @@ -84,7 +85,7 @@ def __init__( *, audio: RecognitionAudio, config: RecognitionConfig, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, diff --git 
a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py index 66fed54ebe004..ca64e3995c272 100644 --- a/airflow/providers/google/cloud/operators/stackdriver.py +++ b/airflow/providers/google/cloud/operators/stackdriver.py @@ -28,6 +28,7 @@ StackdriverPoliciesLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -99,7 +100,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -196,7 +197,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -279,7 +280,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -362,7 +363,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -441,7 +442,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | 
Sequence[str] | None = None, **kwargs, ) -> None: @@ -535,7 +536,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -632,7 +633,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -717,7 +718,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -804,7 +805,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -885,7 +886,7 @@ def __init__( timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: diff --git a/airflow/providers/google/cloud/operators/tasks.py b/airflow/providers/google/cloud/operators/tasks.py index 921610d2956cc..8678622b686e1 100644 --- a/airflow/providers/google/cloud/operators/tasks.py +++ b/airflow/providers/google/cloud/operators/tasks.py @@ -28,6 +28,7 @@ from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook from airflow.providers.google.cloud.links.cloud_tasks import CloudTasksLink, CloudTasksQueueLink 
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -88,7 +89,7 @@ def __init__( *, location: str, task_queue: Queue, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, queue_name: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -195,7 +196,7 @@ def __init__( self, *, task_queue: Queue, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, queue_name: str | None = None, update_mask: FieldMask | None = None, @@ -285,7 +286,7 @@ def __init__( *, location: str, queue_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -368,7 +369,7 @@ def __init__( self, *, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, results_filter: str | None = None, page_size: int | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -453,7 +454,7 @@ def __init__( *, location: str, queue_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -530,7 +531,7 @@ def __init__( *, location: str, queue_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -613,7 +614,7 @@ def __init__( *, location: str, queue_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -696,7 +697,7 @@ def __init__( *, location: str, queue_name: str, - project_id: str | None = None, + 
project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -788,7 +789,7 @@ def __init__( location: str, queue_name: str, task: dict | Task, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, task_name: str | None = None, response_view: Task.View | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -884,7 +885,7 @@ def __init__( location: str, queue_name: str, task_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, response_view: Task.View | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -976,7 +977,7 @@ def __init__( *, location: str, queue_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, response_view: Task.View | None = None, page_size: int | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -1067,7 +1068,7 @@ def __init__( location: str, queue_name: str, task_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -1151,7 +1152,7 @@ def __init__( location: str, queue_name: str, task_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, response_view: Task.View | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, diff --git a/airflow/providers/google/cloud/operators/text_to_speech.py b/airflow/providers/google/cloud/operators/text_to_speech.py index 7c7678f7bbe99..bae69d6598486 100644 --- a/airflow/providers/google/cloud/operators/text_to_speech.py +++ b/airflow/providers/google/cloud/operators/text_to_speech.py @@ -28,6 +28,7 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.hooks.text_to_speech import CloudTextToSpeechHook from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers.google.common.links.storage import FileDetailsLink if TYPE_CHECKING: @@ -94,7 +95,7 @@ def __init__( audio_config: dict | AudioConfig, target_bucket_name: str, target_filename: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, diff --git a/airflow/providers/google/cloud/operators/translate_speech.py b/airflow/providers/google/cloud/operators/translate_speech.py index b74f7a7131a76..fb3bdccb1abee 100644 --- a/airflow/providers/google/cloud/operators/translate_speech.py +++ b/airflow/providers/google/cloud/operators/translate_speech.py @@ -27,6 +27,7 @@ from airflow.providers.google.cloud.hooks.speech_to_text import CloudSpeechToTextHook from airflow.providers.google.cloud.hooks.translate import CloudTranslateHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers.google.common.links.storage import FileDetailsLink if TYPE_CHECKING: @@ -126,7 +127,7 @@ def __init__( format_: str, source_language: str | None, model: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/operators/vision.py b/airflow/providers/google/cloud/operators/vision.py index c7c8ef18c7e02..f016c8dce977f 100644 --- a/airflow/providers/google/cloud/operators/vision.py +++ b/airflow/providers/google/cloud/operators/vision.py @@ -34,6 +34,7 @@ from airflow.providers.google.cloud.hooks.vision import CloudVisionHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from 
airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -94,7 +95,7 @@ def __init__( *, product_set: dict | ProductSet, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, product_set_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -181,7 +182,7 @@ def __init__( *, location: str, product_set_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -279,7 +280,7 @@ def __init__( product_set: dict | ProductSet, location: str | None = None, product_set_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -367,7 +368,7 @@ def __init__( *, location: str, product_set_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -455,7 +456,7 @@ def __init__( *, location: str, product: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, product_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -545,7 +546,7 @@ def __init__( *, location: str, product_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -652,7 +653,7 @@ def __init__( product: dict | Product, location: str | None = None, product_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, update_mask: dict | FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -743,7 
+744,7 @@ def __init__( *, location: str, product_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -899,7 +900,7 @@ def __init__( reference_image: dict | ReferenceImage, product_id: str, reference_image_id: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -996,7 +997,7 @@ def __init__( location: str, product_id: str, reference_image_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -1085,7 +1086,7 @@ def __init__( product_set_id: str, product_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), @@ -1167,7 +1168,7 @@ def __init__( product_set_id: str, product_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: MetaData = (), diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py index 505c926008311..c42e0e27550fb 100644 --- a/airflow/providers/google/cloud/operators/workflows.py +++ b/airflow/providers/google/cloud/operators/workflows.py @@ -34,6 +34,7 @@ WorkflowsWorkflowDetailsLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.retry import Retry @@ -80,7 +81,7 @@ def __init__( workflow: dict, workflow_id: str, location: str, - project_id: str | None = None, + project_id: str = 
PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -192,7 +193,7 @@ def __init__( *, workflow_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, update_mask: FieldMask | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -270,7 +271,7 @@ def __init__( *, workflow_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -332,7 +333,7 @@ def __init__( self, *, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, filter_: str | None = None, order_by: str | None = None, retry: Retry | _MethodDefault = DEFAULT, @@ -402,7 +403,7 @@ def __init__( *, workflow_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -473,7 +474,7 @@ def __init__( workflow_id: str, execution: dict, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -548,7 +549,7 @@ def __init__( workflow_id: str, execution_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -626,7 +627,7 @@ def __init__( workflow_id: str, location: str, start_date_filter: datetime.datetime | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -703,7 +704,7 @@ def 
__init__( workflow_id: str, execution_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), diff --git a/airflow/providers/google/cloud/secrets/secret_manager.py b/airflow/providers/google/cloud/secrets/secret_manager.py index c507fe3ef9472..df2a67b470582 100644 --- a/airflow/providers/google/cloud/secrets/secret_manager.py +++ b/airflow/providers/google/cloud/secrets/secret_manager.py @@ -30,6 +30,7 @@ _get_target_principal_and_delegates, get_credentials_and_project_id, ) +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.secrets import BaseSecretsBackend from airflow.utils.log.logging_mixin import LoggingMixin @@ -94,7 +95,7 @@ def __init__( gcp_key_path: str | None = None, gcp_credential_config_file: dict[str, str] | str | None = None, gcp_scopes: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, sep: str = "-", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/sensors/bigquery_dts.py b/airflow/providers/google/cloud/sensors/bigquery_dts.py index 52095c198024d..99d7acb1b7f32 100644 --- a/airflow/providers/google/cloud/sensors/bigquery_dts.py +++ b/airflow/providers/google/cloud/sensors/bigquery_dts.py @@ -26,6 +26,7 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.google.cloud.hooks.bigquery_dts import BiqQueryDataTransferServiceHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -83,7 +84,7 @@ def __init__( expected_statuses: ( set[str | TransferState | int] | str | TransferState | int ) = TransferState.SUCCEEDED, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = 
"google_cloud_default", retry: Retry | _MethodDefault = DEFAULT, request_timeout: float | None = None, diff --git a/airflow/providers/google/cloud/sensors/bigtable.py b/airflow/providers/google/cloud/sensors/bigtable.py index 57f825c005fbf..f9cd61b9adfac 100644 --- a/airflow/providers/google/cloud/sensors/bigtable.py +++ b/airflow/providers/google/cloud/sensors/bigtable.py @@ -28,6 +28,7 @@ from airflow.providers.google.cloud.hooks.bigtable import BigtableHook from airflow.providers.google.cloud.links.bigtable import BigtableTablesLink from airflow.providers.google.cloud.operators.bigtable import BigtableValidationMixin +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -74,7 +75,7 @@ def __init__( *, instance_id: str, table_id: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py index 639a5775a8067..ca5ac0087414a 100644 --- a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py @@ -28,6 +28,7 @@ CloudDataTransferServiceHook, ) from airflow.providers.google.cloud.links.cloud_storage_transfer import CloudStorageTransferJobLink +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -73,7 +74,7 @@ def __init__( *, job_name: str, expected_statuses: set[str] | str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, diff --git 
a/airflow/providers/google/cloud/sensors/dataflow.py b/airflow/providers/google/cloud/sensors/dataflow.py index b397d56e2ebf4..c5d9efc74758e 100644 --- a/airflow/providers/google/cloud/sensors/dataflow.py +++ b/airflow/providers/google/cloud/sensors/dataflow.py @@ -35,6 +35,7 @@ DataflowJobMetricsTrigger, DataflowJobStatusTrigger, ) +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -77,7 +78,7 @@ def __init__( *, job_id: str, expected_statuses: set[str] | str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -206,7 +207,7 @@ def __init__( job_id: str, callback: Callable | None = None, fail_on_terminal_state: bool = True, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -333,7 +334,7 @@ def __init__( job_id: str, callback: Callable | None = None, fail_on_terminal_state: bool = True, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -461,7 +462,7 @@ def __init__( job_id: str, callback: Callable | None = None, fail_on_terminal_state: bool = True, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, location: str = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/sensors/datafusion.py b/airflow/providers/google/cloud/sensors/datafusion.py index 47b9e529fcabc..358906859c534 100644 --- 
a/airflow/providers/google/cloud/sensors/datafusion.py +++ b/airflow/providers/google/cloud/sensors/datafusion.py @@ -23,6 +23,7 @@ from airflow.exceptions import AirflowException, AirflowNotFoundException, AirflowSkipException from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -65,7 +66,7 @@ def __init__( instance_name: str, location: str, failure_statuses: Iterable[str] | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, namespace: str = "default", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/sensors/dataproc.py b/airflow/providers/google/cloud/sensors/dataproc.py index 5ae159558a9ad..fad70b1c28e30 100644 --- a/airflow/providers/google/cloud/sensors/dataproc.py +++ b/airflow/providers/google/cloud/sensors/dataproc.py @@ -27,6 +27,7 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.google.cloud.hooks.dataproc import DataprocHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -53,7 +54,7 @@ def __init__( *, dataproc_job_id: str, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", wait_timeout: int | None = None, **kwargs, @@ -144,7 +145,7 @@ def __init__( *, batch_id: str, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", wait_timeout: int | None = None, **kwargs, diff --git a/airflow/providers/google/cloud/sensors/tasks.py b/airflow/providers/google/cloud/sensors/tasks.py index 059ddf75bb8fd..b5631b6b8ede3 100644 --- 
a/airflow/providers/google/cloud/sensors/tasks.py +++ b/airflow/providers/google/cloud/sensors/tasks.py @@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Sequence from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -57,7 +58,7 @@ def __init__( self, *, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, queue_name: str | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py index 7f97fafdbb9f7..d1aa1b7696c6f 100644 --- a/airflow/providers/google/cloud/sensors/workflows.py +++ b/airflow/providers/google/cloud/sensors/workflows.py @@ -23,6 +23,7 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -58,7 +59,7 @@ def __init__( workflow_id: str, execution_id: str, location: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, success_states: set[Execution.State] | None = None, failure_states: set[Execution.State] | None = None, retry: Retry | _MethodDefault = DEFAULT, diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py index 8167558ed0bd1..66a8e16eb2b46 100644 --- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py @@ -30,6 +30,7 @@ from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob from airflow.providers.google.cloud.links.bigquery 
import BigQueryTableLink from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTrigger +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.utils.helpers import merge_dicts if TYPE_CHECKING: @@ -104,7 +105,7 @@ def __init__( *, source_project_dataset_table: str, destination_cloud_storage_uris: list[str], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, compression: str = "NONE", export_format: str = "CSV", field_delimiter: str = ",", diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 3899048dc419f..6237a03487fca 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -42,6 +42,7 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTrigger +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.utils.helpers import merge_dicts if TYPE_CHECKING: @@ -229,7 +230,7 @@ def __init__( job_id: str | None = None, force_rerun: bool = True, reattach_states: set[str] | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, **kwargs, ) -> None: super().__init__(**kwargs) diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py index fd0170526199d..7b07ddec81de0 100644 --- a/airflow/providers/google/cloud/triggers/bigquery.py +++ b/airflow/providers/google/cloud/triggers/bigquery.py @@ -51,7 +51,7 @@ def __init__( self, conn_id: str, job_id: str | None, - project_id: str | None, + project_id: str, location: str | None, dataset_id: str | None = None, table_id: str | None = None, @@ -293,7 +293,7 @@ def __init__( conn_id: str, 
first_job_id: str, second_job_id: str, - project_id: str | None, + project_id: str, table: str, metrics_thresholds: dict[str, int], location: str | None = None, @@ -454,7 +454,7 @@ def __init__( sql: str, pass_value: int | float | str, job_id: str | None, - project_id: str | None, + project_id: str, tolerance: Any = None, dataset_id: str | None = None, table_id: str | None = None, diff --git a/airflow/providers/google/cloud/triggers/cloud_sql.py b/airflow/providers/google/cloud/triggers/cloud_sql.py index 62043ee0e6fdb..98dcc54650c3b 100644 --- a/airflow/providers/google/cloud/triggers/cloud_sql.py +++ b/airflow/providers/google/cloud/triggers/cloud_sql.py @@ -23,6 +23,7 @@ from typing import Sequence from airflow.providers.google.cloud.hooks.cloud_sql import CloudSQLAsyncHook, CloudSqlOperationStatus +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -36,7 +37,7 @@ class CloudSQLExportTrigger(BaseTrigger): def __init__( self, operation_name: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, poke_interval: int = 20, diff --git a/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py index f5ab93242e0c9..90f3ad1a33bee 100644 --- a/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py @@ -27,6 +27,7 @@ from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import ( CloudDataTransferServiceAsyncHook, ) +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -43,7 +44,7 @@ class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger): def __init__( self, 
job_names: list[str], - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, poll_interval: int = 10, gcp_conn_id: str = "google_cloud_default", ) -> None: diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py index b26157e8b2582..f0aecddb4a8ed 100644 --- a/airflow/providers/google/cloud/triggers/dataproc.py +++ b/airflow/providers/google/cloud/triggers/dataproc.py @@ -29,6 +29,7 @@ from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook from airflow.providers.google.cloud.utils.dataproc import DataprocOperationType +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -38,7 +39,7 @@ class DataprocBaseTrigger(BaseTrigger): def __init__( self, region: str, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, polling_interval_seconds: int = 30, diff --git a/airflow/providers/google/cloud/triggers/mlengine.py b/airflow/providers/google/cloud/triggers/mlengine.py index 87fb1f57353a7..b4ae37c681939 100644 --- a/airflow/providers/google/cloud/triggers/mlengine.py +++ b/airflow/providers/google/cloud/triggers/mlengine.py @@ -20,6 +20,7 @@ from typing import Any, AsyncIterator, Sequence from airflow.providers.google.cloud.hooks.mlengine import MLEngineAsyncHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -45,7 +46,7 @@ def __init__( runtime_version: str | None = None, python_version: str | None = None, job_dir: str | None = None, - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, labels: dict[str, str] | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git 
a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py index ca08f86e78d7f..0fe5d16aae170 100644 --- a/airflow/providers/google/common/hooks/base_google.py +++ b/airflow/providers/google/common/hooks/base_google.py @@ -364,14 +364,14 @@ def _get_field(self, f: str, default: Any = None) -> Any: return hasattr(self, "extras") and get_field(self.extras, f) or default @property - def project_id(self) -> str | None: + def project_id(self) -> str: """ Returns project id. :return: id of the project """ _, project_id = self.get_credentials_and_project_id() - return project_id + return project_id or PROVIDE_PROJECT_ID @property def num_retries(self) -> int: diff --git a/airflow/providers/google/firebase/hooks/firestore.py b/airflow/providers/google/firebase/hooks/firestore.py index 5804081e29ace..5bcff6239d693 100644 --- a/airflow/providers/google/firebase/hooks/firestore.py +++ b/airflow/providers/google/firebase/hooks/firestore.py @@ -25,7 +25,7 @@ from googleapiclient.discovery import build, build_from_document from airflow.exceptions import AirflowException -from airflow.providers.google.common.hooks.base_google import GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook # Time to sleep between active checks of the operation results TIME_TO_SLEEP_IN_SECONDS = 5 @@ -84,7 +84,7 @@ def get_conn(self): @GoogleBaseHook.fallback_to_default_project_id def export_documents( - self, body: dict, database_id: str = "(default)", project_id: str | None = None + self, body: dict, database_id: str = "(default)", project_id: str = PROVIDE_PROJECT_ID ) -> None: """ Start a export with the specified configuration. 
diff --git a/airflow/providers/google/firebase/operators/firestore.py b/airflow/providers/google/firebase/operators/firestore.py index d14242e6b0461..d3a3c97f11017 100644 --- a/airflow/providers/google/firebase/operators/firestore.py +++ b/airflow/providers/google/firebase/operators/firestore.py @@ -20,6 +20,7 @@ from airflow.exceptions import AirflowException from airflow.models import BaseOperator +from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID from airflow.providers.google.firebase.hooks.firestore import CloudFirestoreHook if TYPE_CHECKING: @@ -64,7 +65,7 @@ def __init__( *, body: dict, database_id: str = "(default)", - project_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", impersonation_chain: str | Sequence[str] | None = None,