From 061d72801fa3c41a6602295021bae4f39437f405 Mon Sep 17 00:00:00 2001 From: Diwank Tomer Date: Thu, 29 Aug 2024 10:59:02 -0400 Subject: [PATCH] fix(agents-api): Fix bug in task-execution workflow and uuid-int-list-to-str fn Signed-off-by: Diwank Tomer --- agents-api/agents_api/common/utils/cozo.py | 4 +- .../models/execution/get_execution.py | 1 + agents-api/agents_api/models/utils.py | 6 +- .../routers/tasks/get_execution_details.py | 12 +-- .../agents_api/workflows/task_execution.py | 102 ++++++++++-------- 5 files changed, 67 insertions(+), 58 deletions(-) diff --git a/agents-api/agents_api/common/utils/cozo.py b/agents-api/agents_api/common/utils/cozo.py index a8195a1ba..f5567dc4a 100644 --- a/agents-api/agents_api/common/utils/cozo.py +++ b/agents-api/agents_api/common/utils/cozo.py @@ -5,6 +5,7 @@ from types import SimpleNamespace from uuid import UUID +from beartype import beartype from pycozo import Client # Define a mock client for testing purposes, simulating Cozo API client behavior. @@ -20,5 +21,6 @@ ) -def uuid_int_list_to_uuid4(data) -> UUID: +@beartype +def uuid_int_list_to_uuid4(data: list[int]) -> UUID: return UUID(bytes=b"".join([i.to_bytes(1, "big") for i in data])) diff --git a/agents-api/agents_api/models/execution/get_execution.py b/agents-api/agents_api/models/execution/get_execution.py index cf9df2e09..db5448dce 100644 --- a/agents-api/agents_api/models/execution/get_execution.py +++ b/agents-api/agents_api/models/execution/get_execution.py @@ -20,6 +20,7 @@ @rewrap_exceptions( { + AssertionError: partialclass(HTTPException, status_code=404), QueryException: partialclass(HTTPException, status_code=400), ValidationError: partialclass(HTTPException, status_code=400), TypeError: partialclass(HTTPException, status_code=400), diff --git a/agents-api/agents_api/models/utils.py b/agents-api/agents_api/models/utils.py index 155624a63..4613fe7c7 100644 --- a/agents-api/agents_api/models/utils.py +++ b/agents-api/agents_api/models/utils.py @@ -27,7 +27,11 @@ def fix_uuid( fixed = { **item, - **{attr: uuid_int_list_to_uuid4(item[attr]) for attr in id_attrs}, + **{ + attr: uuid_int_list_to_uuid4(item[attr]) + for attr in id_attrs + if isinstance(item[attr], list) + }, } return fixed diff --git a/agents-api/agents_api/routers/tasks/get_execution_details.py b/agents-api/agents_api/routers/tasks/get_execution_details.py index 828598420..49f6ae2d2 100644 --- a/agents-api/agents_api/routers/tasks/get_execution_details.py +++ b/agents-api/agents_api/routers/tasks/get_execution_details.py @@ -1,4 +1,3 @@ -from fastapi import HTTPException, status from pydantic import UUID4 from agents_api.autogen.openapi_model import ( @@ -13,13 +12,4 @@ @router.get("/executions/{execution_id}", tags=["executions"]) async def get_execution_details(execution_id: UUID4) -> Execution: - try: - return get_execution_query(execution_id=execution_id) - except AssertionError as e: - print("-" * 100) - print(e) - print("-" * 100) - - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Execution not found" - ) from e + return get_execution_query(execution_id=execution_id) diff --git a/agents-api/agents_api/workflows/task_execution.py b/agents-api/agents_api/workflows/task_execution.py index 72bb604fe..ff0d6f692 100644 --- a/agents-api/agents_api/workflows/task_execution.py +++ b/agents-api/agents_api/workflows/task_execution.py @@ -39,7 +39,7 @@ StepContext, StepOutcome, ) - from ..env import testing + from ..env import debug, testing STEP_TO_ACTIVITY = { @@ -71,6 +71,26 @@ GenericStep = RootModel[WorkflowStep] +# TODO: find a way to transition to error if workflow or activity times out. + + +async def transition(state, context, **kwargs) -> None: + # NOTE: The state variable is closured from the outer scope + transition_request = CreateTransitionRequest( + current=context.cursor, + **{ + **state.model_dump(exclude_unset=True), + **kwargs, # Override with any additional kwargs + }, + ) + + await workflow.execute_activity( + task_steps.transition_step, + args=[context, transition_request], + schedule_to_close_timeout=timedelta(seconds=2), + ) + + @workflow.defn class TaskExecutionWorkflow: @workflow.run @@ -93,7 +113,7 @@ async def run( # --- - # 1a. Set global state + # 1. Set global state # (By default, exit if last otherwise transition 'step' to the next step) state = PendingTransition( type="finish" if context.is_last_step else "step", @@ -103,23 +123,6 @@ async def run( metadata={"__meta__": {"step_type": step_type.__name__}}, ) - # 1b. Prep a transition request - async def transition(**kwargs) -> None: - # NOTE: The state variable is closured from the outer scope - transition_request = CreateTransitionRequest( - current=context.cursor, - **{ - **state.model_dump(exclude_unset=True), - **kwargs, # Override with any additional kwargs - }, - ) - - await workflow.execute_activity( - task_steps.transition_step, - args=[context, transition_request], - schedule_to_close_timeout=timedelta(seconds=600), - ) - # --- # 2. Transition to starting if not done yet @@ -142,27 +145,30 @@ async def transition(**kwargs) -> None: # 3. Execute the current step's activity if applicable - try: - if activity := STEP_TO_ACTIVITY.get(step_type): - execute_activity = workflow.execute_activity - elif activity := STEP_TO_LOCAL_ACTIVITY.get(step_type): - execute_activity = workflow.execute_local_activity - else: - execute_activity = None + if activity := STEP_TO_ACTIVITY.get(step_type): + execute_activity = workflow.execute_activity + elif activity := STEP_TO_LOCAL_ACTIVITY.get(step_type): + execute_activity = workflow.execute_local_activity + else: + execute_activity = None - outcome = None - if execute_activity: + outcome = None + + if execute_activity: + try: outcome = await execute_activity( activity, context, # # TODO: This should be a configurable timeout everywhere based on the task - schedule_to_close_timeout=timedelta(seconds=3 if testing else 600), + schedule_to_close_timeout=timedelta( + seconds=3 if debug or testing else 600 + ), ) - except Exception as e: - await transition(type="error", output=dict(error=e)) - raise ApplicationError(f"Activity {activity} threw error: {e}") from e + except Exception as e: + await transition(state, context, type="error", output=dict(error=e)) + raise ApplicationError(f"Activity {activity} threw error: {e}") from e # --- @@ -170,20 +176,22 @@ async def transition(**kwargs) -> None: match context.current_step, outcome: # Handle errors (activity returns None) case step, StepOutcome(error=error) if error is not None: - await transition(type="error", output=dict(error=error)) + await transition(state, context, type="error", output=dict(error=error)) raise ApplicationError( f"step {type(step).__name__} threw error: {error}" ) case LogStep(), StepOutcome(output=output): # Add the logged message to transition history - await transition(output=dict(logged=output)) + await transition(state, context, output=dict(logged=output)) # Set the output to the current input state.output = context.current_input case ReturnStep(), StepOutcome(output=output): - await transition(output=output, type="finish", next=None) + await transition( + state, context, output=output, type="finish", next=None + ) return output # <--- Byeeee! case SwitchStep(switch=switch), StepOutcome(output=index) if index >= 0: @@ -359,7 +367,7 @@ async def transition(**kwargs) -> None: case ErrorWorkflowStep(error=error), _: state.output = dict(error=error) state.type = "error" - await transition() + await transition(state, context) raise ApplicationError(f"Error raised by ErrorWorkflowStep: {error}") @@ -367,7 +375,10 @@ async def transition(**kwargs) -> None: output=output, transition_to=(yield_transition_type, yield_next_target) ): await transition( - output=output, type=yield_transition_type, next=yield_next_target + state, + output=output, + type=yield_transition_type, + next=yield_next_target, ) state.output = await workflow.execute_child_workflow( @@ -376,7 +387,7 @@ async def transition(**kwargs) -> None: ) case WaitForInputStep(), StepOutcome(output=output): - await transition(output=output, type="wait", next=None) + await transition(state, context, output=output, type="wait", next=None) state.type = "resume" state.output = await execute_activity( @@ -391,7 +402,7 @@ async def transition(**kwargs) -> None: raise ApplicationError("Not implemented") # 5. Create transition for completed step - await transition() + await transition(state, context) # --- @@ -400,9 +411,10 @@ async def transition(**kwargs) -> None: if state.type in ("finish", "cancelled"): return state.output - # Otherwise, recurse to the next step - # TODO: Should use a continue_as_new workflow ONLY if the next step is a conditional or loop - # Otherwise, we should just call the next step as a child workflow - workflow.continue_as_new( - args=[execution_input, state.next, previous_inputs + [state.output]] - ) + else: + # Otherwise, recurse to the next step + # TODO: Should use a continue_as_new workflow ONLY if the next step is a conditional or loop + # Otherwise, we should just call the next step as a child workflow + return workflow.continue_as_new( + args=[execution_input, state.next, previous_inputs + [state.output]] + )