Skip to content

Commit

Permalink
refactor: Lint agents-api (CI)
Browse files Browse the repository at this point in the history
  • Loading branch information
HamadaSalhab authored and github-actions[bot] committed Oct 8, 2024
1 parent ff5da46 commit b89c7b3
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 12 deletions.
5 changes: 3 additions & 2 deletions agents-api/agents_api/common/exceptions/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
Temporal interceptors to prevent unnecessary retries of certain error types.
"""

import temporalio.exceptions
import asyncio

import fastapi
import httpx
import asyncio
import jinja2
import jsonschema.exceptions
import pydantic
import requests
import temporalio.exceptions

# List of error types that should not be retried
NON_RETRYABLE_ERROR_TYPES = [
Expand Down
15 changes: 8 additions & 7 deletions agents-api/agents_api/common/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
from temporalio.exceptions import ApplicationError
from temporalio.worker import (
ActivityInboundInterceptor,
WorkflowInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInterceptorClassInput
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)

from .exceptions.tasks import is_non_retryable_error


class CustomActivityInterceptor(ActivityInboundInterceptor):
"""
Custom interceptor for Temporal activities.
This interceptor catches exceptions during activity execution and
raises them as non-retryable ApplicationErrors if they are identified
as non-retryable errors.
Expand All @@ -43,7 +44,7 @@ async def execute_activity(self, input: ExecuteActivityInput):
class CustomWorkflowInterceptor(WorkflowInboundInterceptor):
"""
Custom interceptor for Temporal workflows.
This interceptor catches exceptions during workflow execution and
raises them as non-retryable ApplicationErrors if they are identified
as non-retryable errors.
Expand All @@ -65,7 +66,7 @@ async def execute_workflow(self, input: ExecuteWorkflowInput):
class CustomInterceptor(Interceptor):
"""
Custom Interceptor that combines both activity and workflow interceptors.
This class is responsible for creating and returning the custom
interceptors for both activities and workflows.
"""
Expand All @@ -75,7 +76,7 @@ def intercept_activity(
) -> ActivityInboundInterceptor:
"""
Creates and returns a CustomActivityInterceptor.
This method is called by Temporal to intercept activity executions.
"""
return CustomActivityInterceptor(super().intercept_activity(next))
Expand All @@ -85,7 +86,7 @@ def workflow_interceptor_class(
) -> Optional[Type[WorkflowInboundInterceptor]]:
"""
Returns the CustomWorkflowInterceptor class.
This method is called by Temporal to get the workflow interceptor class.
"""
return CustomWorkflowInterceptor
2 changes: 1 addition & 1 deletion agents-api/agents_api/common/retry_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@

# FIXME: Adding both interceptors and retry policy (even with `non_retryable_errors` not set)
# is causing the errors to be retried. We need to find a workaround for this.
DEFAULT_RETRY_POLICY = None
DEFAULT_RETRY_POLICY = None
4 changes: 2 additions & 2 deletions agents-api/agents_api/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def create_worker(client: Client) -> Any:
from ..activities.mem_rating import mem_rating
from ..activities.summarization import summarization
from ..activities.truncation import truncation
from ..common.interceptors import CustomInterceptor
from ..env import (
temporal_task_queue,
)
Expand All @@ -32,7 +33,6 @@ def create_worker(client: Client) -> Any:
from ..workflows.summarization import SummarizationWorkflow
from ..workflows.task_execution import TaskExecutionWorkflow
from ..workflows.truncation import TruncationWorkflow
from ..common.interceptors import CustomInterceptor

task_activity_names, task_activities = zip(*getmembers(task_steps, isfunction))

Expand Down Expand Up @@ -62,7 +62,7 @@ def create_worker(client: Client) -> Any:
summarization,
truncation,
],
interceptors=[CustomInterceptor()]
interceptors=[CustomInterceptor()],
)

return worker

0 comments on commit b89c7b3

Please sign in to comment.