Skip to content

Commit

Permalink
Deprecate bigquery operators (#1457)
Browse files Browse the repository at this point in the history
* feat(providers/google): deprecate BigQueryInsertJobOperator, BigQueryCheckOperatorAsync, BigQueryGetDataOperatorAsync, BigQueryIntervalCheckOperatorAsync, BigQueryValueCheckOperatorAsync
* fix(google): set default use_legacy_sql value to False for BigQueryGetDataOperatorAsync
  • Loading branch information
Lee-W authored Jan 29, 2024
1 parent 52063a4 commit 1c3ec6e
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 1,099 deletions.
54 changes: 35 additions & 19 deletions astronomer/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Any, Dict, List, Optional, Union, cast
from __future__ import annotations

import warnings
from typing import Any, Union, cast

from aiohttp import ClientSession as ClientSession
from airflow.exceptions import AirflowException
Expand All @@ -20,25 +23,38 @@


class BigQueryHookAsync(GoogleBaseHookAsync):
"""Big query async hook inherits from GoogleBaseHookAsync class and connects to the Google Big Query"""
"""Big query async hook inherits from GoogleBaseHookAsync class and connects to the Google Big Query
This class is deprecated and will be removed in 2.0.0.
Use :class:`~airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook` instead.
"""

sync_hook_class = BigQueryHook

def __init__(self, **kwargs: Any):
    """Emit a deprecation warning, then initialise the parent hook.

    :param kwargs: keyword arguments forwarded unchanged to the parent
        class initialiser.
    """
    warnings.warn(
        (
            # The trailing space after "2.0.0." matters: adjacent string
            # literals are concatenated verbatim, and without it the message
            # is rendered as "...removed in 2.0.0.Please use ...".
            "This module is deprecated and will be removed in 2.0.0. "
            "Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook`"
        ),
        DeprecationWarning,
        # stacklevel=2 attributes the warning to the code constructing the
        # hook rather than to this __init__ itself.
        stacklevel=2,
    )

    super().__init__(**kwargs)

async def get_job_instance(
self, project_id: Optional[str], job_id: Optional[str], session: ClientSession
self, project_id: str | None, job_id: str | None, session: ClientSession
) -> Job:
"""Get the specified job resource by job ID and project ID."""
with await self.service_file_as_context() as f:
return Job(job_id=job_id, project=project_id, service_file=f, session=cast(Session, session))

async def get_job_status(
self,
job_id: Optional[str],
project_id: Optional[str] = None,
) -> Optional[str]:
job_id: str | None,
project_id: str | None = None,
) -> str | None:
"""
Polls for job status asynchronously using gcloud-aio.
Expand All @@ -61,17 +77,17 @@ async def get_job_status(

async def get_job_output(
self,
job_id: Optional[str],
project_id: Optional[str] = None,
) -> Dict[str, Any]:
job_id: str | None,
project_id: str | None = None,
) -> dict[str, Any]:
"""Get the big query job output for the given job id asynchronously using gcloud-aio."""
async with ClientSession() as session:
self.log.info("Executing get_job_output..")
job_client = await self.get_job_instance(project_id, job_id, session)
job_query_response = await job_client.get_query_results(cast(Session, session))
return job_query_response

def get_records(self, query_results: Dict[str, Any]) -> List[Any]:
def get_records(self, query_results: dict[str, Any]) -> list[Any]:
"""
Given the output query response from gcloud aio bigquery, convert the response to records.
Expand All @@ -91,8 +107,8 @@ def value_check(
self,
sql: str,
pass_value: Any,
records: List[Any],
tolerance: Optional[float] = None,
records: list[Any],
tolerance: float | None = None,
) -> None:
"""
Match a single query resulting row and tolerance with pass_value
Expand Down Expand Up @@ -125,8 +141,8 @@ def value_check(

@staticmethod
def _get_numeric_matches(
records: List[float], pass_value: Any, tolerance: Optional[float] = None
) -> List[bool]:
records: list[float], pass_value: Any, tolerance: float | None = None
) -> list[bool]:
"""
A helper function to match numeric pass_value, tolerance with records value
Expand Down Expand Up @@ -156,9 +172,9 @@ def _convert_to_float_if_possible(s: Any) -> Any:

def interval_check(
self,
row1: Optional[str],
row2: Optional[str],
metrics_thresholds: Dict[str, Any],
row1: str | None,
row2: str | None,
metrics_thresholds: dict[str, Any],
ignore_zero: bool,
ratio_formula: str,
) -> None:
Expand Down Expand Up @@ -191,8 +207,8 @@ def interval_check(

current = dict(zip(metrics_sorted, row1))
reference = dict(zip(metrics_sorted, row2))
ratios: Dict[str, Any] = {}
test_results: Dict[str, Any] = {}
ratios: dict[str, Any] = {}
test_results: dict[str, Any] = {}

for metric in metrics_sorted:
cur = float(current[metric])
Expand Down
Loading

0 comments on commit 1c3ec6e

Please sign in to comment.