Skip to content

Commit

Permalink
Merge pull request #476 from julep-ai/x/fix-task-execution
Browse files Browse the repository at this point in the history
fix(agents-api): Fix bug in task-execution workflow and uuid-int-list-to-str fn
  • Loading branch information
whiterabbit1983 authored Aug 29, 2024
2 parents acbf162 + 061d728 commit 612a012
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 58 deletions.
4 changes: 3 additions & 1 deletion agents-api/agents_api/common/utils/cozo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from types import SimpleNamespace
from uuid import UUID

from beartype import beartype
from pycozo import Client

# Define a mock client for testing purposes, simulating Cozo API client behavior.
Expand All @@ -20,5 +21,6 @@
)


def uuid_int_list_to_uuid4(data) -> UUID:
@beartype
def uuid_int_list_to_uuid4(data: list[int]) -> UUID:
return UUID(bytes=b"".join([i.to_bytes(1, "big") for i in data]))
1 change: 1 addition & 0 deletions agents-api/agents_api/models/execution/get_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

@rewrap_exceptions(
{
AssertionError: partialclass(HTTPException, status_code=404),
QueryException: partialclass(HTTPException, status_code=400),
ValidationError: partialclass(HTTPException, status_code=400),
TypeError: partialclass(HTTPException, status_code=400),
Expand Down
6 changes: 5 additions & 1 deletion agents-api/agents_api/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ def fix_uuid(

fixed = {
**item,
**{attr: uuid_int_list_to_uuid4(item[attr]) for attr in id_attrs},
**{
attr: uuid_int_list_to_uuid4(item[attr])
for attr in id_attrs
if isinstance(item[attr], list)
},
}

return fixed
Expand Down
12 changes: 1 addition & 11 deletions agents-api/agents_api/routers/tasks/get_execution_details.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from fastapi import HTTPException, status
from pydantic import UUID4

from agents_api.autogen.openapi_model import (
Expand All @@ -13,13 +12,4 @@

@router.get("/executions/{execution_id}", tags=["executions"])
async def get_execution_details(execution_id: UUID4) -> Execution:
try:
return get_execution_query(execution_id=execution_id)
except AssertionError as e:
print("-" * 100)
print(e)
print("-" * 100)

raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Execution not found"
) from e
return get_execution_query(execution_id=execution_id)
102 changes: 57 additions & 45 deletions agents-api/agents_api/workflows/task_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
StepContext,
StepOutcome,
)
from ..env import testing
from ..env import debug, testing


STEP_TO_ACTIVITY = {
Expand Down Expand Up @@ -71,6 +71,26 @@
GenericStep = RootModel[WorkflowStep]


# TODO: find a way to transition to error if workflow or activity times out.


async def transition(state, context, **kwargs) -> None:
# NOTE: The state variable is closured from the outer scope
transition_request = CreateTransitionRequest(
current=context.cursor,
**{
**state.model_dump(exclude_unset=True),
**kwargs, # Override with any additional kwargs
},
)

await workflow.execute_activity(
task_steps.transition_step,
args=[context, transition_request],
schedule_to_close_timeout=timedelta(seconds=2),
)


@workflow.defn
class TaskExecutionWorkflow:
@workflow.run
Expand All @@ -93,7 +113,7 @@ async def run(

# ---

# 1a. Set global state
# 1. Set global state
# (By default, exit if last otherwise transition 'step' to the next step)
state = PendingTransition(
type="finish" if context.is_last_step else "step",
Expand All @@ -103,23 +123,6 @@ async def run(
metadata={"__meta__": {"step_type": step_type.__name__}},
)

# 1b. Prep a transition request
async def transition(**kwargs) -> None:
# NOTE: The state variable is closured from the outer scope
transition_request = CreateTransitionRequest(
current=context.cursor,
**{
**state.model_dump(exclude_unset=True),
**kwargs, # Override with any additional kwargs
},
)

await workflow.execute_activity(
task_steps.transition_step,
args=[context, transition_request],
schedule_to_close_timeout=timedelta(seconds=600),
)

# ---

# 2. Transition to starting if not done yet
Expand All @@ -142,48 +145,53 @@ async def transition(**kwargs) -> None:

# 3. Execute the current step's activity if applicable

try:
if activity := STEP_TO_ACTIVITY.get(step_type):
execute_activity = workflow.execute_activity
elif activity := STEP_TO_LOCAL_ACTIVITY.get(step_type):
execute_activity = workflow.execute_local_activity
else:
execute_activity = None
if activity := STEP_TO_ACTIVITY.get(step_type):
execute_activity = workflow.execute_activity
elif activity := STEP_TO_LOCAL_ACTIVITY.get(step_type):
execute_activity = workflow.execute_local_activity
else:
execute_activity = None

outcome = None
if execute_activity:
outcome = None

if execute_activity:
try:
outcome = await execute_activity(
activity,
context,
#
# TODO: This should be a configurable timeout everywhere based on the task
schedule_to_close_timeout=timedelta(seconds=3 if testing else 600),
schedule_to_close_timeout=timedelta(
seconds=3 if debug or testing else 600
),
)

except Exception as e:
await transition(type="error", output=dict(error=e))
raise ApplicationError(f"Activity {activity} threw error: {e}") from e
except Exception as e:
await transition(state, context, type="error", output=dict(error=e))
raise ApplicationError(f"Activity {activity} threw error: {e}") from e

# ---

# 4. Then, based on the outcome and step type, decide what to do next
match context.current_step, outcome:
# Handle errors (activity returns None)
case step, StepOutcome(error=error) if error is not None:
await transition(type="error", output=dict(error=error))
await transition(state, context, type="error", output=dict(error=error))
raise ApplicationError(
f"step {type(step).__name__} threw error: {error}"
)

case LogStep(), StepOutcome(output=output):
# Add the logged message to transition history
await transition(output=dict(logged=output))
await transition(state, context, output=dict(logged=output))

# Set the output to the current input
state.output = context.current_input

case ReturnStep(), StepOutcome(output=output):
await transition(output=output, type="finish", next=None)
await transition(
state, context, output=output, type="finish", next=None
)
return output # <--- Byeeee!

case SwitchStep(switch=switch), StepOutcome(output=index) if index >= 0:
Expand Down Expand Up @@ -359,15 +367,18 @@ async def transition(**kwargs) -> None:
case ErrorWorkflowStep(error=error), _:
state.output = dict(error=error)
state.type = "error"
await transition()
await transition(state, context)

raise ApplicationError(f"Error raised by ErrorWorkflowStep: {error}")

case YieldStep(), StepOutcome(
output=output, transition_to=(yield_transition_type, yield_next_target)
):
await transition(
output=output, type=yield_transition_type, next=yield_next_target
state,
output=output,
type=yield_transition_type,
next=yield_next_target,
)

state.output = await workflow.execute_child_workflow(
Expand All @@ -376,7 +387,7 @@ async def transition(**kwargs) -> None:
)

case WaitForInputStep(), StepOutcome(output=output):
await transition(output=output, type="wait", next=None)
await transition(state, context, output=output, type="wait", next=None)

state.type = "resume"
state.output = await execute_activity(
Expand All @@ -391,7 +402,7 @@ async def transition(**kwargs) -> None:
raise ApplicationError("Not implemented")

# 5. Create transition for completed step
await transition()
await transition(state, context)

# ---

Expand All @@ -400,9 +411,10 @@ async def transition(**kwargs) -> None:
if state.type in ("finish", "cancelled"):
return state.output

# Otherwise, recurse to the next step
# TODO: Should use a continue_as_new workflow ONLY if the next step is a conditional or loop
# Otherwise, we should just call the next step as a child workflow
workflow.continue_as_new(
args=[execution_input, state.next, previous_inputs + [state.output]]
)
else:
# Otherwise, recurse to the next step
# TODO: Should use a continue_as_new workflow ONLY if the next step is a conditional or loop
# Otherwise, we should just call the next step as a child workflow
return workflow.continue_as_new(
args=[execution_input, state.next, previous_inputs + [state.output]]
)

0 comments on commit 612a012

Please sign in to comment.