From 8c17e18014b6d04aac9ff2f95c7ad87bbfd6025b Mon Sep 17 00:00:00 2001 From: HamadaSalhab Date: Wed, 2 Oct 2024 14:29:55 +0300 Subject: [PATCH 1/8] Reformat: poe check --- .../activities/task_steps/raise_complete_async.py | 4 ++-- agents-api/agents_api/routers/tasks/update_execution.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/agents-api/agents_api/activities/task_steps/raise_complete_async.py b/agents-api/agents_api/activities/task_steps/raise_complete_async.py index a73df3f8d..54fd8a32c 100644 --- a/agents-api/agents_api/activities/task_steps/raise_complete_async.py +++ b/agents-api/agents_api/activities/task_steps/raise_complete_async.py @@ -1,4 +1,5 @@ import base64 + from temporalio import activity from ...autogen.openapi_model import CreateTransitionRequest @@ -11,10 +12,9 @@ @activity.defn async def raise_complete_async(context: StepContext, output: StepOutcome) -> None: - activity_info = activity.info() - captured_token = base64.b64encode(activity_info.task_token).decode('ascii') + captured_token = base64.b64encode(activity_info.task_token).decode("ascii") activity_id = activity_info.activity_id workflow_run_id = activity_info.workflow_run_id workflow_id = activity_info.workflow_id diff --git a/agents-api/agents_api/routers/tasks/update_execution.py b/agents-api/agents_api/routers/tasks/update_execution.py index a5ca30aab..968b6bdfb 100644 --- a/agents-api/agents_api/routers/tasks/update_execution.py +++ b/agents-api/agents_api/routers/tasks/update_execution.py @@ -47,7 +47,9 @@ async def update_execution( workflow_id = token_data["metadata"].get("x-workflow-id", None) if activity_id is None or run_id is None or workflow_id is None: act_handle = temporal_client.get_async_activity_handle( - task_token=base64.b64decode(token_data["task_token"].encode('ascii')), + task_token=base64.b64decode( + token_data["task_token"].encode("ascii") + ), ) else: @@ -59,6 +61,8 @@ async def update_execution( try: await act_handle.complete(data.input) except Exception as e: - raise HTTPException(status_code=500, detail="Failed to resume execution") + raise HTTPException( + status_code=500, detail="Failed to resume execution" + ) case _: raise HTTPException(status_code=400, detail="Invalid request data") From 940b6ac65d5f02957756f12fbae6723f0cfdc20d Mon Sep 17 00:00:00 2001 From: HamadaSalhab Date: Wed, 2 Oct 2024 14:55:13 +0300 Subject: [PATCH 2/8] feat(agents-api): Add retry policies to workflows/activities executions --- agents-api/agents_api/clients/temporal.py | 2 ++ agents-api/agents_api/common/retry_policies.py | 17 +++++++++++++++++ .../workflows/task_execution/__init__.py | 2 ++ .../workflows/task_execution/helpers.py | 3 ++- .../workflows/task_execution/transition.py | 2 ++ 5 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 agents-api/agents_api/common/retry_policies.py diff --git a/agents-api/agents_api/clients/temporal.py b/agents-api/agents_api/clients/temporal.py index 5f14b84f6..c812991f1 100644 --- a/agents-api/agents_api/clients/temporal.py +++ b/agents-api/agents_api/clients/temporal.py @@ -1,6 +1,7 @@ from datetime import timedelta from uuid import UUID +from ..common.retry_policies import DEFAULT_RETRY_POLICY from temporalio.client import Client, TLSConfig from ..autogen.openapi_model import TransitionTarget @@ -54,6 +55,7 @@ async def run_task_execution_workflow( task_queue=temporal_task_queue, id=str(job_id), run_timeout=timedelta(days=31), + retry_policy=DEFAULT_RETRY_POLICY # TODO: Should add search_attributes for queryability ) diff --git a/agents-api/agents_api/common/retry_policies.py b/agents-api/agents_api/common/retry_policies.py new file mode 100644 index 000000000..ce31201e6 --- /dev/null +++ b/agents-api/agents_api/common/retry_policies.py @@ -0,0 +1,17 @@ +from datetime import timedelta +from temporalio.common import RetryPolicy + +DEFAULT_RETRY_POLICY = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2, + maximum_attempts=2, + maximum_interval=timedelta(seconds=10), + non_retryable_error_types=[ + "WorkflowExecutionAlreadyStarted", + "TypeError", + "AssertionError", + "HTTPException", + "SyntaxError", + "ValueError", + ], +) diff --git a/agents-api/agents_api/workflows/task_execution/__init__.py b/agents-api/agents_api/workflows/task_execution/__init__.py index 2ca7e6ade..80863e0e0 100644 --- a/agents-api/agents_api/workflows/task_execution/__init__.py +++ b/agents-api/agents_api/workflows/task_execution/__init__.py @@ -4,6 +4,7 @@ from datetime import timedelta from typing import Any +from ...common.retry_policies import DEFAULT_RETRY_POLICY from pydantic import RootModel from temporalio import workflow from temporalio.exceptions import ApplicationError @@ -200,6 +201,7 @@ async def run( schedule_to_close_timeout=timedelta( seconds=30 if debug or testing else 600 ), + retry_policy=DEFAULT_RETRY_POLICY, ) workflow.logger.debug( f"Step {context.cursor.step} completed successfully" diff --git a/agents-api/agents_api/workflows/task_execution/helpers.py b/agents-api/agents_api/workflows/task_execution/helpers.py index 88828b31b..cdf499783 100644 --- a/agents-api/agents_api/workflows/task_execution/helpers.py +++ b/agents-api/agents_api/workflows/task_execution/helpers.py @@ -4,7 +4,7 @@ from temporalio import workflow from temporalio.exceptions import ApplicationError - +from ...common.retry_policies import DEFAULT_RETRY_POLICY with workflow.unsafe.imports_passed_through(): from ...activities import task_steps from ...autogen.openapi_model import ( @@ -33,6 +33,7 @@ async def continue_as_child( previous_inputs, user_state, ], + retry_policy=DEFAULT_RETRY_POLICY ) diff --git a/agents-api/agents_api/workflows/task_execution/transition.py b/agents-api/agents_api/workflows/task_execution/transition.py index dbcd776e4..90e3bdccc 100644 --- a/agents-api/agents_api/workflows/task_execution/transition.py +++ b/agents-api/agents_api/workflows/task_execution/transition.py @@ -1,6 +1,7 @@ from datetime import timedelta from temporalio import workflow +from ...common.retry_policies import DEFAULT_RETRY_POLICY from temporalio.exceptions import ApplicationError from ...activities import task_steps @@ -44,6 +45,7 @@ async def transition( task_steps.transition_step, args=[context, transition_request], schedule_to_close_timeout=timedelta(seconds=30), + retry_policy=DEFAULT_RETRY_POLICY ) except Exception as e: From 93dfee1eb16f3ca35dc90155cf22701e12bdfac0 Mon Sep 17 00:00:00 2001 From: HamadaSalhab Date: Wed, 2 Oct 2024 17:18:48 +0300 Subject: [PATCH 3/8] Add retry policies to more activities/workflows --- agents-api/agents_api/routers/docs/create_doc.py | 2 ++ agents-api/agents_api/workflows/demo.py | 3 +++ agents-api/agents_api/workflows/embed_docs.py | 2 ++ agents-api/agents_api/workflows/mem_rating.py | 2 ++ agents-api/agents_api/workflows/summarization.py | 2 ++ agents-api/agents_api/workflows/task_execution/__init__.py | 5 +++++ agents-api/agents_api/workflows/task_execution/helpers.py | 2 ++ agents-api/agents_api/workflows/truncation.py | 2 ++ agents-api/tests/test_activities.py | 2 ++ 9 files changed, 22 insertions(+) diff --git a/agents-api/agents_api/routers/docs/create_doc.py b/agents-api/agents_api/routers/docs/create_doc.py index 0ba22c8d5..aa2ba719a 100644 --- a/agents-api/agents_api/routers/docs/create_doc.py +++ b/agents-api/agents_api/routers/docs/create_doc.py @@ -1,6 +1,7 @@ from typing import Annotated from uuid import UUID, uuid4 +from ...common.retry_policies import DEFAULT_RETRY_POLICY from fastapi import BackgroundTasks, Depends from starlette.status import HTTP_201_CREATED from temporalio.client import Client as TemporalClient @@ -41,6 +42,7 @@ async def run_embed_docs_task( embed_payload, task_queue=temporal_task_queue, id=str(job_id), + retry_policy=DEFAULT_RETRY_POLICY ) # TODO: Remove this conditional once we have a way to run workflows in diff --git a/agents-api/agents_api/workflows/demo.py b/agents-api/agents_api/workflows/demo.py index 61ad9d4a8..e5725065e 100644 --- a/agents-api/agents_api/workflows/demo.py +++ b/agents-api/agents_api/workflows/demo.py @@ -2,6 +2,8 @@ from temporalio import workflow +from ..common.retry_policies import DEFAULT_RETRY_POLICY + with workflow.unsafe.imports_passed_through(): from ..activities.demo import demo_activity @@ -14,4 +16,5 @@ async def run(self, a: int, b: int) -> int: demo_activity, args=[a, b], start_to_close_timeout=timedelta(seconds=30), + retry_policy=DEFAULT_RETRY_POLICY ) diff --git a/agents-api/agents_api/workflows/embed_docs.py b/agents-api/agents_api/workflows/embed_docs.py index 62e0e65ae..e04ebd0f5 100644 --- a/agents-api/agents_api/workflows/embed_docs.py +++ b/agents-api/agents_api/workflows/embed_docs.py @@ -8,6 +8,7 @@ with workflow.unsafe.imports_passed_through(): from ..activities.embed_docs import embed_docs from ..activities.types import EmbedDocsPayload + from ..common.retry_policies import DEFAULT_RETRY_POLICY @workflow.defn @@ -18,4 +19,5 @@ async def run(self, embed_payload: EmbedDocsPayload) -> None: embed_docs, embed_payload, schedule_to_close_timeout=timedelta(seconds=600), + retry_policy=DEFAULT_RETRY_POLICY ) diff --git a/agents-api/agents_api/workflows/mem_rating.py b/agents-api/agents_api/workflows/mem_rating.py index 4b68a7198..ffcc4bb93 100644 --- a/agents-api/agents_api/workflows/mem_rating.py +++ b/agents-api/agents_api/workflows/mem_rating.py @@ -7,6 +7,7 @@ with workflow.unsafe.imports_passed_through(): from ..activities.mem_rating import mem_rating + from ..common.retry_policies import DEFAULT_RETRY_POLICY @workflow.defn @@ -17,4 +18,5 @@ async def run(self, memory: str) -> None: mem_rating, memory, schedule_to_close_timeout=timedelta(seconds=600), + retry_policy=DEFAULT_RETRY_POLICY ) diff --git a/agents-api/agents_api/workflows/summarization.py b/agents-api/agents_api/workflows/summarization.py index 7946e9109..e117930a9 100644 --- a/agents-api/agents_api/workflows/summarization.py +++ b/agents-api/agents_api/workflows/summarization.py @@ -7,6 +7,7 @@ with workflow.unsafe.imports_passed_through(): from ..activities.summarization import summarization + from ..common.retry_policies import DEFAULT_RETRY_POLICY @workflow.defn @@ -17,4 +18,5 @@ async def run(self, session_id: str) -> None: summarization, session_id, schedule_to_close_timeout=timedelta(seconds=600), + retry_policy=DEFAULT_RETRY_POLICY ) diff --git a/agents-api/agents_api/workflows/task_execution/__init__.py b/agents-api/agents_api/workflows/task_execution/__init__.py index 80863e0e0..ea0797b11 100644 --- a/agents-api/agents_api/workflows/task_execution/__init__.py +++ b/agents-api/agents_api/workflows/task_execution/__init__.py @@ -387,6 +387,7 @@ async def run( task_steps.raise_complete_async, args=[context, output], schedule_to_close_timeout=timedelta(days=31), + retry_policy=DEFAULT_RETRY_POLICY ) state = PartialTransition(type="resume", output=result) @@ -419,6 +420,7 @@ async def run( task_steps.raise_complete_async, args=[context, tool_calls_input], schedule_to_close_timeout=timedelta(days=31), + retry_policy=DEFAULT_RETRY_POLICY ) # Feed the tool call results back to the model @@ -430,6 +432,7 @@ async def run( schedule_to_close_timeout=timedelta( seconds=30 if debug or testing else 600 ), + retry_policy=DEFAULT_RETRY_POLICY ) state = PartialTransition(output=new_response.output, type="resume") @@ -473,6 +476,7 @@ async def run( task_steps.raise_complete_async, args=[context, tool_call], schedule_to_close_timeout=timedelta(days=31), + retry_policy=DEFAULT_RETRY_POLICY ) state = PartialTransition(output=tool_call_response, type="resume") @@ -503,6 +507,7 @@ async def run( schedule_to_close_timeout=timedelta( seconds=30 if debug or testing else 600 ), + retry_policy=DEFAULT_RETRY_POLICY ) state = PartialTransition(output=tool_call_response) diff --git a/agents-api/agents_api/workflows/task_execution/helpers.py b/agents-api/agents_api/workflows/task_execution/helpers.py index cdf499783..fb3e104d1 100644 --- a/agents-api/agents_api/workflows/task_execution/helpers.py +++ b/agents-api/agents_api/workflows/task_execution/helpers.py @@ -170,6 +170,7 @@ async def execute_map_reduce_step( task_steps.base_evaluate, args=[reduce, {"results": result, "_": output}], schedule_to_close_timeout=timedelta(seconds=30), + retry_policy=DEFAULT_RETRY_POLICY ) return result @@ -245,6 +246,7 @@ async def execute_map_reduce_step_parallel( extra_lambda_strs, ], schedule_to_close_timeout=timedelta(seconds=30), + retry_policy=DEFAULT_RETRY_POLICY ) except BaseException as e: diff --git a/agents-api/agents_api/workflows/truncation.py b/agents-api/agents_api/workflows/truncation.py index d3646ccbe..2a84b2c5f 100644 --- a/agents-api/agents_api/workflows/truncation.py +++ b/agents-api/agents_api/workflows/truncation.py @@ -7,6 +7,7 @@ with workflow.unsafe.imports_passed_through(): from ..activities.truncation import truncation + from ..common.retry_policies import DEFAULT_RETRY_POLICY @workflow.defn @@ -17,4 +18,5 @@ async def run(self, session_id: str, token_count_threshold: int) -> None: truncation, args=[session_id, token_count_threshold], schedule_to_close_timeout=timedelta(seconds=600), + retry_policy=DEFAULT_RETRY_POLICY ) diff --git a/agents-api/tests/test_activities.py b/agents-api/tests/test_activities.py index 98dfc97b5..987ba5b01 100644 --- a/agents-api/tests/test_activities.py +++ b/agents-api/tests/test_activities.py @@ -7,6 +7,7 @@ from agents_api.clients import temporal from agents_api.env import temporal_task_queue from agents_api.workflows.demo import DemoWorkflow +from agents_api.workflows.task_execution.helpers import DEFAULT_RETRY_POLICY from .fixtures import ( cozo_client, @@ -49,6 +50,7 @@ async def _(): args=[1, 2], id=str(uuid4()), task_queue=temporal_task_queue, + retry_policy=DEFAULT_RETRY_POLICY ) assert result == 3 From 578f4732f165e30c6ddd744b32f3181fa319c688 Mon Sep 17 00:00:00 2001 From: HamadaSalhab Date: Wed, 2 Oct 2024 17:19:59 +0300 Subject: [PATCH 4/8] run poe check --- agents-api/agents_api/clients/temporal.py | 4 +- .../agents_api/common/retry_policies.py | 1 + .../agents_api/routers/docs/create_doc.py | 4 +- agents-api/agents_api/workflows/demo.py | 2 +- agents-api/agents_api/workflows/embed_docs.py | 2 +- agents-api/agents_api/workflows/mem_rating.py | 2 +- .../agents_api/workflows/summarization.py | 2 +- .../workflows/task_execution/__init__.py | 13 +-- .../workflows/task_execution/helpers.py | 8 +- .../workflows/task_execution/transition.py | 4 +- agents-api/agents_api/workflows/truncation.py | 2 +- agents-api/notebooks/03-summarise.ipynb | 4 +- agents-api/notebooks/RecSum-experiments.ipynb | 84 +++++++++++-------- agents-api/tests/test_activities.py | 2 +- 14 files changed, 76 insertions(+), 58 deletions(-) diff --git a/agents-api/agents_api/clients/temporal.py b/agents-api/agents_api/clients/temporal.py index c812991f1..4bb25cbc9 100644 --- a/agents-api/agents_api/clients/temporal.py +++ b/agents-api/agents_api/clients/temporal.py @@ -1,11 +1,11 @@ from datetime import timedelta from uuid import UUID -from ..common.retry_policies import DEFAULT_RETRY_POLICY from temporalio.client import Client, TLSConfig from ..autogen.openapi_model import TransitionTarget from ..common.protocol.tasks import ExecutionInput +from ..common.retry_policies import DEFAULT_RETRY_POLICY from ..env import ( temporal_client_cert, temporal_namespace, @@ -55,7 +55,7 @@ async def run_task_execution_workflow( task_queue=temporal_task_queue, id=str(job_id), run_timeout=timedelta(days=31), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, # TODO: Should add search_attributes for queryability ) diff --git a/agents-api/agents_api/common/retry_policies.py b/agents-api/agents_api/common/retry_policies.py index ce31201e6..3bd51e69c 100644 --- a/agents-api/agents_api/common/retry_policies.py +++ b/agents-api/agents_api/common/retry_policies.py @@ -1,4 +1,5 @@ from datetime import timedelta + from temporalio.common import RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicy( diff --git a/agents-api/agents_api/routers/docs/create_doc.py b/agents-api/agents_api/routers/docs/create_doc.py index aa2ba719a..16754dbe3 100644 --- a/agents-api/agents_api/routers/docs/create_doc.py +++ b/agents-api/agents_api/routers/docs/create_doc.py @@ -1,7 +1,6 @@ from typing import Annotated from uuid import UUID, uuid4 -from ...common.retry_policies import DEFAULT_RETRY_POLICY from fastapi import BackgroundTasks, Depends from starlette.status import HTTP_201_CREATED from temporalio.client import Client as TemporalClient @@ -9,6 +8,7 @@ from ...activities.types import EmbedDocsPayload from ...autogen.openapi_model import CreateDocRequest, ResourceCreatedResponse from ...clients import temporal +from ...common.retry_policies import DEFAULT_RETRY_POLICY from ...dependencies.developer_id import get_developer_id from ...env import temporal_task_queue, testing from ...models.docs.create_doc import create_doc as create_doc_query @@ -42,7 +42,7 @@ async def run_embed_docs_task( embed_payload, task_queue=temporal_task_queue, id=str(job_id), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) # TODO: Remove this conditional once we have a way to run workflows in diff --git a/agents-api/agents_api/workflows/demo.py b/agents-api/agents_api/workflows/demo.py index e5725065e..0599a4392 100644 --- a/agents-api/agents_api/workflows/demo.py +++ b/agents-api/agents_api/workflows/demo.py @@ -16,5 +16,5 @@ async def run(self, a: int, b: int) -> int: demo_activity, args=[a, b], start_to_close_timeout=timedelta(seconds=30), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) diff --git a/agents-api/agents_api/workflows/embed_docs.py b/agents-api/agents_api/workflows/embed_docs.py index e04ebd0f5..83eefe907 100644 --- a/agents-api/agents_api/workflows/embed_docs.py +++ b/agents-api/agents_api/workflows/embed_docs.py @@ -19,5 +19,5 @@ async def run(self, embed_payload: EmbedDocsPayload) -> None: embed_docs, embed_payload, schedule_to_close_timeout=timedelta(seconds=600), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) diff --git a/agents-api/agents_api/workflows/mem_rating.py b/agents-api/agents_api/workflows/mem_rating.py index ffcc4bb93..0a87fd787 100644 --- a/agents-api/agents_api/workflows/mem_rating.py +++ b/agents-api/agents_api/workflows/mem_rating.py @@ -18,5 +18,5 @@ async def run(self, memory: str) -> None: mem_rating, memory, schedule_to_close_timeout=timedelta(seconds=600), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) diff --git a/agents-api/agents_api/workflows/summarization.py b/agents-api/agents_api/workflows/summarization.py index e117930a9..96ce4c460 100644 --- a/agents-api/agents_api/workflows/summarization.py +++ b/agents-api/agents_api/workflows/summarization.py @@ -18,5 +18,5 @@ async def run(self, session_id: str) -> None: summarization, session_id, schedule_to_close_timeout=timedelta(seconds=600), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) diff --git a/agents-api/agents_api/workflows/task_execution/__init__.py b/agents-api/agents_api/workflows/task_execution/__init__.py index ea0797b11..9b99f82e2 100644 --- a/agents-api/agents_api/workflows/task_execution/__init__.py +++ b/agents-api/agents_api/workflows/task_execution/__init__.py @@ -4,11 +4,12 @@ from datetime import timedelta from typing import Any -from ...common.retry_policies import DEFAULT_RETRY_POLICY from pydantic import RootModel from temporalio import workflow from temporalio.exceptions import ApplicationError +from ...common.retry_policies import DEFAULT_RETRY_POLICY + # Import necessary modules and types with workflow.unsafe.imports_passed_through(): from ...activities import task_steps @@ -387,7 +388,7 @@ async def run( task_steps.raise_complete_async, args=[context, output], schedule_to_close_timeout=timedelta(days=31), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) state = PartialTransition(type="resume", output=result) @@ -420,7 +421,7 @@ async def run( task_steps.raise_complete_async, args=[context, tool_calls_input], schedule_to_close_timeout=timedelta(days=31), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) # Feed the tool call results back to the model @@ -432,7 +433,7 @@ async def run( schedule_to_close_timeout=timedelta( seconds=30 if debug or testing else 600 ), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) state = PartialTransition(output=new_response.output, type="resume") @@ -476,7 +477,7 @@ async def run( task_steps.raise_complete_async, args=[context, tool_call], schedule_to_close_timeout=timedelta(days=31), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) state = PartialTransition(output=tool_call_response, type="resume") @@ -507,7 +508,7 @@ async def run( schedule_to_close_timeout=timedelta( seconds=30 if debug or testing else 600 ), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) state = PartialTransition(output=tool_call_response) diff --git a/agents-api/agents_api/workflows/task_execution/helpers.py b/agents-api/agents_api/workflows/task_execution/helpers.py index fb3e104d1..04449db58 100644 --- a/agents-api/agents_api/workflows/task_execution/helpers.py +++ b/agents-api/agents_api/workflows/task_execution/helpers.py @@ -4,7 +4,9 @@ from temporalio import workflow from temporalio.exceptions import ApplicationError + from ...common.retry_policies import DEFAULT_RETRY_POLICY + with workflow.unsafe.imports_passed_through(): from ...activities import task_steps from ...autogen.openapi_model import ( @@ -33,7 +35,7 @@ async def continue_as_child( previous_inputs, user_state, ], - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) @@ -170,7 +172,7 @@ async def execute_map_reduce_step( task_steps.base_evaluate, args=[reduce, {"results": result, "_": output}], schedule_to_close_timeout=timedelta(seconds=30), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) return result @@ -246,7 +248,7 @@ async def execute_map_reduce_step_parallel( extra_lambda_strs, ], schedule_to_close_timeout=timedelta(seconds=30), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) except BaseException as e: diff --git a/agents-api/agents_api/workflows/task_execution/transition.py b/agents-api/agents_api/workflows/task_execution/transition.py index 90e3bdccc..035322dad 100644 --- a/agents-api/agents_api/workflows/task_execution/transition.py +++ b/agents-api/agents_api/workflows/task_execution/transition.py @@ -1,7 +1,6 @@ from datetime import timedelta from temporalio import workflow -from ...common.retry_policies import DEFAULT_RETRY_POLICY from temporalio.exceptions import ApplicationError from ...activities import task_steps @@ -11,6 +10,7 @@ TransitionTarget, ) from ...common.protocol.tasks import PartialTransition, StepContext +from ...common.retry_policies import DEFAULT_RETRY_POLICY async def transition( @@ -45,7 +45,7 @@ async def transition( task_steps.transition_step, args=[context, transition_request], schedule_to_close_timeout=timedelta(seconds=30), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) except Exception as e: diff --git a/agents-api/agents_api/workflows/truncation.py b/agents-api/agents_api/workflows/truncation.py index 2a84b2c5f..d12a186b9 100644 --- a/agents-api/agents_api/workflows/truncation.py +++ b/agents-api/agents_api/workflows/truncation.py @@ -18,5 +18,5 @@ async def run(self, session_id: str, token_count_threshold: int) -> None: truncation, args=[session_id, token_count_threshold], schedule_to_close_timeout=timedelta(seconds=600), - retry_policy=DEFAULT_RETRY_POLICY + retry_policy=DEFAULT_RETRY_POLICY, ) diff --git a/agents-api/notebooks/03-summarise.ipynb b/agents-api/notebooks/03-summarise.ipynb index a934fd1b9..98e6f5e0a 100644 --- a/agents-api/notebooks/03-summarise.ipynb +++ b/agents-api/notebooks/03-summarise.ipynb @@ -766,7 +766,9 @@ " messages.append(user(start_message))\n", "\n", " print(\"Starting chatml generation\")\n", - " trim_result = generate(messages, model=\"gpt-4-turbo\", temperature=0.1, stop=[\" Date: Wed, 2 Oct 2024 22:41:49 +0300 Subject: [PATCH 5/8] Add more nont-retriable errors --- .../agents_api/common/retry_policies.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/agents-api/agents_api/common/retry_policies.py b/agents-api/agents_api/common/retry_policies.py index 3bd51e69c..9218168c1 100644 --- a/agents-api/agents_api/common/retry_policies.py +++ b/agents-api/agents_api/common/retry_policies.py @@ -1,12 +1,12 @@ from datetime import timedelta from temporalio.common import RetryPolicy - +from ..env import debug, testing DEFAULT_RETRY_POLICY = RetryPolicy( initial_interval=timedelta(seconds=1), backoff_coefficient=2, - maximum_attempts=2, - maximum_interval=timedelta(seconds=10), + maximum_attempts=2 if debug or testing else 25, + maximum_interval=timedelta(seconds=10) if debug or testing else timedelta(seconds=300), non_retryable_error_types=[ "WorkflowExecutionAlreadyStarted", "TypeError", @@ -14,5 +14,20 @@ "HTTPException", "SyntaxError", "ValueError", + "ZeroDivisionError", + "jinja2.exceptions.TemplateSyntaxError", + "jinja2.exceptions.TemplateNotFound", + "jsonschema.exceptions.ValidationError", + "pydantic.ValidationError", + "asyncio.CancelledError", + "asyncio.InvalidStateError", + "requests.exceptions.InvalidURL", + "requests.exceptions.MissingSchema", + "temporalio.exceptions.TerminalFailure", + "temporalio.exceptions.CanceledError", + "fastapi.exceptions.HTTPException", + "fastapi.exceptions.RequestValidationError", + "httpx.RequestError", + "httpx.HTTPStatusError", ], ) From ccd01db3eaa0867db894070d3957443b5cde2c4b Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 2 Oct 2024 18:46:01 -0400 Subject: [PATCH 6/8] feat(agents-api): Add more common exceptions to non-retryable set Signed-off-by: Diwank Singh Tomer --- .../agents_api/common/retry_policies.py | 53 +++++++++++++++---- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/agents-api/agents_api/common/retry_policies.py b/agents-api/agents_api/common/retry_policies.py index 9218168c1..214a01c90 100644 --- a/agents-api/agents_api/common/retry_policies.py +++ b/agents-api/agents_api/common/retry_policies.py @@ -2,32 +2,65 @@ from temporalio.common import RetryPolicy from ..env import debug, testing + DEFAULT_RETRY_POLICY = RetryPolicy( initial_interval=timedelta(seconds=1), backoff_coefficient=2, maximum_attempts=2 if debug or testing else 25, - maximum_interval=timedelta(seconds=10) if debug or testing else timedelta(seconds=300), + maximum_interval=timedelta(seconds=10) + if debug or testing + else timedelta(seconds=300), non_retryable_error_types=[ + # Temporal-specific errors "WorkflowExecutionAlreadyStarted", + "temporalio.exceptions.TerminalFailure", + "temporalio.exceptions.CanceledError", + # + # Built-in Python exceptions "TypeError", "AssertionError", - "HTTPException", "SyntaxError", "ValueError", "ZeroDivisionError", + "IndexError", + "AttributeError", + "LookupError", + "BufferError", + "ArithmeticError", + "KeyError", + "NameError", + "NotImplementedError", + "RecursionError", + "RuntimeError", + "StopIteration", + "StopAsyncIteration", + "IndentationError", + "TabError", + # + # Unicode-related errors + "UnicodeError", + "UnicodeEncodeError", + "UnicodeDecodeError", + "UnicodeTranslateError", + # + # HTTP and API-related errors + "HTTPException", + "fastapi.exceptions.HTTPException", + "fastapi.exceptions.RequestValidationError", + "httpx.RequestError", + "httpx.HTTPStatusError", + # + # Asynchronous programming errors + "asyncio.CancelledError", + "asyncio.InvalidStateError", + "GeneratorExit", + # + # Third-party library exceptions "jinja2.exceptions.TemplateSyntaxError", "jinja2.exceptions.TemplateNotFound", "jsonschema.exceptions.ValidationError", "pydantic.ValidationError", - "asyncio.CancelledError", - "asyncio.InvalidStateError", "requests.exceptions.InvalidURL", "requests.exceptions.MissingSchema", - "temporalio.exceptions.TerminalFailure", - "temporalio.exceptions.CanceledError", - "fastapi.exceptions.HTTPException", - "fastapi.exceptions.RequestValidationError", - "httpx.RequestError", - "httpx.HTTPStatusError", ], ) From 2d945d3934c154487211cc8d50d5e8f532987e01 Mon Sep 17 00:00:00 2001 From: HamadaSalhab Date: Thu, 3 Oct 2024 03:47:28 +0300 Subject: [PATCH 7/8] Bug fix: Remove test/debug situations --- agents-api/agents_api/common/retry_policies.py | 7 ++----- agents-api/agents_api/workflows/task_execution/__init__.py | 3 ++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/agents-api/agents_api/common/retry_policies.py b/agents-api/agents_api/common/retry_policies.py index 214a01c90..fc343553c 100644 --- a/agents-api/agents_api/common/retry_policies.py +++ b/agents-api/agents_api/common/retry_policies.py @@ -1,15 +1,12 @@ from datetime import timedelta from temporalio.common import RetryPolicy -from ..env import debug, testing DEFAULT_RETRY_POLICY = RetryPolicy( initial_interval=timedelta(seconds=1), backoff_coefficient=2, - maximum_attempts=2 if debug or testing else 25, - maximum_interval=timedelta(seconds=10) - if debug or testing - else timedelta(seconds=300), + maximum_attempts=25, + maximum_interval=timedelta(seconds=300), non_retryable_error_types=[ # Temporal-specific errors "WorkflowExecutionAlreadyStarted", diff --git a/agents-api/agents_api/workflows/task_execution/__init__.py b/agents-api/agents_api/workflows/task_execution/__init__.py index 9b99f82e2..253696622 100644 --- a/agents-api/agents_api/workflows/task_execution/__init__.py +++ b/agents-api/agents_api/workflows/task_execution/__init__.py @@ -8,7 +8,6 @@ from temporalio import workflow from temporalio.exceptions import ApplicationError -from ...common.retry_policies import DEFAULT_RETRY_POLICY # Import necessary modules and types with workflow.unsafe.imports_passed_through(): @@ -55,6 +54,8 @@ execute_switch_branch, ) from .transition import transition + from ...common.retry_policies import DEFAULT_RETRY_POLICY + # Supported steps # --------------- From 2cbc451e9680fc539e4a3436945c4bb2324e402c Mon Sep 17 00:00:00 2001 From: creatorrr Date: Sat, 5 Oct 2024 01:38:58 +0000 Subject: [PATCH 8/8] refactor: Lint agents-api (CI) --- agents-api/agents_api/workflows/task_execution/__init__.py | 3 +-- agents-api/tests/test_activities.py | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/agents-api/agents_api/workflows/task_execution/__init__.py b/agents-api/agents_api/workflows/task_execution/__init__.py index f88bc7258..d26a3d999 100644 --- a/agents-api/agents_api/workflows/task_execution/__init__.py +++ b/agents-api/agents_api/workflows/task_execution/__init__.py @@ -8,7 +8,6 @@ from temporalio import workflow from temporalio.exceptions import ApplicationError - # Import necessary modules and types with workflow.unsafe.imports_passed_through(): from ...activities import task_steps @@ -48,6 +47,7 @@ StepContext, StepOutcome, ) + from ...common.retry_policies import DEFAULT_RETRY_POLICY from ...env import debug, testing from .helpers import ( continue_as_child, @@ -58,7 +58,6 @@ execute_switch_branch, ) from .transition import transition - from ...common.retry_policies import DEFAULT_RETRY_POLICY # Supported steps diff --git a/agents-api/tests/test_activities.py b/agents-api/tests/test_activities.py index f6e2f4c76..6f65cd034 100644 --- a/agents-api/tests/test_activities.py +++ b/agents-api/tests/test_activities.py @@ -7,7 +7,6 @@ from agents_api.clients import temporal from agents_api.env import temporal_task_queue from agents_api.workflows.demo import DemoWorkflow - from agents_api.workflows.task_execution.helpers import DEFAULT_RETRY_POLICY from .fixtures import (