From 3291120866469ac738c25f4c141b9533fad80128 Mon Sep 17 00:00:00 2001 From: Hamada Salhab Date: Fri, 11 Oct 2024 01:45:40 +0300 Subject: [PATCH] feat(agents-api): Add `wait_for_input` step to the acceptable steps inside `foreach` step (#625) > [!IMPORTANT] > Add `WaitForInputStep` to `foreach` step in agents API, updating `Tasks.py`, `steps.tsp`, and OpenAPI specs. > > - **Behavior**: > - Add `WaitForInputStep` to `ForeachDo` and `ForeachDoUpdateItem` in `Tasks.py`, allowing it as a valid step in `foreach`. > - Update `SequentialWorkflowStep` alias in `steps.tsp` to include `WaitForInputStep`. > - **OpenAPI**: > - Update `openapi-0.4.0.yaml` and `openapi-1.0.0.yaml` to include `WaitForInputStep` in `ForeachDo` and `ForeachDoUpdateItem` schemas. > > This description was created by [Ellipsis](https://www.ellipsis.dev?ref=julep-ai%2Fjulep&utm_source=github&utm_medium=referral) for ac9b7c163b009a1a95bbc2a19533a674eeed4b6a. It will automatically update as commits are pushed. --------- Signed-off-by: Diwank Singh Tomer Co-authored-by: Diwank Singh Tomer --- agents-api/agents_api/autogen/Tasks.py | 6 +- agents-api/tests/test_execution_workflow.py | 82 +++++++++++++++++-- scheduler/docker-compose.yml | 18 +++- typespec/tasks/steps.tsp | 6 +- .../@typespec/openapi3/openapi-0.4.0.yaml | 2 + .../@typespec/openapi3/openapi-1.0.0.yaml | 2 + 6 files changed, 105 insertions(+), 11 deletions(-) diff --git a/agents-api/agents_api/autogen/Tasks.py b/agents-api/agents_api/autogen/Tasks.py index c7067ea0c..77dcfefd6 100644 --- a/agents-api/agents_api/autogen/Tasks.py +++ b/agents-api/agents_api/autogen/Tasks.py @@ -191,7 +191,8 @@ class ForeachDo(BaseModel): VALIDATION: Should NOT return more than 1000 elements. """ do: ( - EvaluateStep + WaitForInputStep + | EvaluateStep | ToolCallStep | PromptStep | GetStep @@ -214,7 +215,8 @@ class ForeachDoUpdateItem(BaseModel): VALIDATION: Should NOT return more than 1000 elements. """ do: ( - EvaluateStep + WaitForInputStep + | EvaluateStep | ToolCallStep | PromptStepUpdateItem | GetStep diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index c701a2e2f..e5ef7110a 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -671,8 +671,7 @@ async def _( assert result == expected_output -# FIXME: This test is not working. It gets stuck -# @test("workflow: wait for input step start") +@test("workflow: wait for input step start") async def _( client=cozo_client, developer_id=test_developer_id, @@ -710,7 +709,12 @@ async def _( mock_run_task_execution_workflow.assert_called_once() # Let it run for a bit - await asyncio.sleep(3) + result_coroutine = handle.result() + task = asyncio.create_task(result_coroutine) + try: + await asyncio.wait_for(task, timeout=3) + except asyncio.TimeoutError: + task.cancel() # Get the history history = await handle.fetch_history() @@ -728,12 +732,78 @@ async def _( activity for activity in activities_scheduled if activity ] - future = handle.result() - await future - assert "wait_for_input_step" in activities_scheduled +@test("workflow: foreach wait for input step start") +async def _( + client=cozo_client, + developer_id=test_developer_id, + agent=test_agent, +): + data = CreateExecutionRequest(input={"test": "input"}) + + task = create_task( + developer_id=developer_id, + agent_id=agent.id, + data=CreateTaskRequest( + **{ + "name": "test task", + "description": "test task about", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [ + { + "foreach": { + "in": "'a b c'.split()", + "do": {"wait_for_input": {"info": {"hi": '"bye"'}}}, + }, + }, + ], + } + ), + client=client, + ) + + async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): + execution, handle = await start_execution( + developer_id=developer_id, + task_id=task.id, + data=data, + client=client, + ) + + assert handle is not None + assert execution.task_id == task.id + assert execution.input == data.input + mock_run_task_execution_workflow.assert_called_once() + + # Let it run for a bit + result_coroutine = handle.result() + task = asyncio.create_task(result_coroutine) + try: + await asyncio.wait_for(task, timeout=3) + except asyncio.TimeoutError: + task.cancel() + + # Get the history + history = await handle.fetch_history() + events = [MessageToDict(e) for e in history.events] + assert len(events) > 0 + + activities_scheduled = [ + event.get("activityTaskScheduledEventAttributes", {}) + .get("activityType", {}) + .get("name") + for event in events + if "ACTIVITY_TASK_SCHEDULED" in event["eventType"] + ] + activities_scheduled = [ + activity for activity in activities_scheduled if activity + ] + + assert "for_each_step" in activities_scheduled + + @test("workflow: if-else step") async def _( client=cozo_client, diff --git a/scheduler/docker-compose.yml b/scheduler/docker-compose.yml index e0f86ec9d..613cc9e67 100644 --- a/scheduler/docker-compose.yml +++ b/scheduler/docker-compose.yml @@ -53,7 +53,7 @@ services: - POSTGRES_PASSWORD=${TEMPORAL_POSTGRES_PASSWORD} healthcheck: test: [ "CMD-SHELL", "pg_isready -d ${TEMPORAL_POSTGRES_DB:-temporal} -U ${TEMPORAL_POSTGRES_USER:-temporal}" ] - + interval: 1s timeout: 5s retries: 10 @@ -63,8 +63,22 @@ services: profiles: - temporal-ui environment: + # See: https://github.com/temporalio/ui-server/blob/main/docker/config-template.yaml - TEMPORAL_ADDRESS=${TEMPORAL_ADDRESS:-temporal:7233} - - TEMPORAL_CORS_ORIGINS=${TEMPORAL_CORS_ORIGINS:-http://localhost:3000} + # Note: Not setting it enables all origins + # - TEMPORAL_CORS_ORIGINS=${TEMPORAL_CORS_ORIGINS:-http://localhost:3000} + - TEMPORAL_CODEC_ENDPOINT=http://localhost/api/temporal + - TEMPORAL_UI_ENABLED=true + - TEMPORAL_FEEDBACK_URL=https://github.com/julep-ai/julep + - TEMPORAL_NOTIFY_ON_NEW_VERSION=false + - TEMPORAL_CSRF_COOKIE_INSECURE=true + # - TEMPORAL_HIDE_LOGS=true + # - TEMPORAL_BANNER_TEXT=gagagaga + # - TEMPORAL_DISABLE_WRITE_ACTIONS=true + # - TEMPORAL_HIDE_WORKFLOW_QUERY_ERRORS=true + # - TEMPORAL_REFRESH_WORKFLOW_COUNTS_DISABLED=true + - TEMPORAL_OPEN_API_ENABLED=true + ports: - 9000:8080 # Since 8080 is already used by agents-api diff --git a/typespec/tasks/steps.tsp b/typespec/tasks/steps.tsp index 1d8c19209..4b14ef6ae 100644 --- a/typespec/tasks/steps.tsp +++ b/typespec/tasks/steps.tsp @@ -41,6 +41,10 @@ model BaseWorkflowStep { kind_: T; } +alias SequentialWorkflowStep = + | WaitForInputStep + | MappableWorkflowStep; + alias MappableWorkflowStep = | EvaluateStep | ToolCallStep @@ -208,7 +212,7 @@ model ForeachDo { in: TypedExpression>; /** The steps to run for each iteration */ - do: MappableWorkflowStep; + do: SequentialWorkflowStep; } model ForeachStep extends BaseWorkflowStep<"foreach"> { diff --git a/typespec/tsp-output/@typespec/openapi3/openapi-0.4.0.yaml b/typespec/tsp-output/@typespec/openapi3/openapi-0.4.0.yaml index d00a5b9d2..61d89edaa 100644 --- a/typespec/tsp-output/@typespec/openapi3/openapi-0.4.0.yaml +++ b/typespec/tsp-output/@typespec/openapi3/openapi-0.4.0.yaml @@ -4167,6 +4167,7 @@ components: VALIDATION: Should NOT return more than 1000 elements. do: anyOf: + - $ref: '#/components/schemas/Tasks.WaitForInputStep' - $ref: '#/components/schemas/Tasks.EvaluateStep' - $ref: '#/components/schemas/Tasks.ToolCallStep' - $ref: '#/components/schemas/Tasks.PromptStep' @@ -4189,6 +4190,7 @@ components: VALIDATION: Should NOT return more than 1000 elements. do: anyOf: + - $ref: '#/components/schemas/Tasks.WaitForInputStep' - $ref: '#/components/schemas/Tasks.EvaluateStep' - $ref: '#/components/schemas/Tasks.ToolCallStep' - $ref: '#/components/schemas/Tasks.PromptStepUpdateItem' diff --git a/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml b/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml index b3f3919a9..36fbc0147 100644 --- a/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml +++ b/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml @@ -4167,6 +4167,7 @@ components: VALIDATION: Should NOT return more than 1000 elements. do: anyOf: + - $ref: '#/components/schemas/Tasks.WaitForInputStep' - $ref: '#/components/schemas/Tasks.EvaluateStep' - $ref: '#/components/schemas/Tasks.ToolCallStep' - $ref: '#/components/schemas/Tasks.PromptStep' @@ -4189,6 +4190,7 @@ components: VALIDATION: Should NOT return more than 1000 elements. do: anyOf: + - $ref: '#/components/schemas/Tasks.WaitForInputStep' - $ref: '#/components/schemas/Tasks.EvaluateStep' - $ref: '#/components/schemas/Tasks.ToolCallStep' - $ref: '#/components/schemas/Tasks.PromptStepUpdateItem'