Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'retry_if_resource_not_ready' logic for DataprocCreateClusterOperator and DataprocCreateBatchOperator #118

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airflow/providers/google/cloud/hooks/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
from google.type.interval_pb2 import Interval


class DataprocResourceIsNotReadyError(AirflowException):
    """Raised when a Dataproc resource is not yet ready, e.g. while creating a Dataproc cluster."""


class DataProcJobBuilder:
"""A helper class for building Dataproc job."""

Expand Down Expand Up @@ -281,6 +285,8 @@ def wait_for_operation(
return operation.result(timeout=timeout, retry=result_retry)
except Exception:
error = operation.exception(timeout=timeout)
if self.check_error_for_resource_is_not_ready_msg(error.message):
raise DataprocResourceIsNotReadyError(error.message)
raise AirflowException(error)

@GoogleBaseHook.fallback_to_default_project_id
Expand Down Expand Up @@ -1192,6 +1198,11 @@ def wait_for_batch(

return result

def check_error_for_resource_is_not_ready_msg(self, error_msg: str) -> bool:
    """Check whether the given error message indicates a "resource is not ready" error.

    :param error_msg: The error message returned by a Dataproc operation.
    :return: True if the message contains both "The resource" and "is not ready".
    """
    # Dataproc reports transient capacity problems with messages of the form
    # "The resource <name> ... is not ready"; match both key phrases.
    key_words = ("The resource", "is not ready")
    return all(word in error_msg for word in key_words)


class DataprocAsyncHook(GoogleBaseHook):
"""
Expand Down
130 changes: 115 additions & 15 deletions airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder
from airflow.providers.google.cloud.hooks.dataproc import (
DataprocHook,
DataProcJobBuilder,
DataprocResourceIsNotReadyError,
)
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.dataproc import (
DATAPROC_BATCH_LINK,
Expand Down Expand Up @@ -593,6 +597,8 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
:param delete_on_error: If true the cluster will be deleted if created with ERROR state. Default
value is true.
:param use_if_exists: If true use existing cluster
:param num_retries_if_resource_is_not_ready: Optional. The number of retries for the cluster creation
    request when a resource-not-ready error appears.
:param request_id: Optional. A unique id used to identify the request. If the server receives two
``DeleteClusterRequest`` requests with the same id, then the second request will be ignored and the
first ``google.longrunning.Operation`` created and stored in the backend is returned.
Expand Down Expand Up @@ -639,6 +645,7 @@ def __init__(
request_id: str | None = None,
delete_on_error: bool = True,
use_if_exists: bool = True,
num_retries_if_resource_is_not_ready: int = 0,
retry: AsyncRetry | _MethodDefault | Retry = DEFAULT,
timeout: float = 1 * 60 * 60,
metadata: Sequence[tuple[str, str]] = (),
Expand Down Expand Up @@ -695,6 +702,7 @@ def __init__(
self.virtual_cluster_config = virtual_cluster_config
self.deferrable = deferrable
self.polling_interval_seconds = polling_interval_seconds
self.num_retries_if_resource_is_not_ready = num_retries_if_resource_is_not_ready

def _create_cluster(self, hook: DataprocHook):
return hook.create_cluster(
Expand Down Expand Up @@ -729,20 +737,26 @@ def _handle_error_state(self, hook: DataprocHook, cluster: Cluster) -> None:
return
self.log.info("Cluster is in ERROR state")
self.log.info("Gathering diagnostic information.")
operation = hook.diagnose_cluster(
region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
)
operation.result()
gcs_uri = str(operation.operation.response.value)
self.log.info("Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri)

if self.delete_on_error:
self._delete_cluster(hook)
# The delete op is asynchronous and can cause further failure if the cluster finishes
# deleting between catching AlreadyExists and checking state
self._wait_for_cluster_in_deleting_state(hook)
raise AirflowException("Cluster was created in an ERROR state then deleted.")
raise AirflowException("Cluster was created but is in ERROR state")
try:
operation = hook.diagnose_cluster(
region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
)
operation.result()
gcs_uri = str(operation.operation.response.value)
self.log.info(
"Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
)
except Exception as diagnose_error:
self.log.info("Some error occurred when trying to diagnose cluster.")
self.log.exception(diagnose_error)
finally:
if self.delete_on_error:
self._delete_cluster(hook)
# The delete op is asynchronous and can cause further failure if the cluster finishes
# deleting between catching AlreadyExists and checking state
self._wait_for_cluster_in_deleting_state(hook)
raise AirflowException("Cluster was created in an ERROR state then deleted.")
raise AirflowException("Cluster was created but is in ERROR state")

def _wait_for_cluster_in_deleting_state(self, hook: DataprocHook) -> None:
time_left = self.timeout
Expand Down Expand Up @@ -780,6 +794,16 @@ def _start_cluster(self, hook: DataprocHook):
)
return hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=op)

def _retry_cluster_creation(self, hook: DataprocHook):
    """Delete the failed cluster, create it again, and return the new cluster as a dict."""
    self.log.info("Retrying creation process for Cluster %s", self.cluster_name)
    # Remove the broken cluster first and wait until the delete finishes,
    # since a create request for an existing name would fail.
    self._delete_cluster(hook)
    self._wait_for_cluster_in_deleting_state(hook)
    self.log.info("Starting a new creation for Cluster %s", self.cluster_name)
    create_op = self._create_cluster(hook)
    new_cluster = hook.wait_for_operation(
        timeout=self.timeout,
        result_retry=self.retry,
        operation=create_op,
    )
    self.log.info("Cluster created.")
    return Cluster.to_dict(new_cluster)

def execute(self, context: Context) -> dict:
self.log.info("Creating cluster: %s", self.cluster_name)
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
Expand Down Expand Up @@ -829,6 +853,25 @@ def execute(self, context: Context) -> dict:
raise
self.log.info("Cluster already exists.")
cluster = self._get_cluster(hook)
except DataprocResourceIsNotReadyError as resource_not_ready_error:
if self.num_retries_if_resource_is_not_ready:
attempt = self.num_retries_if_resource_is_not_ready
while attempt > 0:
attempt -= 1
try:
cluster = self._retry_cluster_creation(hook)
except DataprocResourceIsNotReadyError:
continue
else:
return cluster
self.log.info(
"Retrying Cluster %s creation because of resource not ready was unsuccessful.",
self.cluster_name,
)
if self.delete_on_error:
self._delete_cluster(hook)
self._wait_for_cluster_in_deleting_state(hook)
raise resource_not_ready_error
except AirflowException as ae:
# There still could be a cluster created here in an ERROR state which
# should be deleted immediately rather than consuming another retry attempt
Expand Down Expand Up @@ -2948,6 +2991,8 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
:param request_id: Optional. A unique id used to identify the request. If the server receives two
``CreateBatchRequest`` requests with the same id, then the second request will be ignored and
the first ``google.longrunning.Operation`` created and stored in the backend is returned.
:param num_retries_if_resource_is_not_ready: Optional. The number of retries for the batch creation
    request when a resource-not-ready error appears.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
retried.
:param result_retry: Result retry object used to retry requests. Is used to decrease delay between
Expand Down Expand Up @@ -2988,6 +3033,7 @@ def __init__(
batch: dict | Batch,
batch_id: str | None = None,
request_id: str | None = None,
num_retries_if_resource_is_not_ready: int = 0,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
Expand All @@ -3007,6 +3053,7 @@ def __init__(
self.batch = batch
self.batch_id = batch_id
self.request_id = request_id
self.num_retries_if_resource_is_not_ready = num_retries_if_resource_is_not_ready
self.retry = retry
self.result_retry = result_retry
self.timeout = timeout
Expand Down Expand Up @@ -3091,6 +3138,15 @@ def execute(self, context: Context):
timeout=self.timeout,
metadata=self.metadata,
)
if self.num_retries_if_resource_is_not_ready and self.hook.check_error_for_resource_is_not_ready_msg(
batch.state_message
):
attempt = self.num_retries_if_resource_is_not_ready
while attempt > 0:
attempt -= 1
batch, batch_id = self.retry_batch_creation(batch_id)
if not self.hook.check_error_for_resource_is_not_ready_msg(batch.state_message):
break

self.handle_batch_status(context, batch.state, batch_id, batch.state_message)
return Batch.to_dict(batch)
Expand Down Expand Up @@ -3133,6 +3189,50 @@ def handle_batch_status(
raise AirflowException(f"Batch job {batch_id} unspecified. Driver logs: {link}")
self.log.info("Batch job %s completed. Driver logs: %s", batch_id, link)

def retry_batch_creation(
    self,
    previous_batch_id: str,
):
    """Delete the previously failed batch and submit the creation request again.

    :param previous_batch_id: Id of the failed batch to delete before retrying.
    :return: Tuple of the finished ``Batch`` result and the batch id it ran under.
    """
    self.log.info("Retrying creation process for batch_id %s", self.batch_id)
    self.log.info("Deleting previous failed Batch")
    self.hook.delete_batch(
        batch_id=previous_batch_id,
        region=self.region,
        project_id=self.project_id,
        retry=self.retry,
        timeout=self.timeout,
        metadata=self.metadata,
    )
    self.log.info("Starting a new creation for batch_id %s", self.batch_id)
    try:
        self.operation = self.hook.create_batch(
            region=self.region,
            project_id=self.project_id,
            batch=self.batch,
            batch_id=self.batch_id,
            request_id=self.request_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
    except AlreadyExists:
        self.log.info("Batch with given id already exists.")
        self.log.info("Attaching to the job %s if it is still running.", self.batch_id)
        # Bug fix: batch_id was previously left unbound on this path, causing an
        # UnboundLocalError at the wait_for_batch call below. Attach to the
        # existing batch under the id we tried to create.
        batch_id = self.batch_id
    else:
        batch_id = self.operation.metadata.batch.split("/")[-1]
        self.log.info("The batch %s was created.", batch_id)

    self.log.info("Waiting for the completion of batch job %s", batch_id)
    batch = self.hook.wait_for_batch(
        batch_id=batch_id,
        region=self.region,
        project_id=self.project_id,
        retry=self.retry,
        timeout=self.timeout,
        metadata=self.metadata,
    )
    return batch, batch_id


class DataprocDeleteBatchOperator(GoogleCloudBaseOperator):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

create_batch_2 = DataprocCreateBatchOperator(
Expand All @@ -79,6 +80,7 @@
batch=BATCH_CONFIG,
batch_id=BATCH_ID_2,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

create_batch_3 = DataprocCreateBatchOperator(
Expand All @@ -89,6 +91,7 @@
batch_id=BATCH_ID_3,
asynchronous=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
# [END how_to_cloud_dataproc_create_batch_operator]

Expand Down Expand Up @@ -123,6 +126,7 @@
batch=BATCH_CONFIG,
batch_id=BATCH_ID_4,
asynchronous=True,
num_retries_if_resource_is_not_ready=3,
)

# [START how_to_cloud_dataproc_cancel_operation_operator]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
batch_id=BATCH_ID,
deferrable=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
# [END how_to_cloud_dataproc_create_batch_operator_async]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
# [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server]

Expand All @@ -103,6 +104,7 @@
batch=BATCH_CONFIG_WITH_PHS,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
# [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
cluster_name=CLUSTER_NAME,
use_if_exists=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

start_cluster = DataprocStartClusterOperator(
Expand All @@ -97,6 +98,7 @@
cluster_name=CLUSTER_NAME,
use_if_exists=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

delete_cluster = DataprocDeleteClusterOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
cluster_name=CLUSTER_NAME,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
# [END how_to_cloud_dataproc_create_cluster_operator_async]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

# [START how_to_cloud_dataproc_diagnose_cluster]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
region=REGION,
cluster_config=CLUSTER_GENERATOR_CONFIG,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

# [END how_to_cloud_dataproc_create_cluster_generate_cluster_config_operator]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
cluster_name=CLUSTER_NAME,
use_if_exists=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

# [START how_to_cloud_dataproc_start_cluster_operator]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

# [START how_to_cloud_dataproc_update_cluster_operator]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

flink_task = DataprocSubmitJobOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
cluster_name=CLUSTER_NAME,
virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
# [END how_to_cloud_dataproc_create_cluster_operator_in_gke]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

hadoop_task = DataprocSubmitJobOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
# [END how_to_cloud_dataproc_create_cluster_operator]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)

pig_task = DataprocSubmitJobOperator(
Expand Down
Loading
Loading