From 0f25e9a59de50de6313ea2b9753777463df93aa8 Mon Sep 17 00:00:00 2001
From: Maksim Moiseenkov
Date: Wed, 14 Aug 2024 14:47:25 +0000
Subject: [PATCH] Refactor DataprocCreateBatchOperator and Dataproc system tests

---
 .../google/cloud/operators/dataproc.py | 154 ++++++++----------
 .../google/cloud/triggers/dataproc.py | 5 +-
 scripts/ci/pre_commit/check_system_tests.py | 3 +-
 .../cloud/dataproc/example_dataproc_batch.py | 26 ++-
 .../example_dataproc_batch_deferrable.py | 3 +
 .../example_dataproc_batch_persistent.py | 4 +
 ...cluster_create_existing_stopped_cluster.py | 4 +
 .../example_dataproc_cluster_deferrable.py | 4 +
 .../example_dataproc_cluster_diagnose.py | 3 +
 .../example_dataproc_cluster_generator.py | 3 +
 .../example_dataproc_cluster_start_stop.py | 3 +
 .../example_dataproc_cluster_update.py | 4 +
 .../cloud/dataproc/example_dataproc_gke.py | 4 +
 .../cloud/dataproc/example_dataproc_hadoop.py | 3 +
 .../cloud/dataproc/example_dataproc_hive.py | 3 +
 .../cloud/dataproc/example_dataproc_pig.py | 3 +
 .../cloud/dataproc/example_dataproc_presto.py | 3 +
 .../dataproc/example_dataproc_pyspark.py | 3 +
 .../cloud/dataproc/example_dataproc_spark.py | 3 +
 .../dataproc/example_dataproc_spark_async.py | 3 +
 .../example_dataproc_spark_deferrable.py | 3 +
 .../dataproc/example_dataproc_spark_sql.py | 3 +
 .../cloud/dataproc/example_dataproc_sparkr.py | 3 +
 .../cloud/dataproc/example_dataproc_trino.py | 3 +
 24 files changed, 148 insertions(+), 105 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index efaf0d6326d08..26bf6086ad419 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -30,6 +30,7 @@
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from enum import Enum
+from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 
 from deprecated import deprecated
@@ -2985,10 +2986,10 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
     def __init__(
         self,
         *,
-        region: str | None = None,
+        region: str,
         project_id: str = PROVIDE_PROJECT_ID,
         batch: dict | Batch,
-        batch_id: str,
+        batch_id: str | None = None,
         request_id: str | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
@@ -3021,20 +3022,20 @@ def __init__(
         self.polling_interval_seconds = polling_interval_seconds
 
     def execute(self, context: Context):
-        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
-        # batch_id might not be set and will be generated
-        if self.batch_id:
-            link = DATAPROC_BATCH_LINK.format(
-                region=self.region, project_id=self.project_id, batch_id=self.batch_id
+        if self.asynchronous and self.deferrable:
+            raise AirflowException(
+                "Both asynchronous and deferrable parameters were passed. Please provide only one."
             )
-            self.log.info("Creating batch %s", self.batch_id)
-            self.log.info("Once started, the batch job will be available at %s", link)
+
+        batch_id: str = ""
+        if self.batch_id:
+            batch_id = self.batch_id
+            self.log.info("Starting batch %s", batch_id)
         else:
-            self.log.info("Starting batch job. The batch ID will be generated since it was not provided.")
-            if self.region is None:
-                raise AirflowException("Region should be set here")
+            self.log.info("Starting batch. The batch ID will be generated since it was not provided.")
+
         try:
-            self.operation = hook.create_batch(
+            self.operation = self.hook.create_batch(
                 region=self.region,
                 project_id=self.project_id,
                 batch=self.batch,
@@ -3044,85 +3045,62 @@ def execute(self, context: Context):
                 timeout=self.timeout,
                 metadata=self.metadata,
             )
-            if self.operation is None:
-                raise RuntimeError("The operation should be set here!")
-
-            if not self.deferrable:
-                if not self.asynchronous:
-                    result = hook.wait_for_operation(
-                        timeout=self.timeout, result_retry=self.result_retry, operation=self.operation
-                    )
-                    self.log.info("Batch %s created", self.batch_id)
-
-                else:
-                    DataprocBatchLink.persist(
-                        context=context,
-                        operator=self,
-                        project_id=self.project_id,
-                        region=self.region,
-                        batch_id=self.batch_id,
-                    )
-                    return self.operation.operation.name
-
-            else:
-                # processing ends in execute_complete
-                self.defer(
-                    trigger=DataprocBatchTrigger(
-                        batch_id=self.batch_id,
-                        project_id=self.project_id,
-                        region=self.region,
-                        gcp_conn_id=self.gcp_conn_id,
-                        impersonation_chain=self.impersonation_chain,
-                        polling_interval_seconds=self.polling_interval_seconds,
-                    ),
-                    method_name="execute_complete",
-                )
-
         except AlreadyExists:
-            self.log.info("Batch with given id already exists")
-            # This is only likely to happen if batch_id was provided
-            # Could be running if Airflow was restarted after task started
-            # poll until a final state is reached
-
-            self.log.info("Attaching to the job %s if it is still running.", self.batch_id)
+            self.log.info("Batch with given id already exists.")
+            self.log.info("Attaching to the job %s if it is still running.", batch_id)
+        else:
+            batch_id = self.operation.metadata.batch.split("/")[-1]
+            self.log.info("The batch %s was created.", batch_id)
 
-            # deferrable handling of a batch_id that already exists - processing ends in execute_complete
-            if self.deferrable:
-                self.defer(
-                    trigger=DataprocBatchTrigger(
-                        batch_id=self.batch_id,
-                        project_id=self.project_id,
-                        region=self.region,
-                        gcp_conn_id=self.gcp_conn_id,
-                        impersonation_chain=self.impersonation_chain,
-                        polling_interval_seconds=self.polling_interval_seconds,
-                    ),
-                    method_name="execute_complete",
-                )
+        DataprocBatchLink.persist(
+            context=context,
+            operator=self,
+            project_id=self.project_id,
+            region=self.region,
+            batch_id=batch_id,
+        )
 
-            # non-deferrable handling of a batch_id that already exists
-            result = hook.wait_for_batch(
-                batch_id=self.batch_id,
+        if self.asynchronous:
+            batch = self.hook.get_batch(
+                batch_id=batch_id,
                 region=self.region,
                 project_id=self.project_id,
                 retry=self.retry,
                 timeout=self.timeout,
                 metadata=self.metadata,
-                wait_check_interval=self.polling_interval_seconds,
             )
-            batch_id = self.batch_id or result.name.split("/")[-1]
+            self.log.info("The batch %s was created asynchronously. Exiting.", batch_id)
+            return Batch.to_dict(batch)
 
-            self.handle_batch_status(context, result.state, batch_id)
-            project_id = self.project_id or hook.project_id
-            if project_id:
-                DataprocBatchLink.persist(
-                    context=context,
-                    operator=self,
-                    project_id=project_id,
-                    region=self.region,
-                    batch_id=batch_id,
+        if self.deferrable:
+            self.defer(
+                trigger=DataprocBatchTrigger(
+                    batch_id=batch_id,
+                    project_id=self.project_id,
+                    region=self.region,
+                    gcp_conn_id=self.gcp_conn_id,
+                    impersonation_chain=self.impersonation_chain,
+                    polling_interval_seconds=self.polling_interval_seconds,
+                ),
+                method_name="execute_complete",
             )
-        return Batch.to_dict(result)
+
+        self.log.info("Waiting for the completion of batch job %s", batch_id)
+        batch = self.hook.wait_for_batch(
+            batch_id=batch_id,
+            region=self.region,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+        self.handle_batch_status(context, batch.state, batch_id, batch.state_message)
+        return Batch.to_dict(batch)
+
+    @cached_property
+    def hook(self) -> DataprocHook:
+        return DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
 
     def execute_complete(self, context, event=None) -> None:
         """
@@ -3135,23 +3113,27 @@ def execute_complete(self, context, event=None) -> None:
             raise AirflowException("Batch failed.")
         state = event["batch_state"]
         batch_id = event["batch_id"]
-        self.handle_batch_status(context, state, batch_id)
+        self.handle_batch_status(context, state, batch_id, state_message=event["batch_state_message"])
 
     def on_kill(self):
         if self.operation:
             self.operation.cancel()
 
-    def handle_batch_status(self, context: Context, state: Batch.State, batch_id: str) -> None:
+    def handle_batch_status(
+        self, context: Context, state: Batch.State, batch_id: str, state_message: str | None = None
+    ) -> None:
         # The existing batch may be in a number of states other than 'SUCCEEDED'.
         # wait_for_operation doesn't fail if the job is cancelled, so we will check for it here, which also
         # finds a cancelling|canceled|unspecified job from wait_for_batch or the deferred trigger
         link = DATAPROC_BATCH_LINK.format(region=self.region, project_id=self.project_id, batch_id=batch_id)
         if state == Batch.State.FAILED:
-            raise AirflowException("Batch job %s failed. Driver Logs: %s", batch_id, link)
+            raise AirflowException(
+                f"Batch job {batch_id} failed with error: {state_message}\nDriver Logs: {link}"
+            )
         if state in (Batch.State.CANCELLED, Batch.State.CANCELLING):
-            raise AirflowException("Batch job %s was cancelled. Driver logs: %s", batch_id, link)
+            raise AirflowException(f"Batch job {batch_id} was cancelled. Driver logs: {link}")
         if state == Batch.State.STATE_UNSPECIFIED:
-            raise AirflowException("Batch job %s unspecified. Driver logs: %s", batch_id, link)
+            raise AirflowException(f"Batch job {batch_id} unspecified. Driver logs: {link}")
         self.log.info("Batch job %s completed. Driver logs: %s", batch_id, link)
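
After this refactor, region is a required argument, batch_id is optional (when omitted, the operator reads the server-generated ID back from operation.metadata.batch), and asynchronous and deferrable may no longer be combined. A minimal usage sketch of the new signature; the project and region values are illustrative placeholders, and the spark_batch payload is modeled on the one used in these system tests:

    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

    BATCH_CONFIG = {
        "spark_batch": {
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "main_class": "org.apache.spark.examples.SparkPi",
        }
    }

    create_batch = DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id="example-project",  # illustrative placeholder
        region="europe-west1",  # required after this refactor
        batch=BATCH_CONFIG,
        # batch_id omitted: the service generates one and the operator
        # derives it from operation.metadata.batch
        deferrable=True,  # mutually exclusive with asynchronous=True
    )
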
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 99800d266a86a..508b0444c05aa 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -371,7 +371,10 @@ async def run(self):
             self.log.info("Current state is %s", state)
             self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
             await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"batch_id": self.batch_id, "batch_state": state})
+
+        yield TriggerEvent(
+            {"batch_id": self.batch_id, "batch_state": state, "batch_state_message": batch.state_message}
+        )
 
 
 class DataprocDeleteClusterTrigger(DataprocBaseTrigger):
diff --git a/scripts/ci/pre_commit/check_system_tests.py b/scripts/ci/pre_commit/check_system_tests.py
index 89e2a9f24ae5c..4c82272ad7875 100755
--- a/scripts/ci/pre_commit/check_system_tests.py
+++ b/scripts/ci/pre_commit/check_system_tests.py
@@ -35,7 +35,6 @@
 errors: list[str] = []
 
 WATCHER_APPEND_INSTRUCTION = "list(dag.tasks) >> watcher()"
-WATCHER_APPEND_INSTRUCTION_SHORT = " >> watcher()"
 
 PYTEST_FUNCTION = """
 from tests.system.utils import get_test_run  # noqa: E402
@@ -53,7 +52,7 @@ def _check_file(file: Path):
     content = file.read_text()
 
     if "from tests.system.utils.watcher import watcher" in content:
-        index = content.find(WATCHER_APPEND_INSTRUCTION_SHORT)
+        index = content.find(WATCHER_APPEND_INSTRUCTION)
         if index == -1:
            errors.append(
                f"[red]The example {file} imports tests.system.utils.watcher "
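
The pre-commit hook now searches for the full canonical append instruction, so a bare ">> watcher()" suffix no longer satisfies the check. A sketch of the pattern every system test is expected to end with; these lines mirror the system tests touched in this patch:

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "teardown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()
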
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
index 852fa0914d089..2796b16d6be62 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
@@ -69,6 +69,7 @@
         region=REGION,
         batch=BATCH_CONFIG,
         batch_id=BATCH_ID,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     create_batch_2 = DataprocCreateBatchOperator(
@@ -87,6 +88,7 @@
         batch=BATCH_CONFIG,
         batch_id=BATCH_ID_3,
         asynchronous=True,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_batch_operator]
 
@@ -128,18 +130,10 @@
         task_id="cancel_operation",
         project_id=PROJECT_ID,
         region=REGION,
-        operation_name="{{ task_instance.xcom_pull('create_batch_4') }}",
+        operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
     )
     # [END how_to_cloud_dataproc_cancel_operation_operator]
 
-    batch_cancelled_sensor = DataprocBatchSensor(
-        task_id="batch_cancelled_sensor",
-        region=REGION,
-        project_id=PROJECT_ID,
-        batch_id=BATCH_ID_4,
-        poke_interval=10,
-    )
-
     # [START how_to_cloud_dataproc_delete_batch_operator]
     delete_batch = DataprocDeleteBatchOperator(
         task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
@@ -161,7 +155,9 @@
 
     (
         # TEST SETUP
-        [create_batch, create_batch_2, create_batch_3]
+        create_batch
+        >> create_batch_2
+        >> create_batch_3
         # TEST BODY
         >> batch_async_sensor
         >> get_batch
@@ -169,8 +165,9 @@
         >> create_batch_4
         >> cancel_operation
         # TEST TEARDOWN
-        >> [delete_batch, delete_batch_2, delete_batch_3]
-        >> batch_cancelled_sensor
+        >> delete_batch
+        >> delete_batch_2
+        >> delete_batch_3
         >> delete_batch_4
     )
 
@@ -178,10 +175,7 @@
 
     # This test needs watcher in order to properly mark success/failure
     # when "teardown" task with trigger rule is part of the DAG
-
-    # Excluding sensor because we expect it to fail due to cancelled operation
-    [task for task in dag.tasks if task.task_id != "batch_cancelled_sensor"] >> watcher()
-
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
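
The result_retry argument added throughout these system tests is a standard google.api_core.retry.Retry policy applied while polling for the operation result. A short sketch of what the chosen values mean; the semantics are those of google-api-core, and the numbers are simply the ones used in this patch:

    from google.api_core.retry import Retry

    # initial: delay before the first retry, in seconds
    # multiplier: factor applied to the delay after each attempt (1.0 keeps it flat)
    # maximum: upper bound on any single delay between attempts
    result_retry = Retry(maximum=100.0, initial=10.0, multiplier=1.0)
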
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
index 1d2fec951e18f..afd509258d597 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
@@ -25,6 +25,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateBatchOperator,
@@ -62,6 +64,7 @@
         batch=BATCH_CONFIG,
         batch_id=BATCH_ID,
         deferrable=True,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_batch_operator_async]
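
In deferrable mode the operator now hands the resolved batch_id to DataprocBatchTrigger and resumes in execute_complete, which consumes the enriched event payload introduced in the trigger change above. A sketch of the dict the trigger yields; the ID, state, and message values are hypothetical:

    from google.cloud.dataproc_v1 import Batch

    event = {
        "batch_id": "batch-deferrable-test",  # hypothetical ID
        "batch_state": Batch.State.FAILED,
        "batch_state_message": "Driver exited with a non-zero status",  # hypothetical
    }
    # execute_complete forwards the message to handle_batch_status, which embeds
    # it in the AirflowException raised for failed batches.
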
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
index cb4e731c6785b..153033228ebcf 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
@@ -23,6 +23,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     ClusterGenerator,
@@ -89,6 +91,7 @@
         cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
 
@@ -99,6 +102,7 @@
         region=REGION,
         batch=BATCH_CONFIG_WITH_PHS,
         batch_id=BATCH_ID,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py
index 10403cea3b065..daad1b05b9450 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -69,6 +71,7 @@
         region=REGION,
         cluster_name=CLUSTER_NAME,
         use_if_exists=True,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     start_cluster = DataprocStartClusterOperator(
@@ -76,6 +79,7 @@
         project_id=PROJECT_ID,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     stop_cluster = DataprocStopClusterOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
index 35adca660d9a0..35e47ed583286 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -95,6 +97,7 @@
         region=REGION,
         cluster_name=CLUSTER_NAME,
         deferrable=True,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_cluster_operator_async]
 
@@ -108,6 +111,7 @@
         project_id=PROJECT_ID,
         region=REGION,
         deferrable=True,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_update_cluster_operator_async]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
index a610049bf99e3..2bfda208ca22b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -72,6 +74,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     # [START how_to_cloud_dataproc_diagnose_cluster]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
index d64fda25e4568..9ab7776547ef4 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
@@ -25,6 +25,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     ClusterGenerator,
@@ -103,6 +105,7 @@
         project_id=PROJECT_ID,
         region=REGION,
         cluster_config=CLUSTER_GENERATOR_CONFIG,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_cluster_generate_cluster_config_operator]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
index 2af1352fdc1de..09e41f34f88c8 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -68,6 +70,7 @@
         region=REGION,
         cluster_name=CLUSTER_NAME,
         use_if_exists=True,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     # [START how_to_cloud_dataproc_start_cluster_operator]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
index 610a19e17f84f..60114b537e146 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -83,6 +85,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     # [START how_to_cloud_dataproc_update_cluster_operator]
@@ -94,6 +97,7 @@
         graceful_decommission_timeout=TIMEOUT,
         project_id=PROJECT_ID,
         region=REGION,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_update_cluster_operator]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
index 550dec97c4187..2f34e9635d3b3 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
@@ -31,6 +31,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -101,6 +103,7 @@
         project_id=PROJECT_ID,
         location=REGION,
         body=GKE_CLUSTER_CONFIG,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     # [START how_to_cloud_dataproc_create_cluster_operator_in_gke]
@@ -110,6 +113,7 @@
         region=REGION,
         cluster_name=CLUSTER_NAME,
         virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_cluster_operator_in_gke]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
index f8f5cc3063736..a372c861e297c 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -91,6 +93,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     hadoop_task = DataprocSubmitJobOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
index 5e1189c7f9125..31f5d85218579 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -93,6 +95,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
     # [END how_to_cloud_dataproc_create_cluster_operator]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
index 9b76c10cbe346..771a5b57d3943 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -80,6 +82,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     pig_task = DataprocSubmitJobOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py
index 1c3cdf208252e..2ad14e84553f3 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -87,6 +89,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     presto_task = DataprocSubmitJobOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
index 4c6a64783a425..2c659a5d9a2f6 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -103,6 +105,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py
index f939347baac4b..0a8a8bc8adcaf 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -83,6 +85,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     spark_task = DataprocSubmitJobOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py
index cd1060b5fea57..d862a2f890397 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -82,6 +84,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     # [START cloud_dataproc_async_submit_sensor]
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py
index d20aa0aa0ed51..9742daae93f15 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py
@@ -25,6 +25,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -84,6 +86,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     spark_task = DataprocSubmitJobOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py
index 0ca0d062d2989..87577809e8d29 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -80,6 +82,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     spark_sql_task = DataprocSubmitJobOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
index 50454904759a6..756301d4ae75c 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -101,6 +103,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     sparkr_task = DataprocSubmitJobOperator(
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py
index 936430ccf0f12..976b567eb3dda 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py
@@ -24,6 +24,8 @@
 import os
 from datetime import datetime
 
+from google.api_core.retry import Retry
+
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
@@ -89,6 +91,7 @@
         cluster_config=CLUSTER_CONFIG,
         region=REGION,
         cluster_name=CLUSTER_NAME,
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     trino_task = DataprocSubmitJobOperator(