Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(agents-api): Replace Retry Policies with Temporal Interceptors & Add more non-retryable errors #612

Merged
merged 7 commits into from
Oct 8, 2024
107 changes: 107 additions & 0 deletions agents-api/agents_api/common/exceptions/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
This module defines non-retryable error types and provides a function to check
if a given error is non-retryable. These are used in conjunction with custom
Temporal interceptors to prevent unnecessary retries of certain error types.
"""

import asyncio

import beartype
import beartype.roar
import box
import box.exceptions
import fastapi
import httpx
import jinja2
import jsonschema.exceptions
import pydantic
import requests
import temporalio.exceptions

# Exception classes that should never be retried.
#
# `is_non_retryable_error` matches an error against this list with
# `isinstance`, so a subclass of any entry is treated as non-retryable too.
# Several entries are therefore redundant with a listed parent class (for
# example IndexError/KeyError with LookupError, or the Unicode* errors with
# UnicodeError); they are kept explicit for readability.
NON_RETRYABLE_ERROR_TYPES = [
    # Temporal-specific errors
    temporalio.exceptions.WorkflowAlreadyStartedError,
    temporalio.exceptions.TerminatedError,
    temporalio.exceptions.CancelledError,
    #
    # Built-in Python exceptions
    TypeError,
    AssertionError,
    SyntaxError,
    ValueError,
    ZeroDivisionError,
    IndexError,
    AttributeError,
    LookupError,
    BufferError,
    ArithmeticError,
    KeyError,
    NameError,
    NotImplementedError,
    RecursionError,
    RuntimeError,
    StopIteration,
    StopAsyncIteration,
    IndentationError,
    TabError,
    #
    # Unicode-related errors
    UnicodeError,
    UnicodeEncodeError,
    UnicodeDecodeError,
    UnicodeTranslateError,
    #
    # HTTP and API-related errors
    fastapi.exceptions.HTTPException,
    fastapi.exceptions.RequestValidationError,
    httpx.RequestError,
    httpx.HTTPStatusError,
    #
    # Asynchronous programming errors
    # NOTE(review): asyncio.CancelledError and GeneratorExit derive from
    # BaseException on current Python versions, so a handler written as
    # `except Exception` never passes them to the check -- confirm that
    # listing them here is intentional.
    asyncio.CancelledError,
    asyncio.InvalidStateError,
    GeneratorExit,
    #
    # Third-party library exceptions
    jinja2.exceptions.TemplateSyntaxError,
    jinja2.exceptions.TemplateNotFound,
    jsonschema.exceptions.ValidationError,
    pydantic.ValidationError,
    requests.exceptions.InvalidURL,
    requests.exceptions.MissingSchema,
    # Box exceptions
    box.exceptions.BoxKeyError,
    box.exceptions.BoxTypeError,
    box.exceptions.BoxValueError,
    # Beartype exceptions
    beartype.roar.BeartypeException,
    beartype.roar.BeartypeDecorException,
    beartype.roar.BeartypeDecorHintException,
    beartype.roar.BeartypeDecorHintNonpepException,
    beartype.roar.BeartypeDecorHintPepException,
    beartype.roar.BeartypeDecorHintPepUnsupportedException,
    beartype.roar.BeartypeDecorHintTypeException,
    beartype.roar.BeartypeDecorParamException,
    beartype.roar.BeartypeDecorParamNameException,
    beartype.roar.BeartypeCallHintParamViolation,
    beartype.roar.BeartypeCallHintReturnViolation,
    beartype.roar.BeartypeDecorHintParamDefaultViolation,
    beartype.roar.BeartypeDoorHintViolation,
]


def is_non_retryable_error(error: Exception) -> bool:
    """
    Report whether an error belongs to the non-retryable category.

    The error is compared, via ``isinstance``, against every class listed in
    NON_RETRYABLE_ERROR_TYPES, so instances of subclasses match as well.

    Args:
        error (Exception): The raised error to classify.

    Returns:
        bool: True when the error matches at least one listed type,
        False when it matches none.
    """
    return any(
        isinstance(error, error_type) for error_type in NON_RETRYABLE_ERROR_TYPES
    )
92 changes: 92 additions & 0 deletions agents-api/agents_api/common/interceptors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
This module defines custom interceptors for Temporal activities and workflows.
The main purpose of these interceptors is to handle errors and prevent retrying
certain types of errors that are known to be non-retryable.
"""

from typing import Optional, Type

from temporalio.exceptions import ApplicationError
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)

from .exceptions.tasks import is_non_retryable_error


class CustomActivityInterceptor(ActivityInboundInterceptor):
    """
    Custom interceptor for Temporal activities.

    This interceptor catches exceptions during activity execution and
    raises them as non-retryable ApplicationErrors if they are identified
    as non-retryable errors. Every other exception propagates unchanged.
    """

    async def execute_activity(self, input: ExecuteActivityInput):
        """
        Run the activity through the next interceptor in the chain.

        Args:
            input (ExecuteActivityInput): The activity invocation to execute.

        Returns:
            Whatever the wrapped ``execute_activity`` call returns.

        Raises:
            ApplicationError: With ``non_retryable=True`` and ``type`` set to
                the original exception's class name, when the raised error
                satisfies ``is_non_retryable_error``.
            Exception: Any other error is re-raised as-is.
        """
        try:
            return await super().execute_activity(input)
        except Exception as e:
            if not is_non_retryable_error(e):
                # Leave retryable errors untouched.
                raise

            # Chain explicitly so the original exception (message and
            # traceback) is kept as ``__cause__`` of the replacement error
            # instead of being only implicit context.
            raise ApplicationError(
                str(e),
                type=type(e).__name__,
                non_retryable=True,
            ) from e


class CustomWorkflowInterceptor(WorkflowInboundInterceptor):
    """
    Custom interceptor for Temporal workflows.

    This interceptor catches exceptions during workflow execution and
    raises them as non-retryable ApplicationErrors if they are identified
    as non-retryable errors. Every other exception propagates unchanged.
    """

    async def execute_workflow(self, input: ExecuteWorkflowInput):
        """
        Run the workflow through the next interceptor in the chain.

        Args:
            input (ExecuteWorkflowInput): The workflow invocation to execute.

        Returns:
            Whatever the wrapped ``execute_workflow`` call returns.

        Raises:
            ApplicationError: With ``non_retryable=True`` and ``type`` set to
                the original exception's class name, when the raised error
                satisfies ``is_non_retryable_error``.
            Exception: Any other error is re-raised as-is.
        """
        try:
            return await super().execute_workflow(input)
        except Exception as e:
            if not is_non_retryable_error(e):
                # Leave retryable errors untouched.
                raise

            # Chain explicitly so the original exception (message and
            # traceback) is kept as ``__cause__`` of the replacement error
            # instead of being only implicit context.
            raise ApplicationError(
                str(e),
                type=type(e).__name__,
                non_retryable=True,
            ) from e


class CustomInterceptor(Interceptor):
    """
    Worker-level interceptor wiring in the custom activity and workflow
    interceptors defined in this module.
    """

    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        """
        Wrap the activity interceptor chain in a CustomActivityInterceptor.

        The base class's ``intercept_activity`` is applied to ``next`` first,
        and its result becomes the inner interceptor of the wrapper.
        """
        inner_interceptor = super().intercept_activity(next)
        return CustomActivityInterceptor(inner_interceptor)

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        """
        Provide the workflow interceptor class to use.

        Always returns CustomWorkflowInterceptor; ``input`` is not consulted.
        """
        return CustomWorkflowInterceptor
73 changes: 12 additions & 61 deletions agents-api/agents_api/common/retry_policies.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,14 @@
from datetime import timedelta
# from datetime import timedelta

from temporalio.common import RetryPolicy
# from temporalio.common import RetryPolicy

# Retry policy: 1s initial interval, backoff coefficient 2, at most 25
# attempts, interval capped at 300s, plus a list of error type names that
# are excluded from retries.
#
# NOTE(review): further down in this file DEFAULT_RETRY_POLICY is rebound to
# None, so as written this value does not stay in effect. This listing comes
# from a diff view in which removed and added lines are interleaved; this
# assignment is presumably the removed side -- confirm before relying on it.
DEFAULT_RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2,
    maximum_attempts=25,
    maximum_interval=timedelta(seconds=300),
    non_retryable_error_types=[
        # Temporal-specific errors
        # NOTE(review): these three names do not correspond to the
        # temporalio.exceptions classes referenced by the exceptions module
        # (WorkflowAlreadyStartedError, TerminatedError, CancelledError) --
        # verify whether they ever matched anything.
        "WorkflowExecutionAlreadyStarted",
        "temporalio.exceptions.TerminalFailure",
        "temporalio.exceptions.CanceledError",
        #
        # Built-in Python exceptions
        "TypeError",
        "AssertionError",
        "SyntaxError",
        "ValueError",
        "ZeroDivisionError",
        "IndexError",
        "AttributeError",
        "LookupError",
        "BufferError",
        "ArithmeticError",
        "KeyError",
        "NameError",
        "NotImplementedError",
        "RecursionError",
        "RuntimeError",
        "StopIteration",
        "StopAsyncIteration",
        "IndentationError",
        "TabError",
        #
        # Unicode-related errors
        "UnicodeError",
        "UnicodeEncodeError",
        "UnicodeDecodeError",
        "UnicodeTranslateError",
        #
        # HTTP and API-related errors
        "HTTPException",
        "fastapi.exceptions.HTTPException",
        "fastapi.exceptions.RequestValidationError",
        "httpx.RequestError",
        "httpx.HTTPStatusError",
        #
        # Asynchronous programming errors
        "asyncio.CancelledError",
        "asyncio.InvalidStateError",
        "GeneratorExit",
        #
        # Third-party library exceptions
        "jinja2.exceptions.TemplateSyntaxError",
        "jinja2.exceptions.TemplateNotFound",
        "jsonschema.exceptions.ValidationError",
        "pydantic.ValidationError",
        "requests.exceptions.InvalidURL",
        "requests.exceptions.MissingSchema",
    ],
)
# Previous explicit policy, kept commented out for reference until the
# FIXME below is resolved:
# DEFAULT_RETRY_POLICY = RetryPolicy(
# initial_interval=timedelta(seconds=1),
# backoff_coefficient=2,
# maximum_attempts=25,
# maximum_interval=timedelta(seconds=300),
# )

# FIXME: Using the interceptors together with a retry policy (even one that
# does not set `non_retryable_error_types`) causes the errors to be retried
# anyway. Until a workaround is found, no explicit retry policy is exported.
DEFAULT_RETRY_POLICY = None
2 changes: 2 additions & 0 deletions agents-api/agents_api/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def create_worker(client: Client) -> Any:
from ..activities.mem_rating import mem_rating
from ..activities.summarization import summarization
from ..activities.truncation import truncation
from ..common.interceptors import CustomInterceptor
from ..env import (
temporal_task_queue,
)
Expand Down Expand Up @@ -61,6 +62,7 @@ def create_worker(client: Client) -> Any:
summarization,
truncation,
],
interceptors=[CustomInterceptor()],
)

return worker
Loading