Skip to content

Commit

Permalink
feat(agents-api): Add retry policies to workflows/activities executions
Browse files Browse the repository at this point in the history
  • Loading branch information
HamadaSalhab committed Oct 2, 2024
1 parent 8c17e18 commit 940b6ac
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 1 deletion.
2 changes: 2 additions & 0 deletions agents-api/agents_api/clients/temporal.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta
from uuid import UUID

from ..common.retry_policies import DEFAULT_RETRY_POLICY
from temporalio.client import Client, TLSConfig

from ..autogen.openapi_model import TransitionTarget
Expand Down Expand Up @@ -54,6 +55,7 @@ async def run_task_execution_workflow(
task_queue=temporal_task_queue,
id=str(job_id),
run_timeout=timedelta(days=31),
retry_policy=DEFAULT_RETRY_POLICY
# TODO: Should add search_attributes for queryability
)

Expand Down
17 changes: 17 additions & 0 deletions agents-api/agents_api/common/retry_policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from datetime import timedelta
from temporalio.common import RetryPolicy

DEFAULT_RETRY_POLICY = RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2,
maximum_attempts=2,
maximum_interval=timedelta(seconds=10),
non_retryable_error_types=[
"WorkflowExecutionAlreadyStarted",
"TypeError",
"AssertionError",
"HTTPException",
"SyntaxError",
"ValueError",
],
)
2 changes: 2 additions & 0 deletions agents-api/agents_api/workflows/task_execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import timedelta
from typing import Any

from ...common.retry_policies import DEFAULT_RETRY_POLICY
from pydantic import RootModel
from temporalio import workflow
from temporalio.exceptions import ApplicationError
Expand Down Expand Up @@ -200,6 +201,7 @@ async def run(
schedule_to_close_timeout=timedelta(
seconds=30 if debug or testing else 600
),
retry_policy=DEFAULT_RETRY_POLICY,
)
workflow.logger.debug(
f"Step {context.cursor.step} completed successfully"
Expand Down
3 changes: 2 additions & 1 deletion agents-api/agents_api/workflows/task_execution/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from temporalio import workflow
from temporalio.exceptions import ApplicationError

from ...common.retry_policies import DEFAULT_RETRY_POLICY
with workflow.unsafe.imports_passed_through():
from ...activities import task_steps
from ...autogen.openapi_model import (
Expand Down Expand Up @@ -33,6 +33,7 @@ async def continue_as_child(
previous_inputs,
user_state,
],
retry_policy=DEFAULT_RETRY_POLICY
)


Expand Down
2 changes: 2 additions & 0 deletions agents-api/agents_api/workflows/task_execution/transition.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta

from temporalio import workflow
from ...common.retry_policies import DEFAULT_RETRY_POLICY
from temporalio.exceptions import ApplicationError

from ...activities import task_steps
Expand Down Expand Up @@ -44,6 +45,7 @@ async def transition(
task_steps.transition_step,
args=[context, transition_request],
schedule_to_close_timeout=timedelta(seconds=30),
retry_policy=DEFAULT_RETRY_POLICY
)

except Exception as e:
Expand Down

0 comments on commit 940b6ac

Please sign in to comment.