Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate bigquery operators #1457

Merged
merged 9 commits into from
Jan 29, 2024
54 changes: 35 additions & 19 deletions astronomer/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Any, Dict, List, Optional, Union, cast
from __future__ import annotations

import warnings
from typing import Any, Union, cast

from aiohttp import ClientSession as ClientSession
from airflow.exceptions import AirflowException
Expand All @@ -20,25 +23,38 @@


class BigQueryHookAsync(GoogleBaseHookAsync):
"""Big query async hook inherits from GoogleBaseHookAsync class and connects to the Google Big Query"""
"""Big query async hook inherits from GoogleBaseHookAsync class and connects to the Google Big Query

This class is deprecated and will be removed in 2.0.0.
Use :class: `~airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook` instead
"""

sync_hook_class = BigQueryHook

def __init__(self, **kwargs: Any):
warnings.warn(
(
"This module is deprecated and will be removed in 2.0.0."
"Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook`"
),
DeprecationWarning,
stacklevel=2,
)

super().__init__(**kwargs)

async def get_job_instance(
self, project_id: Optional[str], job_id: Optional[str], session: ClientSession
self, project_id: str | None, job_id: str | None, session: ClientSession
) -> Job:
"""Get the specified job resource by job ID and project ID."""
with await self.service_file_as_context() as f:
return Job(job_id=job_id, project=project_id, service_file=f, session=cast(Session, session))

async def get_job_status(
self,
job_id: Optional[str],
project_id: Optional[str] = None,
) -> Optional[str]:
job_id: str | None,
project_id: str | None = None,
) -> str | None:
"""
Polls for job status asynchronously using gcloud-aio.

Expand All @@ -61,17 +77,17 @@ async def get_job_status(

async def get_job_output(
self,
job_id: Optional[str],
project_id: Optional[str] = None,
) -> Dict[str, Any]:
job_id: str | None,
project_id: str | None = None,
) -> dict[str, Any]:
"""Get the big query job output for the given job id asynchronously using gcloud-aio."""
async with ClientSession() as session:
self.log.info("Executing get_job_output..")
job_client = await self.get_job_instance(project_id, job_id, session)
job_query_response = await job_client.get_query_results(cast(Session, session))
return job_query_response

def get_records(self, query_results: Dict[str, Any]) -> List[Any]:
def get_records(self, query_results: dict[str, Any]) -> list[Any]:
"""
Given the output query response from gcloud aio bigquery, convert the response to records.

Expand All @@ -91,8 +107,8 @@ def value_check(
self,
sql: str,
pass_value: Any,
records: List[Any],
tolerance: Optional[float] = None,
records: list[Any],
tolerance: float | None = None,
) -> None:
"""
Match a single query resulting row and tolerance with pass_value
Expand Down Expand Up @@ -125,8 +141,8 @@ def value_check(

@staticmethod
def _get_numeric_matches(
records: List[float], pass_value: Any, tolerance: Optional[float] = None
) -> List[bool]:
records: list[float], pass_value: Any, tolerance: float | None = None
) -> list[bool]:
"""
A helper function to match numeric pass_value, tolerance with records value

Expand Down Expand Up @@ -156,9 +172,9 @@ def _convert_to_float_if_possible(s: Any) -> Any:

def interval_check(
self,
row1: Optional[str],
row2: Optional[str],
metrics_thresholds: Dict[str, Any],
row1: str | None,
row2: str | None,
metrics_thresholds: dict[str, Any],
ignore_zero: bool,
ratio_formula: str,
) -> None:
Expand Down Expand Up @@ -191,8 +207,8 @@ def interval_check(

current = dict(zip(metrics_sorted, row1))
reference = dict(zip(metrics_sorted, row2))
ratios: Dict[str, Any] = {}
test_results: Dict[str, Any] = {}
ratios: dict[str, Any] = {}
test_results: dict[str, Any] = {}

for metric in metrics_sorted:
cur = float(current[metric])
Expand Down
Loading
Loading