Skip to content

Commit

Permalink
fix: Minor fix to using eval instead of ast.literal_eval
Browse files Browse the repository at this point in the history
Signed-off-by: Diwank Singh Tomer <[email protected]>
  • Loading branch information
creatorrr committed Sep 4, 2024
1 parent 95ba97b commit d139570
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 67 deletions.
12 changes: 11 additions & 1 deletion agents-api/agents_api/activities/task_steps/base_evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,18 @@ async def base_evaluate(
extra_lambdas = {}
if extra_lambda_strs:
for k, v in extra_lambda_strs.items():
v = v.strip()

# Check that all extra lambdas are valid
assert v.startswith("lambda "), "All extra lambdas must start with 'lambda'"
extra_lambdas[k] = ast.literal_eval(v)

try:
ast.parse(v)
except Exception as e:
raise ValueError(f"Invalid lambda: {v}") from e

# Eval the lambda and add it to the extra lambdas
extra_lambdas[k] = eval(v)

# Turn the nested dict values from pydantic to dicts where possible
values = {
Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/autogen/Tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class Main(BaseModel):
"""
The initial value of the reduce expression
"""
parallelism: int | None = None
parallelism: Annotated[int | None, Field(None, ge=1)]
"""
Whether to run the reduce expression in parallel and how many items to run in each batch
"""
Expand Down Expand Up @@ -391,7 +391,7 @@ class MainModel(BaseModel):
"""
The initial value of the reduce expression
"""
parallelism: int | None = None
parallelism: Annotated[int | None, Field(None, ge=1)]
"""
Whether to run the reduce expression in parallel and how many items to run in each batch
"""
Expand Down
129 changes: 65 additions & 64 deletions agents-api/tests/test_execution_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,79 +699,80 @@ async def _(
assert [r["res"] for r in result] == ["a", "b", "c"]


@test("workflow: map reduce step parallel (parallelism=10)")
async def _(
client=cozo_client,
developer_id=test_developer_id,
agent=test_agent,
):
data = CreateExecutionRequest(input={"test": "input"})

map_step = {
"over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()",
"map": {
"evaluate": {"res": "_ + '!'"},
},
"parallelism": 10,
}
for p in range(1, 30, 6):
@test(f"workflow: map reduce step parallel (parallelism={p})")
async def _(
client=cozo_client,
developer_id=test_developer_id,
agent=test_agent,
):
data = CreateExecutionRequest(input={"test": "input"})

task_def = {
"name": "test task",
"description": "test task about",
"input_schema": {"type": "object", "additionalProperties": True},
"main": [map_step],
}
map_step = {
"over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()",
"map": {
"evaluate": {"res": "_ + '!'"},
},
"parallelism": p,
}

task = create_task(
developer_id=developer_id,
agent_id=agent.id,
data=CreateTaskRequest(**task_def),
client=client,
)
task_def = {
"name": "test task",
"description": "test task about",
"input_schema": {"type": "object", "additionalProperties": True},
"main": [map_step],
}

async with patch_testing_temporal() as (_, mock_run_task_execution_workflow):
execution, handle = await start_execution(
task = create_task(
developer_id=developer_id,
task_id=task.id,
data=data,
agent_id=agent.id,
data=CreateTaskRequest(**task_def),
client=client,
)

assert handle is not None
assert execution.task_id == task.id
assert execution.input == data.input
async with patch_testing_temporal() as (_, mock_run_task_execution_workflow):
execution, handle = await start_execution(
developer_id=developer_id,
task_id=task.id,
data=data,
client=client,
)

mock_run_task_execution_workflow.assert_called_once()
assert handle is not None
assert execution.task_id == task.id
assert execution.input == data.input

result = await handle.result()
assert [r["res"] for r in result] == [
"a!",
"b!",
"c!",
"d!",
"e!",
"f!",
"g!",
"h!",
"i!",
"j!",
"k!",
"l!",
"m!",
"n!",
"o!",
"p!",
"q!",
"r!",
"s!",
"t!",
"u!",
"v!",
"w!",
"x!",
"y!",
"z!",
]
mock_run_task_execution_workflow.assert_called_once()

result = await handle.result()
assert [r["res"] for r in result] == [
"a!",
"b!",
"c!",
"d!",
"e!",
"f!",
"g!",
"h!",
"i!",
"j!",
"k!",
"l!",
"m!",
"n!",
"o!",
"p!",
"q!",
"r!",
"s!",
"t!",
"u!",
"v!",
"w!",
"x!",
"y!",
"z!",
]


@test("workflow: prompt step")
Expand Down
1 change: 1 addition & 0 deletions sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export const $Tasks_CreateTaskRequest = {
type: "number",
description: `Whether to run the reduce expression in parallel and how many items to run in each batch`,
format: "uint16",
minimum: 1,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ export const $Tasks_PatchTaskRequest = {
type: "number",
description: `Whether to run the reduce expression in parallel and how many items to run in each batch`,
format: "uint16",
minimum: 1,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions sdks/ts/src/api/schemas/$Tasks_Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export const $Tasks_Task = {
type: "number",
description: `Whether to run the reduce expression in parallel and how many items to run in each batch`,
format: "uint16",
minimum: 1,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export const $Tasks_UpdateTaskRequest = {
type: "number",
description: `Whether to run the reduce expression in parallel and how many items to run in each batch`,
format: "uint16",
minimum: 1,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions typespec/tasks/steps.tsp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ model MapReduceStep<Value = unknown, Accumulator = Array<Value>> extends BaseWor
initial?: unknown = #[];

/** Whether to run the reduce expression in parallel and how many items to run in each batch */
@minValue(1)
parallelism?: uint16;
}

Expand Down

0 comments on commit d139570

Please sign in to comment.