Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprecate Dataproc Operators #1450

Merged
merged 8 commits into from
Jan 29, 2024
37 changes: 27 additions & 10 deletions astronomer/providers/google/cloud/hooks/dataproc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import warnings
from typing import Any, Optional, Sequence, Tuple, Union
from typing import Any, Sequence, Union

from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
Expand All @@ -21,10 +23,25 @@


class DataprocHookAsync(GoogleBaseHook):
"""Async Hook for Google Cloud Dataproc APIs"""
"""Async Hook for Google Cloud Dataproc APIs

This class is deprecated and will be removed in 2.0.0.
Use :class: `~airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook` instead
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
warnings.warn(
(
"This module is deprecated and will be removed in 2.0.0."
"Please use `airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook`"
),
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)

def get_cluster_client(
self, region: Optional[str] = None, location: Optional[str] = None
self, region: str | None = None, location: str | None = None
) -> ClusterControllerAsyncClient:
"""
Get async cluster controller client for GCP Dataproc.
Expand All @@ -46,7 +63,7 @@ def get_cluster_client(
)

def get_job_client(
self, region: Optional[str] = None, location: Optional[str] = None
self, region: str | None = None, location: str | None = None
) -> JobControllerAsyncClient:
"""
Get async job controller for GCP Dataproc.
Expand All @@ -73,7 +90,7 @@ async def get_cluster(
cluster_name: str,
project_id: str,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
metadata: Sequence[tuple[str, str]] = (),
) -> clusters.Cluster:
"""
Get a cluster details from GCP using `ClusterControllerAsyncClient`
Expand All @@ -100,10 +117,10 @@ async def get_job(
job_id: str,
project_id: str,
timeout: float = 5,
region: Optional[str] = None,
location: Optional[str] = None,
region: str | None = None,
location: str | None = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
metadata: Sequence[tuple[str, str]] = (),
) -> JobType:
"""
Gets the resource representation for a job using `JobControllerAsyncClient`.
Expand All @@ -128,8 +145,8 @@ async def get_job(
return job

def _get_client_options_and_region(
self, region: Optional[str] = None, location: Optional[str] = None
) -> Tuple[ClientOptions, Optional[str]]:
self, region: str | None = None, location: str | None = None
) -> tuple[ClientOptions, str | None]:
"""
Checks for location if present or not and creates a client options using the provided region/location

Expand Down
Loading
Loading