From 185c20677abfbb4ace5cdd48cf40fea762c4f02c Mon Sep 17 00:00:00 2001 From: Maksim Yermakou Date: Tue, 1 Oct 2024 16:13:18 +0000 Subject: [PATCH] Add 'retry_if_resource_not_ready' logic for DataprocCreateClusterOperator and DataprocCreateBatchOperator --- .../providers/google/cloud/hooks/dataproc.py | 11 ++ .../google/cloud/operators/dataproc.py | 130 ++++++++++++++++-- .../cloud/dataproc/example_dataproc_batch.py | 4 + .../example_dataproc_batch_deferrable.py | 1 + .../example_dataproc_batch_persistent.py | 2 + ...cluster_create_existing_stopped_cluster.py | 2 + .../example_dataproc_cluster_deferrable.py | 1 + .../example_dataproc_cluster_diagnose.py | 1 + .../example_dataproc_cluster_generator.py | 1 + .../example_dataproc_cluster_start_stop.py | 1 + .../example_dataproc_cluster_update.py | 1 + .../cloud/dataproc/example_dataproc_flink.py | 1 + .../cloud/dataproc/example_dataproc_gke.py | 1 + .../cloud/dataproc/example_dataproc_hadoop.py | 1 + .../cloud/dataproc/example_dataproc_hive.py | 1 + .../cloud/dataproc/example_dataproc_pig.py | 1 + .../cloud/dataproc/example_dataproc_presto.py | 1 + .../dataproc/example_dataproc_pyspark.py | 1 + .../cloud/dataproc/example_dataproc_spark.py | 1 + .../dataproc/example_dataproc_spark_async.py | 1 + .../example_dataproc_spark_deferrable.py | 1 + .../dataproc/example_dataproc_spark_sql.py | 1 + .../cloud/dataproc/example_dataproc_sparkr.py | 1 + .../cloud/dataproc/example_dataproc_trino.py | 1 + 24 files changed, 153 insertions(+), 15 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py index 2eb8d8952c56e..fc787ea83f7a0 100644 --- a/airflow/providers/google/cloud/hooks/dataproc.py +++ b/airflow/providers/google/cloud/hooks/dataproc.py @@ -59,6 +59,10 @@ from google.type.interval_pb2 import Interval +class DataprocResourceIsNotReadyError(AirflowException): + """Raise when resource is not ready for create Dataproc cluster.""" + + class DataProcJobBuilder: """A helper class for building Dataproc job.""" @@ -281,6 +285,8 @@ def wait_for_operation( return operation.result(timeout=timeout, retry=result_retry) except Exception: error = operation.exception(timeout=timeout) + if self.check_error_for_resource_is_not_ready_msg(error.message): + raise DataprocResourceIsNotReadyError(error.message) raise AirflowException(error) @GoogleBaseHook.fallback_to_default_project_id @@ -1192,6 +1198,11 @@ def wait_for_batch( return result + def check_error_for_resource_is_not_ready_msg(self, error_msg: str) -> bool: + """Check that reason of error is resource is not ready.""" + key_words = ["The resource", "is not ready"] + return all([word in error_msg for word in key_words]) + class DataprocAsyncHook(GoogleBaseHook): """ diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index 38a364036c0e2..2c9c866537a73 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -40,7 +40,11 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning -from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder +from airflow.providers.google.cloud.hooks.dataproc import ( + DataprocHook, + DataProcJobBuilder, + DataprocResourceIsNotReadyError, +) from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.links.dataproc import ( DATAPROC_BATCH_LINK, @@ -593,6 +597,8 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator): :param delete_on_error: If true the cluster will be deleted if created with ERROR state. Default value is true. :param use_if_exists: If true use existing cluster + :param num_retries_if_resource_is_not_ready: Optional. The number of retry for cluster creation request + when resource is not ready error appears. :param request_id: Optional. A unique id used to identify the request. If the server receives two ``DeleteClusterRequest`` requests with the same id, then the second request will be ignored and the first ``google.longrunning.Operation`` created and stored in the backend is returned. @@ -639,6 +645,7 @@ def __init__( request_id: str | None = None, delete_on_error: bool = True, use_if_exists: bool = True, + num_retries_if_resource_is_not_ready: int = 0, retry: AsyncRetry | _MethodDefault | Retry = DEFAULT, timeout: float = 1 * 60 * 60, metadata: Sequence[tuple[str, str]] = (), @@ -695,6 +702,7 @@ def __init__( self.virtual_cluster_config = virtual_cluster_config self.deferrable = deferrable self.polling_interval_seconds = polling_interval_seconds + self.num_retries_if_resource_is_not_ready = num_retries_if_resource_is_not_ready def _create_cluster(self, hook: DataprocHook): return hook.create_cluster( @@ -729,20 +737,26 @@ def _handle_error_state(self, hook: DataprocHook, cluster: Cluster) -> None: return self.log.info("Cluster is in ERROR state") self.log.info("Gathering diagnostic information.") - operation = hook.diagnose_cluster( - region=self.region, cluster_name=self.cluster_name, project_id=self.project_id - ) - operation.result() - gcs_uri = str(operation.operation.response.value) - self.log.info("Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri) - - if self.delete_on_error: - self._delete_cluster(hook) - # The delete op is asynchronous and can cause further failure if the cluster finishes - # deleting between catching AlreadyExists and checking state - self._wait_for_cluster_in_deleting_state(hook) - raise AirflowException("Cluster was created in an ERROR state then deleted.") - raise AirflowException("Cluster was created but is in ERROR state") + try: + operation = hook.diagnose_cluster( + region=self.region, cluster_name=self.cluster_name, project_id=self.project_id + ) + operation.result() + gcs_uri = str(operation.operation.response.value) + self.log.info( + "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri + ) + except Exception as diagnose_error: + self.log.info("Some error occurred when trying to diagnose cluster.") + self.log.exception(diagnose_error) + finally: + if self.delete_on_error: + self._delete_cluster(hook) + # The delete op is asynchronous and can cause further failure if the cluster finishes + # deleting between catching AlreadyExists and checking state + self._wait_for_cluster_in_deleting_state(hook) + raise AirflowException("Cluster was created in an ERROR state then deleted.") + raise AirflowException("Cluster was created but is in ERROR state") def _wait_for_cluster_in_deleting_state(self, hook: DataprocHook) -> None: time_left = self.timeout @@ -780,6 +794,16 @@ def _start_cluster(self, hook: DataprocHook): ) return hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=op) + def _retry_cluster_creation(self, hook: DataprocHook): + self.log.info("Retrying creation process for Cluster %s", self.cluster_name) + self._delete_cluster(hook) + self._wait_for_cluster_in_deleting_state(hook) + self.log.info("Starting a new creation for Cluster %s", self.cluster_name) + operation = self._create_cluster(hook) + cluster = hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation) + self.log.info("Cluster created.") + return Cluster.to_dict(cluster) + def execute(self, context: Context) -> dict: self.log.info("Creating cluster: %s", self.cluster_name) hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) @@ -829,6 +853,25 @@ def execute(self, context: Context) -> dict: raise self.log.info("Cluster already exists.") cluster = self._get_cluster(hook) + except DataprocResourceIsNotReadyError as resource_not_ready_error: + if self.num_retries_if_resource_is_not_ready: + attempt = self.num_retries_if_resource_is_not_ready + while attempt > 0: + attempt -= 1 + try: + cluster = self._retry_cluster_creation(hook) + except DataprocResourceIsNotReadyError: + continue + else: + return cluster + self.log.info( + "Retrying Cluster %s creation because of resource not ready was unsuccessful.", + self.cluster_name, + ) + if self.delete_on_error: + self._delete_cluster(hook) + self._wait_for_cluster_in_deleting_state(hook) + raise resource_not_ready_error except AirflowException as ae: # There still could be a cluster created here in an ERROR state which # should be deleted immediately rather than consuming another retry attempt @@ -2948,6 +2991,8 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator): :param request_id: Optional. A unique id used to identify the request. If the server receives two ``CreateBatchRequest`` requests with the same id, then the second request will be ignored and the first ``google.longrunning.Operation`` created and stored in the backend is returned. + :param num_retries_if_resource_is_not_ready: Optional. The number of retry for cluster creation request + when resource is not ready error appears. :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be retried. :param result_retry: Result retry object used to retry requests. Is used to decrease delay between @@ -2988,6 +3033,7 @@ def __init__( batch: dict | Batch, batch_id: str | None = None, request_id: str | None = None, + num_retries_if_resource_is_not_ready: int = 0, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -3007,6 +3053,7 @@ def __init__( self.batch = batch self.batch_id = batch_id self.request_id = request_id + self.num_retries_if_resource_is_not_ready = num_retries_if_resource_is_not_ready self.retry = retry self.result_retry = result_retry self.timeout = timeout @@ -3091,6 +3138,15 @@ def execute(self, context: Context): timeout=self.timeout, metadata=self.metadata, ) + if self.num_retries_if_resource_is_not_ready and self.hook.check_error_for_resource_is_not_ready_msg( + batch.state_message + ): + attempt = self.num_retries_if_resource_is_not_ready + while attempt > 0: + attempt -= 1 + batch, batch_id = self.retry_batch_creation(batch_id) + if not self.hook.check_error_for_resource_is_not_ready_msg(batch.state_message): + break self.handle_batch_status(context, batch.state, batch_id, batch.state_message) return Batch.to_dict(batch) @@ -3133,6 +3189,50 @@ def handle_batch_status( raise AirflowException(f"Batch job {batch_id} unspecified. Driver logs: {link}") self.log.info("Batch job %s completed. Driver logs: %s", batch_id, link) + def retry_batch_creation( + self, + previous_batch_id: str, + ): + self.log.info("Retrying creation process for batch_id %s", self.batch_id) + self.log.info("Deleting previous failed Batch") + self.hook.delete_batch( + batch_id=previous_batch_id, + region=self.region, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + self.log.info("Starting a new creation for batch_id %s", self.batch_id) + try: + self.operation = self.hook.create_batch( + region=self.region, + project_id=self.project_id, + batch=self.batch, + batch_id=self.batch_id, + request_id=self.request_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except AlreadyExists: + self.log.info("Batch with given id already exists.") + self.log.info("Attaching to the job %s if it is still running.", self.batch_id) + else: + batch_id = self.operation.metadata.batch.split("/")[-1] + self.log.info("The batch %s was created.", batch_id) + + self.log.info("Waiting for the completion of batch job %s", batch_id) + batch = self.hook.wait_for_batch( + batch_id=batch_id, + region=self.region, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return batch, batch_id + class DataprocDeleteBatchOperator(GoogleCloudBaseOperator): """ diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py index 2796b16d6be62..6771a529d79eb 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py @@ -70,6 +70,7 @@ batch=BATCH_CONFIG, batch_id=BATCH_ID, result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) create_batch_2 = DataprocCreateBatchOperator( @@ -79,6 +80,7 @@ batch=BATCH_CONFIG, batch_id=BATCH_ID_2, result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) create_batch_3 = DataprocCreateBatchOperator( @@ -89,6 +91,7 @@ batch_id=BATCH_ID_3, asynchronous=True, result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_batch_operator] @@ -123,6 +126,7 @@ batch=BATCH_CONFIG, batch_id=BATCH_ID_4, asynchronous=True, + num_retries_if_resource_is_not_ready=3, ) # [START how_to_cloud_dataproc_cancel_operation_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py index afd509258d597..8f5c69ccccaf5 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py @@ -65,6 +65,7 @@ batch_id=BATCH_ID, deferrable=True, result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_batch_operator_async] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py index fbaf197c73a6e..fb592b0cdb6fe 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py @@ -92,6 +92,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server] @@ -103,6 +104,7 @@ batch=BATCH_CONFIG_WITH_PHS, batch_id=BATCH_ID, result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py index fd75917ccee6e..ef57500639dc5 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py @@ -72,6 +72,7 @@ cluster_name=CLUSTER_NAME, use_if_exists=True, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) start_cluster = DataprocStartClusterOperator( @@ -97,6 +98,7 @@ cluster_name=CLUSTER_NAME, use_if_exists=True, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) delete_cluster = DataprocDeleteClusterOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py index 267cd043eb9c7..3ff91d95a95be 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py @@ -98,6 +98,7 @@ cluster_name=CLUSTER_NAME, deferrable=True, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_cluster_operator_async] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py index 37a5ec65f4e1c..1d94f688996f8 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py @@ -75,6 +75,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [START how_to_cloud_dataproc_diagnose_cluster] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py index 31370e4693864..57c53c28e6830 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py @@ -106,6 +106,7 @@ region=REGION, cluster_config=CLUSTER_GENERATOR_CONFIG, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_cluster_generate_cluster_config_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py index a08f23cc74bc2..7759ddd098f0e 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py @@ -71,6 +71,7 @@ cluster_name=CLUSTER_NAME, use_if_exists=True, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [START how_to_cloud_dataproc_start_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py index c192990753fc5..ea7725209f808 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py @@ -86,6 +86,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [START how_to_cloud_dataproc_update_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py index 71b88325f6b86..2eb6d4c4bdf4f 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py @@ -95,6 +95,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) flink_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py index 7f0ff12f80255..becf273784ab1 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py @@ -113,6 +113,7 @@ cluster_name=CLUSTER_NAME, virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_cluster_operator_in_gke] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py index 45a668a90d45a..23ddd6eb581b7 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py @@ -94,6 +94,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) hadoop_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py index 41409dd6497b1..193d790a5282f 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py @@ -96,6 +96,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [END how_to_cloud_dataproc_create_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py index f5d20827771e1..344adaeca2e66 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py @@ -83,6 +83,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) pig_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py index 924547ddacc60..224dfc3db5b1f 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py @@ -90,6 +90,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) presto_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py index fe9d493f4ac40..44809b283767d 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py @@ -106,6 +106,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [START how_to_cloud_dataproc_submit_job_to_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py index d8a3e2d2391ec..ca76810cb43e6 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py @@ -86,6 +86,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) spark_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py index d31f6f24faf5f..2ba8cc512f73c 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py @@ -85,6 +85,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) # [START cloud_dataproc_async_submit_sensor] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py index 0efee5f60aa32..cee74bb819b9d 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py @@ -87,6 +87,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) spark_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py index cae42d25c4533..dc446ad332e12 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py @@ -83,6 +83,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) spark_sql_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py index 70df13ed2efc1..1468edf05eceb 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py @@ -104,6 +104,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) sparkr_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py index f69dbb5360915..4631733cd0b4b 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py @@ -92,6 +92,7 @@ region=REGION, cluster_name=CLUSTER_NAME, retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + num_retries_if_resource_is_not_ready=3, ) trino_task = DataprocSubmitJobOperator(