Skip to content

Commit

Permalink
fix(agents-api): Fix Error-Retrying Mechanism (#760)
Browse files Browse the repository at this point in the history
<!-- ELLIPSIS_HIDDEN -->



> [!IMPORTANT]
> Fix error-retrying mechanism by updating retryable error logic and
interceptors, and adding retry to utility function.
> 
>   - **Error Handling**:
> - Change logic to retry on known errors in `tasks.py` by introducing
`RETRYABLE_ERROR_TYPES` and `RETRYABLE_HTTP_STATUS_CODES`.
> - Update `is_retryable_error()` to return `True` for retryable errors
and specific HTTP status codes.
>   - **Interceptors**:
> - Modify `CustomActivityInterceptor` and `CustomWorkflowInterceptor`
in `interceptors.py` to use `is_retryable_error()`.
>   - **Utilities**:
> - Add retry mechanism to `cozo_query_dec()` in `utils.py` using
`tenacity` to handle `HTTPException` with status code 429.
> 
> <sup>This description was created by </sup>[<img alt="Ellipsis"
src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=julep-ai%2Fjulep&utm_source=github&utm_medium=referral)<sup>
for 2943e26. It will automatically
update as commits are pushed.</sup>

<!-- ELLIPSIS_HIDDEN -->

---------

Co-authored-by: HamadaSalhab <[email protected]>
Co-authored-by: Diwank Singh Tomer <[email protected]>
Co-authored-by: sweep-ai[bot] <128439645+sweep-ai[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Oct 29, 2024
1 parent 2096347 commit f9a7ed5
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 67 deletions.
126 changes: 90 additions & 36 deletions agents-api/agents_api/common/exceptions/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""
This module defines non-retryable error types and provides a function to check
if a given error is non-retryable. These are used in conjunction with custom
Temporal interceptors to prevent unnecessary retries of certain error types.
🎯 Error Handling: The Art of Knowing When to Try Again
This module is like a bouncer at an error club - it decides which errors get a
second chance and which ones are permanently banned. Some errors are just having
a bad day (like network timeouts), while others are fundamentally problematic
(like trying to divide by zero... seriously, who does that?).
Remember: To err is human, to retry divine... but only if it makes sense!
"""

import asyncio
Expand All @@ -19,21 +24,20 @@
import requests
import temporalio.exceptions

### FIXME: This should be the opposite. We should retry on only known errors

# List of error types that should not be retried
# 🚫 The "No Second Chances" Club - errors that we won't retry
# Because sometimes, no means no!
NON_RETRYABLE_ERROR_TYPES = (
# Temporal-specific errors
# Temporal-specific errors (when time itself says no)
temporalio.exceptions.WorkflowAlreadyStartedError,
temporalio.exceptions.TerminatedError,
temporalio.exceptions.CancelledError,
#
# Built-in Python exceptions
# Built-in Python exceptions (the classics that never go out of style)
TypeError,
AssertionError,
SyntaxError,
ValueError,
ZeroDivisionError,
ZeroDivisionError, # Because dividing by zero is still not cool
IndexError,
AttributeError,
LookupError,
Expand All @@ -42,42 +46,41 @@
KeyError,
NameError,
NotImplementedError,
RecursionError,
RecursionError, # When your code goes down the rabbit hole too deep
RuntimeError,
StopIteration,
StopAsyncIteration,
IndentationError,
IndentationError, # Spaces vs tabs: the eternal debate
TabError,
#
# Unicode-related errors
# Unicode-related errors (when characters misbehave)
UnicodeError,
UnicodeEncodeError,
UnicodeDecodeError,
UnicodeTranslateError,
#
# HTTP and API-related errors
fastapi.exceptions.HTTPException,
# HTTP and API-related errors (when the web says "nope")
fastapi.exceptions.RequestValidationError,
#
# Asynchronous programming errors
# Asynchronous programming errors (async/await gone wrong)
asyncio.CancelledError,
asyncio.InvalidStateError,
GeneratorExit,
#
# Third-party library exceptions
# Third-party library exceptions (when other people's code says no)
jinja2.exceptions.TemplateSyntaxError,
jinja2.exceptions.TemplateNotFound,
jsonschema.exceptions.ValidationError,
pydantic.ValidationError,
requests.exceptions.InvalidURL,
requests.exceptions.MissingSchema,
#
# Box exceptions
# Box exceptions (when your box is broken)
box.exceptions.BoxKeyError,
box.exceptions.BoxTypeError,
box.exceptions.BoxValueError,
#
# Beartype exceptions
# Beartype exceptions (when your types are unbearable)
beartype.roar.BeartypeException,
beartype.roar.BeartypeDecorException,
beartype.roar.BeartypeDecorHintException,
Expand All @@ -92,42 +95,93 @@
beartype.roar.BeartypeDecorHintParamDefaultViolation,
beartype.roar.BeartypeDoorHintViolation,
#
# LiteLLM exceptions
# LiteLLM exceptions (when AI has a bad day)
litellm.exceptions.NotFoundError,
litellm.exceptions.InvalidRequestError,
litellm.exceptions.AuthenticationError,
litellm.exceptions.ServiceUnavailableError,
litellm.exceptions.OpenAIError,
litellm.exceptions.APIError,
)

# 🔄 The "Try Again" Club - errors that deserve another shot
# Because everyone deserves a second chance... or third... or fourth...
RETRYABLE_ERROR_TYPES = (
# LiteLLM exceptions (when AI needs a coffee break)
litellm.exceptions.RateLimitError,
litellm.exceptions.APIError, # Added to retry on "APIError: OpenAIException - Connection error"
#
# HTTP/Network related errors (internet having a bad hair day)
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.WriteTimeout,
httpx.PoolTimeout,
#
# Standard library errors that are typically transient (like a bad mood)
ConnectionError,
TimeoutError,
OSError, # Covers many IO-related errors that may be transient
IOError,
#
# Database/storage related (when the database needs a nap)
asyncio.TimeoutError,
)

# HTTP status codes that say "maybe try again later?"
RETRYABLE_HTTP_STATUS_CODES = (
408, # Request Timeout (server needs a coffee break)
429, # Too Many Requests (slow down, speedster!)
503, # Service Unavailable (server is having a moment)
504, # Gateway Timeout (the internet took a detour)
)


### FIXME: This should be the opposite. So `is_retryable_error` instead of `is_non_retryable_error`
def is_non_retryable_error(error: BaseException) -> bool:
def is_retryable_error(error: BaseException) -> bool:
"""
Determines if the given error is non-retryable.
The Great Error Judge: Decides if an error deserves another chance at life.
This function checks if the error is an instance of any of the error types
defined in NON_RETRYABLE_ERROR_TYPES.
Think of this function as a very understanding but firm teacher - some mistakes
get a do-over, others are learning opportunities (aka failures).
Args:
error (Exception): The error to check.
error (Exception): The error that's pleading its case
Returns:
bool: True if the error is non-retryable, False otherwise.
bool: True if the error gets another shot, False if it's game over
"""
# First, check if it's in the "permanently banned" list
if isinstance(error, NON_RETRYABLE_ERROR_TYPES):
return False

# Check if it's in the "VIP retry club"
if isinstance(error, RETRYABLE_ERROR_TYPES):
return True

# Check for specific HTTP errors (status code == 429)
# Special handling for HTTP errors (because they're special snowflakes)
if isinstance(error, fastapi.exceptions.HTTPException):
if error.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

if isinstance(error, httpx.HTTPStatusError):
if error.response.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

# If we don't know this error, we play it safe and don't retry
# (stranger danger!)
return False

# Check for specific HTTP errors that should be retried
if isinstance(error, fastapi.exceptions.HTTPException):
if error.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

if isinstance(error, httpx.HTTPStatusError):
if error.response.status_code in (
408,
429,
503,
504,
): # pytype: disable=attribute-error
return False
if error.response.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

# If we don't know about the error, we should not retry
return True
return False
78 changes: 48 additions & 30 deletions agents-api/agents_api/common/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
ReadOnlyContextError,
)

from .exceptions.tasks import is_non_retryable_error
from .exceptions.tasks import is_retryable_error


class CustomActivityInterceptor(ActivityInboundInterceptor):
Expand All @@ -36,86 +36,104 @@ class CustomActivityInterceptor(ActivityInboundInterceptor):
"""

async def execute_activity(self, input: ExecuteActivityInput):
"""
🎭 The Activity Whisperer: Handles activity execution with style and grace
This is like a safety net for your activities - catching errors and deciding
their fate with the wisdom of a fortune cookie.
"""
try:
return await super().execute_activity(input)
except (
ContinueAsNewError,
ReadOnlyContextError,
NondeterminismError,
RPCError,
CompleteAsyncError,
TemporalError,
FailureError,
ApplicationError,
ContinueAsNewError, # When you need a fresh start
ReadOnlyContextError, # When someone tries to write in a museum
NondeterminismError, # When chaos theory kicks in
RPCError, # When computers can't talk to each other
CompleteAsyncError, # When async goes wrong
TemporalError, # When time itself rebels
FailureError, # When failure is not an option, but happens anyway
ApplicationError, # When the app says "nope"
):
raise
except BaseException as e:
if is_non_retryable_error(e):
if not is_retryable_error(e):
# If it's not retryable, we wrap it in a nice bow (ApplicationError)
# and mark it as non-retryable to prevent further attempts
raise ApplicationError(
str(e),
type=type(e).__name__,
non_retryable=True,
)
# For retryable errors, we'll let Temporal retry with backoff
# Default retry policy ensures at least 2 retries
raise


class CustomWorkflowInterceptor(WorkflowInboundInterceptor):
"""
Custom interceptor for Temporal workflows.
🎪 The Workflow Circus Ringmaster
This interceptor catches exceptions during workflow execution and
raises them as non-retryable ApplicationErrors if they are identified
as non-retryable errors.
This interceptor is like a circus ringmaster - keeping all the workflow acts
running smoothly and catching any lions (errors) that escape their cages.
"""

async def execute_workflow(self, input: ExecuteWorkflowInput):
"""
🎪 The Main Event: Workflow Execution Extravaganza!
Watch as we gracefully handle errors like a trapeze artist catching their partner!
"""
try:
return await super().execute_workflow(input)
except (
ContinueAsNewError,
ReadOnlyContextError,
NondeterminismError,
RPCError,
CompleteAsyncError,
TemporalError,
FailureError,
ApplicationError,
ContinueAsNewError, # The show must go on!
ReadOnlyContextError, # No touching, please!
NondeterminismError, # When butterflies cause hurricanes
RPCError, # Lost in translation
CompleteAsyncError, # Async said "bye" too soon
TemporalError, # Time is relative, errors are absolute
FailureError, # Task failed successfully
ApplicationError, # App.exe has stopped working
):
raise
except BaseException as e:
if is_non_retryable_error(e):
if not is_retryable_error(e):
# Pack the error in a nice box with a "do not retry" sticker
raise ApplicationError(
str(e),
type=type(e).__name__,
non_retryable=True,
)
# Let it retry - everyone deserves a second (or third) chance!
raise


class CustomInterceptor(Interceptor):
"""
Custom Interceptor that combines both activity and workflow interceptors.
🎭 The Grand Interceptor: Master of Ceremonies
This class is responsible for creating and returning the custom
interceptors for both activities and workflows.
This is like the backstage manager of a theater - making sure both the
activity actors and workflow directors have their interceptor costumes on.
"""

def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
"""
Creates and returns a CustomActivityInterceptor.
🎬 Activity Interceptor Factory: Where the magic begins!
This method is called by Temporal to intercept activity executions.
Creating custom activity interceptors faster than a caffeinated barista
makes espresso shots.
"""
return CustomActivityInterceptor(super().intercept_activity(next))

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
"""
Returns the CustomWorkflowInterceptor class.
🎪 Workflow Interceptor Class Selector
This method is called by Temporal to get the workflow interceptor class.
Like a matchmaker for workflows and their interceptors - a match made in
exception handling heaven!
"""
return CustomWorkflowInterceptor
15 changes: 15 additions & 0 deletions agents-api/agents_api/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ def cozo_query_dec(func: Callable[P, tuple[str | list[Any], dict]]):

from pprint import pprint

from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)

def is_resource_busy(e: Exception) -> bool:
return isinstance(e, HTTPException) and e.status_code == 429

@retry(
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception(is_resource_busy),
)
@wraps(func)
def wrapper(*args: P.args, client=None, **kwargs: P.kwargs) -> pd.DataFrame:
queries, variables = func(*args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion agents-api/tests/test_execution_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ async def _(
result_coroutine = handle.result()
task = asyncio.create_task(result_coroutine)
try:
await asyncio.wait_for(task, timeout=3)
await asyncio.wait_for(task, timeout=10)
except BaseException:
task.cancel()

Expand Down

0 comments on commit f9a7ed5

Please sign in to comment.