-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support retryable exceptions during query execution #368
base: main
Are you sure you want to change the base?
Changes from all commits
cf28b9f
1eeefc3
128bb28
3303d5e
a77f5f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
kind: Under the Hood | ||
body: Add retry logic for retryable exceptions. | ||
time: 2024-12-04T18:59:12.48816-08:00 | ||
custom: | ||
Author: 'colin-rogers-dbt ' | ||
Issue: "368" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,26 @@ | ||
import abc | ||
import time | ||
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING | ||
from typing import ( | ||
Any, | ||
Dict, | ||
Iterable, | ||
Iterator, | ||
List, | ||
Optional, | ||
Tuple, | ||
TYPE_CHECKING, | ||
Callable, | ||
Type, | ||
Union, | ||
) | ||
|
||
from dbt_common.events.contextvars import get_node_info | ||
from dbt_common.events.functions import fire_event | ||
from dbt_common.exceptions import DbtInternalError, NotImplementedError | ||
from dbt_common.exceptions import DbtInternalError, NotImplementedError, DbtRuntimeError | ||
from dbt_common.utils import cast_to_str | ||
|
||
from dbt.adapters.base import BaseConnectionManager | ||
from dbt.adapters.base.connections import SleepTime | ||
from dbt.adapters.contracts.connection import ( | ||
AdapterResponse, | ||
Connection, | ||
|
@@ -18,6 +31,7 @@ | |
SQLCommit, | ||
SQLQuery, | ||
SQLQueryStatus, | ||
AdapterEventDebug, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -61,6 +75,9 @@ def add_query( | |
auto_begin: bool = True, | ||
bindings: Optional[Any] = None, | ||
abridge_sql_log: bool = False, | ||
retryable_exceptions: Iterable[Type[Exception]] = [], | ||
retry_limit: int = 1, | ||
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, | ||
) -> Tuple[Connection, Any]: | ||
connection = self.get_thread_connection() | ||
if auto_begin and connection.transaction_open is False: | ||
|
@@ -90,7 +107,14 @@ def add_query( | |
pre = time.perf_counter() | ||
|
||
cursor = connection.handle.cursor() | ||
cursor.execute(sql, bindings) | ||
self._retryable_cursor_execute( | ||
execute_fn=cursor.execute, | ||
sql=sql, | ||
bindings=bindings, | ||
retryable_exceptions=retryable_exceptions, | ||
retry_limit=retry_limit, | ||
retry_timeout=retry_timeout, | ||
) | ||
|
||
result = self.get_response(cursor) | ||
|
||
|
@@ -199,3 +223,45 @@ def commit(self): | |
connection.transaction_open = False | ||
|
||
return connection | ||
|
||
def _retryable_cursor_execute( | ||
self, | ||
execute_fn: Callable, | ||
sql: str, | ||
bindings: Optional[Any] = None, | ||
retryable_exceptions: Iterable[Type[Exception]] = [], | ||
retry_limit: int = 1, | ||
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, | ||
_attempts: int = 0, | ||
) -> None: | ||
timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout | ||
if timeout < 0: | ||
raise DbtRuntimeError("retry_timeout cannot be negative or return a negative time.") | ||
|
||
try: | ||
execute_fn(sql, bindings) | ||
|
||
except tuple(retryable_exceptions) as e: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This also feels like early abstraction. Can we update the type to be |
||
retry_limit -= 1 | ||
if retry_limit <= 0: | ||
raise e | ||
fire_event( | ||
AdapterEventDebug( | ||
message=f"Got a retryable error {type(e)} when attempting to execute a query.\n" | ||
f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" | ||
f"Error:\n{e}" | ||
) | ||
) | ||
|
||
time.sleep(timeout) | ||
return self._retryable_cursor_execute( | ||
execute_fn=execute_fn, | ||
sql=sql, | ||
retry_limit=retry_limit - 1, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We already decremented the retry limit on line 234. This does it twice. |
||
retry_timeout=retry_timeout, | ||
retryable_exceptions=retryable_exceptions, | ||
_attempts=_attempts + 1, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see why we would pass in |
||
) | ||
|
||
except Exception as e: | ||
raise e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like java style programming, aka "look before you leap". If we remove the callable variant of the timeout, can we omit this as well? In that scenario, I think it's either our default, or the user setting, both of which can be reasonably assumed to be non-negative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does removing the callable variant of the timeout mean here? We're passing in a function that the downstream user implements and I personally jive with that