From d139570622dd6db02c9aae1e0fd340dedb679832 Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 19:41:25 -0400 Subject: [PATCH] fix: Minor fix to using eval instead of ast.literal_eval Signed-off-by: Diwank Singh Tomer --- .../activities/task_steps/base_evaluate.py | 12 +- agents-api/agents_api/autogen/Tasks.py | 4 +- agents-api/tests/test_execution_workflow.py | 129 +++++++++--------- .../api/schemas/$Tasks_CreateTaskRequest.ts | 1 + .../api/schemas/$Tasks_PatchTaskRequest.ts | 1 + sdks/ts/src/api/schemas/$Tasks_Task.ts | 1 + .../api/schemas/$Tasks_UpdateTaskRequest.ts | 1 + typespec/tasks/steps.tsp | 1 + 8 files changed, 83 insertions(+), 67 deletions(-) diff --git a/agents-api/agents_api/activities/task_steps/base_evaluate.py b/agents-api/agents_api/activities/task_steps/base_evaluate.py index fb9412cd9..0263345ec 100644 --- a/agents-api/agents_api/activities/task_steps/base_evaluate.py +++ b/agents-api/agents_api/activities/task_steps/base_evaluate.py @@ -22,8 +22,18 @@ async def base_evaluate( extra_lambdas = {} if extra_lambda_strs: for k, v in extra_lambda_strs.items(): + v = v.strip() + + # Check that all extra lambdas are valid assert v.startswith("lambda "), "All extra lambdas must start with 'lambda'" - extra_lambdas[k] = ast.literal_eval(v) + + try: + ast.parse(v) + except Exception as e: + raise ValueError(f"Invalid lambda: {v}") from e + + # Eval the lambda and add it to the extra lambdas + extra_lambdas[k] = eval(v) # Turn the nested dict values from pydantic to dicts where possible values = { diff --git a/agents-api/agents_api/autogen/Tasks.py b/agents-api/agents_api/autogen/Tasks.py index bac5e45df..8c8a22dbb 100644 --- a/agents-api/agents_api/autogen/Tasks.py +++ b/agents-api/agents_api/autogen/Tasks.py @@ -350,7 +350,7 @@ class Main(BaseModel): """ The initial value of the reduce expression """ - parallelism: int | None = None + parallelism: Annotated[int | None, Field(None, ge=1)] """ Whether to run the reduce expression in parallel and how many items to run in each batch """ @@ -391,7 +391,7 @@ class MainModel(BaseModel): """ The initial value of the reduce expression """ - parallelism: int | None = None + parallelism: Annotated[int | None, Field(None, ge=1)] """ Whether to run the reduce expression in parallel and how many items to run in each batch """ diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index 52fc393ca..f4a980533 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -699,79 +699,80 @@ async def _( assert [r["res"] for r in result] == ["a", "b", "c"] -@test("workflow: map reduce step parallel (parallelism=10)") -async def _( - client=cozo_client, - developer_id=test_developer_id, - agent=test_agent, -): - data = CreateExecutionRequest(input={"test": "input"}) - - map_step = { - "over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()", - "map": { - "evaluate": {"res": "_ + '!'"}, - }, - "parallelism": 10, - } +for p in range(1, 30, 6): + @test(f"workflow: map reduce step parallel (parallelism={p})") + async def _( + client=cozo_client, + developer_id=test_developer_id, + agent=test_agent, + ): + data = CreateExecutionRequest(input={"test": "input"}) - task_def = { - "name": "test task", - "description": "test task about", - "input_schema": {"type": "object", "additionalProperties": True}, - "main": [map_step], - } + map_step = { + "over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()", + "map": { + "evaluate": {"res": "_ + '!'"}, + }, + "parallelism": p, + } - task = create_task( - developer_id=developer_id, - agent_id=agent.id, - data=CreateTaskRequest(**task_def), - client=client, - ) + task_def = { + "name": "test task", + "description": "test task about", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [map_step], + } - async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): - execution, handle = await start_execution( + task = create_task( developer_id=developer_id, - task_id=task.id, - data=data, + agent_id=agent.id, + data=CreateTaskRequest(**task_def), client=client, ) - assert handle is not None - assert execution.task_id == task.id - assert execution.input == data.input + async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): + execution, handle = await start_execution( + developer_id=developer_id, + task_id=task.id, + data=data, + client=client, + ) - mock_run_task_execution_workflow.assert_called_once() + assert handle is not None + assert execution.task_id == task.id + assert execution.input == data.input - result = await handle.result() - assert [r["res"] for r in result] == [ - "a!", - "b!", - "c!", - "d!", - "e!", - "f!", - "g!", - "h!", - "i!", - "j!", - "k!", - "l!", - "m!", - "n!", - "o!", - "p!", - "q!", - "r!", - "s!", - "t!", - "u!", - "v!", - "w!", - "x!", - "y!", - "z!", - ] + mock_run_task_execution_workflow.assert_called_once() + + result = await handle.result() + assert [r["res"] for r in result] == [ + "a!", + "b!", + "c!", + "d!", + "e!", + "f!", + "g!", + "h!", + "i!", + "j!", + "k!", + "l!", + "m!", + "n!", + "o!", + "p!", + "q!", + "r!", + "s!", + "t!", + "u!", + "v!", + "w!", + "x!", + "y!", + "z!", + ] @test("workflow: prompt step") diff --git a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts index 9e01f9a31..0cbdabf41 100644 --- a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts @@ -139,6 +139,7 @@ export const $Tasks_CreateTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts index e8da500b5..3bc182d80 100644 --- a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts @@ -133,6 +133,7 @@ export const $Tasks_PatchTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_Task.ts b/sdks/ts/src/api/schemas/$Tasks_Task.ts index c39a3ba75..35cb45e9d 100644 --- a/sdks/ts/src/api/schemas/$Tasks_Task.ts +++ b/sdks/ts/src/api/schemas/$Tasks_Task.ts @@ -139,6 +139,7 @@ export const $Tasks_Task = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts index 25a6c98fa..6a3869214 100644 --- a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts @@ -139,6 +139,7 @@ export const $Tasks_UpdateTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/typespec/tasks/steps.tsp b/typespec/tasks/steps.tsp index 8fe95a7ea..5c8f53840 100644 --- a/typespec/tasks/steps.tsp +++ b/typespec/tasks/steps.tsp @@ -251,6 +251,7 @@ model MapReduceStep> extends BaseWor initial?: unknown = #[]; /** Whether to run the reduce expression in parallel and how many items to run in each batch */ + @minValue(1) parallelism?: uint16; }