Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agents-api): Fix Error-Retrying Mechanism #760

Merged
merged 6 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 90 additions & 36 deletions agents-api/agents_api/common/exceptions/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""
This module defines non-retryable error types and provides a function to check
if a given error is non-retryable. These are used in conjunction with custom
Temporal interceptors to prevent unnecessary retries of certain error types.
🎯 Error Handling: The Art of Knowing When to Try Again

This module is like a bouncer at an error club - it decides which errors get a
second chance and which ones are permanently banned. Some errors are just having
a bad day (like network timeouts), while others are fundamentally problematic
(like trying to divide by zero... seriously, who does that?).

Remember: To err is human, to retry divine... but only if it makes sense!
"""

import asyncio
Expand All @@ -19,21 +24,20 @@
import requests
import temporalio.exceptions

### FIXME: This should be the opposite. We should retry on only known errors

# List of error types that should not be retried
# 🚫 The "No Second Chances" Club - errors that we won't retry
# Because sometimes, no means no!
NON_RETRYABLE_ERROR_TYPES = (
# Temporal-specific errors
# Temporal-specific errors (when time itself says no)
temporalio.exceptions.WorkflowAlreadyStartedError,
temporalio.exceptions.TerminatedError,
temporalio.exceptions.CancelledError,
#
# Built-in Python exceptions
# Built-in Python exceptions (the classics that never go out of style)
TypeError,
AssertionError,
SyntaxError,
ValueError,
ZeroDivisionError,
ZeroDivisionError, # Because dividing by zero is still not cool
IndexError,
AttributeError,
LookupError,
Expand All @@ -42,42 +46,41 @@
KeyError,
NameError,
NotImplementedError,
RecursionError,
RecursionError, # When your code goes down the rabbit hole too deep
RuntimeError,
StopIteration,
StopAsyncIteration,
IndentationError,
IndentationError, # Spaces vs tabs: the eternal debate
TabError,
#
# Unicode-related errors
# Unicode-related errors (when characters misbehave)
UnicodeError,
UnicodeEncodeError,
UnicodeDecodeError,
UnicodeTranslateError,
#
# HTTP and API-related errors
fastapi.exceptions.HTTPException,
# HTTP and API-related errors (when the web says "nope")
fastapi.exceptions.RequestValidationError,
#
# Asynchronous programming errors
# Asynchronous programming errors (async/await gone wrong)
asyncio.CancelledError,
asyncio.InvalidStateError,
GeneratorExit,
#
# Third-party library exceptions
# Third-party library exceptions (when other people's code says no)
jinja2.exceptions.TemplateSyntaxError,
jinja2.exceptions.TemplateNotFound,
jsonschema.exceptions.ValidationError,
pydantic.ValidationError,
requests.exceptions.InvalidURL,
requests.exceptions.MissingSchema,
#
# Box exceptions
# Box exceptions (when your box is broken)
box.exceptions.BoxKeyError,
box.exceptions.BoxTypeError,
box.exceptions.BoxValueError,
#
# Beartype exceptions
# Beartype exceptions (when your types are unbearable)
beartype.roar.BeartypeException,
beartype.roar.BeartypeDecorException,
beartype.roar.BeartypeDecorHintException,
Expand All @@ -92,42 +95,93 @@
beartype.roar.BeartypeDecorHintParamDefaultViolation,
beartype.roar.BeartypeDoorHintViolation,
#
# LiteLLM exceptions
# LiteLLM exceptions (when AI has a bad day)
litellm.exceptions.NotFoundError,
litellm.exceptions.InvalidRequestError,
litellm.exceptions.AuthenticationError,
litellm.exceptions.ServiceUnavailableError,
litellm.exceptions.OpenAIError,
litellm.exceptions.APIError,
)

# 🔄 The "Try Again" Club - errors that deserve another shot
# Because everyone deserves a second chance... or third... or fourth...
RETRYABLE_ERROR_TYPES = (
# LiteLLM exceptions (when AI needs a coffee break)
litellm.exceptions.RateLimitError,
litellm.exceptions.APIError, # Added to retry on "APIError: OpenAIException - Connection error"
#
# HTTP/Network related errors (internet having a bad hair day)
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.WriteTimeout,
httpx.PoolTimeout,
#
# Standard library errors that are typically transient (like a bad mood)
ConnectionError,
TimeoutError,
OSError, # Covers many IO-related errors that may be transient
IOError,
#
# Asyncio timeouts (raised when an awaited operation exceeds its deadline; not database-specific)
asyncio.TimeoutError,
)

# HTTP status codes that say "maybe try again later?"
RETRYABLE_HTTP_STATUS_CODES = (
408, # Request Timeout (server needs a coffee break)
429, # Too Many Requests (slow down, speedster!)
503, # Service Unavailable (server is having a moment)
504, # Gateway Timeout (the internet took a detour)
)


### FIXME: This should be the opposite. So `is_retryable_error` instead of `is_non_retryable_error`
def is_non_retryable_error(error: BaseException) -> bool:
def is_retryable_error(error: BaseException) -> bool:
"""
Determines if the given error is non-retryable.
The Great Error Judge: Decides if an error deserves another chance at life.

This function checks if the error is an instance of any of the error types
defined in NON_RETRYABLE_ERROR_TYPES.
Think of this function as a very understanding but firm teacher - some mistakes
get a do-over, others are learning opportunities (aka failures).

Args:
error (Exception): The error to check.
error (BaseException): The error that's pleading its case

Returns:
bool: True if the error is non-retryable, False otherwise.
bool: True if the error gets another shot, False if it's game over
"""
# First, check if it's in the "permanently banned" list
if isinstance(error, NON_RETRYABLE_ERROR_TYPES):
return False

# Check if it's in the "VIP retry club"
if isinstance(error, RETRYABLE_ERROR_TYPES):
return True

# Check for specific HTTP errors (status code == 429)
# Special handling for HTTP errors (because they're special snowflakes)
if isinstance(error, fastapi.exceptions.HTTPException):
if error.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

if isinstance(error, httpx.HTTPStatusError):
if error.response.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

# If we don't know this error, we play it safe and don't retry
# (stranger danger!)
return False

# Check for specific HTTP errors that should be retried
if isinstance(error, fastapi.exceptions.HTTPException):
if error.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

if isinstance(error, httpx.HTTPStatusError):
if error.response.status_code in (
408,
429,
503,
504,
): # pytype: disable=attribute-error
return False
if error.response.status_code in RETRYABLE_HTTP_STATUS_CODES:
return True

# If we don't know about the error, we should not retry
return True
return False
78 changes: 48 additions & 30 deletions agents-api/agents_api/common/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
ReadOnlyContextError,
)

from .exceptions.tasks import is_non_retryable_error
from .exceptions.tasks import is_retryable_error


class CustomActivityInterceptor(ActivityInboundInterceptor):
Expand All @@ -36,86 +36,104 @@ class CustomActivityInterceptor(ActivityInboundInterceptor):
"""

async def execute_activity(self, input: ExecuteActivityInput):
"""
🎭 The Activity Whisperer: Handles activity execution with style and grace

This is like a safety net for your activities - catching errors and deciding
their fate with the wisdom of a fortune cookie.
"""
try:
return await super().execute_activity(input)
except (
ContinueAsNewError,
ReadOnlyContextError,
NondeterminismError,
RPCError,
CompleteAsyncError,
TemporalError,
FailureError,
ApplicationError,
ContinueAsNewError, # When you need a fresh start
ReadOnlyContextError, # When someone tries to write in a museum
NondeterminismError, # When chaos theory kicks in
RPCError, # When computers can't talk to each other
CompleteAsyncError, # When async goes wrong
TemporalError, # When time itself rebels
FailureError, # When failure is not an option, but happens anyway
ApplicationError, # When the app says "nope"
):
creatorrr marked this conversation as resolved.
Show resolved Hide resolved
raise
except BaseException as e:
if is_non_retryable_error(e):
if not is_retryable_error(e):
# If it's not retryable, we wrap it in a nice bow (ApplicationError)
# and mark it as non-retryable to prevent further attempts
raise ApplicationError(
str(e),
type=type(e).__name__,
non_retryable=True,
)
# For retryable errors, we'll let Temporal retry with backoff
# Default retry policy ensures at least 2 retries
raise


class CustomWorkflowInterceptor(WorkflowInboundInterceptor):
"""
Custom interceptor for Temporal workflows.
🎪 The Workflow Circus Ringmaster

This interceptor catches exceptions during workflow execution and
raises them as non-retryable ApplicationErrors if they are identified
as non-retryable errors.
This interceptor is like a circus ringmaster - keeping all the workflow acts
running smoothly and catching any lions (errors) that escape their cages.
"""

async def execute_workflow(self, input: ExecuteWorkflowInput):
"""
🎪 The Main Event: Workflow Execution Extravaganza!

Watch as we gracefully handle errors like a trapeze artist catching their partner!
"""
try:
return await super().execute_workflow(input)
except (
ContinueAsNewError,
ReadOnlyContextError,
NondeterminismError,
RPCError,
CompleteAsyncError,
TemporalError,
FailureError,
ApplicationError,
ContinueAsNewError, # The show must go on!
ReadOnlyContextError, # No touching, please!
NondeterminismError, # When butterflies cause hurricanes
RPCError, # Lost in translation
CompleteAsyncError, # Async said "bye" too soon
TemporalError, # Time is relative, errors are absolute
FailureError, # Task failed successfully
ApplicationError, # App.exe has stopped working
):
raise
except BaseException as e:
if is_non_retryable_error(e):
if not is_retryable_error(e):
# Pack the error in a nice box with a "do not retry" sticker
raise ApplicationError(
str(e),
type=type(e).__name__,
non_retryable=True,
)
# Let it retry - everyone deserves a second (or third) chance!
raise


class CustomInterceptor(Interceptor):
"""
Custom Interceptor that combines both activity and workflow interceptors.
🎭 The Grand Interceptor: Master of Ceremonies

This class is responsible for creating and returning the custom
interceptors for both activities and workflows.
This is like the backstage manager of a theater - making sure both the
activity actors and workflow directors have their interceptor costumes on.
"""

def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
"""
Creates and returns a CustomActivityInterceptor.
🎬 Activity Interceptor Factory: Where the magic begins!

This method is called by Temporal to intercept activity executions.
Creating custom activity interceptors faster than a caffeinated barista
makes espresso shots.
"""
return CustomActivityInterceptor(super().intercept_activity(next))

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
"""
Returns the CustomWorkflowInterceptor class.
🎪 Workflow Interceptor Class Selector

This method is called by Temporal to get the workflow interceptor class.
Like a matchmaker for workflows and their interceptors - a match made in
exception handling heaven!
"""
return CustomWorkflowInterceptor
15 changes: 15 additions & 0 deletions agents-api/agents_api/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ def cozo_query_dec(func: Callable[P, tuple[str | list[Any], dict]]):

from pprint import pprint

from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)

def is_resource_busy(e: Exception) -> bool:
return isinstance(e, HTTPException) and e.status_code == 429

@retry(
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception(is_resource_busy),
)
@wraps(func)
def wrapper(*args: P.args, client=None, **kwargs: P.kwargs) -> pd.DataFrame:
queries, variables = func(*args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion agents-api/tests/test_execution_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ async def _(
result_coroutine = handle.result()
task = asyncio.create_task(result_coroutine)
try:
await asyncio.wait_for(task, timeout=3)
await asyncio.wait_for(task, timeout=10)
except BaseException:
task.cancel()

Expand Down
Loading