From 95e0df6f78a1e43d9cb98173d0121cfdf374f44c Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 16:06:26 -0400 Subject: [PATCH 1/8] refactor(agents-api): Minor refactor to make task execution workflow easier to read Signed-off-by: Diwank Singh Tomer --- .../workflows/task_execution/__init__.py | 209 ++++-------------- .../workflows/task_execution/helpers.py | 167 ++++++++++++++ agents-api/tests/test_execution_workflow.py | 2 +- 3 files changed, 215 insertions(+), 163 deletions(-) create mode 100644 agents-api/agents_api/workflows/task_execution/helpers.py diff --git a/agents-api/agents_api/workflows/task_execution/__init__.py b/agents-api/agents_api/workflows/task_execution/__init__.py index 7e65cb707..c30a47ce3 100644 --- a/agents-api/agents_api/workflows/task_execution/__init__.py +++ b/agents-api/agents_api/workflows/task_execution/__init__.py @@ -8,6 +8,7 @@ from temporalio import workflow from temporalio.exceptions import ApplicationError +# Import necessary modules and types with workflow.unsafe.imports_passed_through(): from ...activities import task_steps from ...autogen.openapi_model import ( @@ -31,7 +32,6 @@ ToolCallStep, TransitionTarget, WaitForInputStep, - Workflow, WorkflowStep, YieldStep, ) @@ -43,6 +43,13 @@ ) from ...env import debug, testing from .transition import transition + from .helpers import ( + continue_as_child, + execute_switch_branch, + execute_if_else_branch, + execute_foreach_step, + execute_map_reduce_step, + ) # Supported steps # --------------- @@ -70,6 +77,7 @@ # | MapReduceStep # ✅ # ) +# Mapping of step types to their corresponding activities STEP_TO_ACTIVITY = { PromptStep: task_steps.prompt_step, # ToolCallStep: tool_call_step, @@ -98,26 +106,11 @@ # TODO: The timeouts should be configurable per task -async def continue_as_child( - execution_input: ExecutionInput, - start: TransitionTarget, - previous_inputs: list[Any], - user_state: dict[str, Any] = {}, -) -> Any: - return await workflow.execute_child_workflow( - TaskExecutionWorkflow.run, - args=[ - execution_input, - start, - previous_inputs, - user_state, - ], - # TODO: Should add search_attributes for queryability - ) - - # TODO: Review the current user state storage method # Probably can be implemented much more efficiently + + +# Main workflow definition @workflow.defn class TaskExecutionWorkflow: user_state: dict[str, Any] = {} @@ -126,6 +119,7 @@ def __init__(self) -> None: self.user_state = {} # TODO: Add endpoints for getting and setting user state for an execution + # Query methods for user state @workflow.query def get_user_state(self) -> dict[str, Any]: return self.user_state @@ -134,6 +128,7 @@ def get_user_state(self) -> dict[str, Any]: def get_user_state_by_key(self, key: str) -> Any: return self.user_state.get(key) + # Signal methods for updating user state @workflow.signal def set_user_state(self, key: str, value: Any) -> None: self.user_state[key] = value @@ -142,6 +137,7 @@ def set_user_state(self, key: str, value: Any) -> None: def update_user_state(self, values: dict[str, Any]) -> None: self.user_state.update(values) + # Main workflow run method @workflow.run async def run( self, @@ -250,38 +246,14 @@ async def run( return output # <--- Byeeee! 
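                # The switch activity returns the index of the first case whose
                # condition evaluates truthy, or a negative value when none match;
                # the `if index >= 0` / `if index < 0` guards below separate the
                # happy path from the no-case-matched error path.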
case SwitchStep(switch=switch), StepOutcome(output=index) if index >= 0: - workflow.logger.info(f"Switch step: Chose branch {index}") - chosen_branch = switch[index] - - # Create a faux workflow - case_wf_name = ( - f"`{context.cursor.workflow}`[{context.cursor.step}].case" - ) - - case_task = execution_input.task.model_copy() - case_task.workflows = [ - Workflow(name=case_wf_name, steps=[chosen_branch.then]) - ] - - # Create a new execution input - case_execution_input = execution_input.model_copy() - case_execution_input.task = case_task - - # Set the next target to the chosen branch - case_next_target = TransitionTarget(workflow=case_wf_name, step=0) - - case_args = [ - case_execution_input, - case_next_target, + result = await execute_switch_branch( + context, + execution_input, + switch, + index, previous_inputs, - ] - - # Execute the chosen branch and come back here - result = await continue_as_child( - *case_args, - user_state=self.user_state, + self.user_state, ) - state = PartialTransition(output=result) case SwitchStep(), StepOutcome(output=index) if index < 0: @@ -291,129 +263,42 @@ async def run( case IfElseWorkflowStep(then=then_branch, else_=else_branch), StepOutcome( output=condition ): - workflow.logger.info( - f"If-Else step: Condition evaluated to {condition}" - ) - # Choose the branch based on the condition - chosen_branch = then_branch if condition else else_branch - - # Create a faux workflow - if_else_wf_name = ( - f"`{context.cursor.workflow}`[{context.cursor.step}].if_else" - ) - if_else_wf_name += ".then" if condition else ".else" - - if_else_task = execution_input.task.model_copy() - if_else_task.workflows = [ - Workflow(name=if_else_wf_name, steps=[chosen_branch]) - ] - - # Create a new execution input - if_else_execution_input = execution_input.model_copy() - if_else_execution_input.task = if_else_task - - # Set the next target to the chosen branch - if_else_next_target = TransitionTarget(workflow=if_else_wf_name, step=0) - - if_else_args = [ - if_else_execution_input, - if_else_next_target, + result = await execute_if_else_branch( + context, + execution_input, + then_branch, + else_branch, + condition, previous_inputs, - ] - - # Execute the chosen branch and come back here - result = await continue_as_child( - *if_else_args, - user_state=self.user_state, + self.user_state, ) state = PartialTransition(output=result) case ForeachStep(foreach=ForeachDo(do=do_step)), StepOutcome(output=items): - workflow.logger.info(f"Foreach step: Iterating over {len(items)} items") - for i, item in enumerate(items): - # Create a faux workflow - foreach_wf_name = f"`{context.cursor.workflow}`[{context.cursor.step}].foreach[{i}]" - - foreach_task = execution_input.task.model_copy() - foreach_task.workflows = [ - Workflow(name=foreach_wf_name, steps=[do_step]) - ] - - # Create a new execution input - foreach_execution_input = execution_input.model_copy() - foreach_execution_input.task = foreach_task - - # Set the next target to the chosen branch - foreach_next_target = TransitionTarget( - workflow=foreach_wf_name, step=0 - ) - - foreach_args = [ - foreach_execution_input, - foreach_next_target, - previous_inputs + [{"item": item}], - ] - - # Execute the chosen branch and come back here - result = await continue_as_child( - *foreach_args, - user_state=self.user_state, - ) - + result = await execute_foreach_step( + context, + execution_input, + do_step, + items, + previous_inputs, + self.user_state, + ) state = PartialTransition(output=result) case MapReduceStep( map=map_defn, 
reduce=reduce, initial=initial ), StepOutcome(output=items): - workflow.logger.info(f"MapReduce step: Processing {len(items)} items") - result = initial or [] - reduce = reduce or "results + [_]" - - for i, item in enumerate(items): - workflow_name = f"`{context.cursor.workflow}`[{context.cursor.step}].mapreduce[{i}]" - map_reduce_task = execution_input.task.model_copy() - - defn_dict = map_defn.model_dump() - step_defn = GenericStep(**defn_dict).root - map_reduce_task.workflows = [ - Workflow(name=workflow_name, steps=[step_defn]) - ] - - # Create a new execution input - map_reduce_execution_input = execution_input.model_copy() - map_reduce_execution_input.task = map_reduce_task - - # Set the next target to the chosen branch - map_reduce_next_target = TransitionTarget( - workflow=workflow_name, step=0 - ) - - map_reduce_args = [ - map_reduce_execution_input, - map_reduce_next_target, - previous_inputs + [item], - ] - - # TODO: Parallelize map-reduce step - # SCRUM-14 - - # Execute the chosen branch and come back here - output = await continue_as_child( - *map_reduce_args, - user_state=self.user_state, - ) - - # Reduce the result with the initial value - result = await workflow.execute_activity( - task_steps.base_evaluate, - args=[ - reduce, - {"results": result, "_": output}, - ], - schedule_to_close_timeout=timedelta(seconds=2), - ) - + result = await execute_map_reduce_step( + context, + execution_input, + map_defn, + reduce, + initial, + items, + previous_inputs, + self.user_state, + ) state = PartialTransition(output=result) case SleepStep( @@ -572,7 +457,7 @@ async def run( f"Continuing to next step: {final_state.next.workflow}.{final_state.next.step}" ) - # TODO: Should use a continue_as_new workflow if history grows too large + # Continue as a child workflow return await continue_as_child( execution_input=execution_input, start=final_state.next, diff --git a/agents-api/agents_api/workflows/task_execution/helpers.py b/agents-api/agents_api/workflows/task_execution/helpers.py new file mode 100644 index 000000000..6323cba40 --- /dev/null +++ b/agents-api/agents_api/workflows/task_execution/helpers.py @@ -0,0 +1,167 @@ +from datetime import timedelta +from typing import Any + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from ...activities import task_steps + from ...autogen.openapi_model import ( + TransitionTarget, + Workflow, + WorkflowStep, + ) + from ...common.protocol.tasks import ( + ExecutionInput, + StepContext, + ) + + +async def continue_as_child( + execution_input: ExecutionInput, + start: TransitionTarget, + previous_inputs: list[Any], + user_state: dict[str, Any] = {}, +) -> Any: + return await workflow.execute_child_workflow( + "TaskExecutionWorkflow", + args=[ + execution_input, + start, + previous_inputs, + user_state, + ], + ) + + +async def execute_switch_branch( + context: StepContext, + execution_input: ExecutionInput, + switch: list, + index: int, + previous_inputs: list[Any], + user_state: dict[str, Any] = {}, +) -> Any: + workflow.logger.info(f"Switch step: Chose branch {index}") + chosen_branch = switch[index] + + case_wf_name = f"`{context.cursor.workflow}`[{context.cursor.step}].case" + + case_task = execution_input.task.model_copy() + case_task.workflows = [Workflow(name=case_wf_name, steps=[chosen_branch.then])] + + case_execution_input = execution_input.model_copy() + case_execution_input.task = case_task + + case_next_target = TransitionTarget(workflow=case_wf_name, step=0) + + return await continue_as_child( + 
case_execution_input, + case_next_target, + previous_inputs, + user_state=user_state, + ) + + +async def execute_if_else_branch( + context: StepContext, + execution_input: ExecutionInput, + then_branch: WorkflowStep, + else_branch: WorkflowStep, + condition: bool, + previous_inputs: list[Any], + user_state: dict[str, Any] = {}, +) -> Any: + workflow.logger.info(f"If-Else step: Condition evaluated to {condition}") + chosen_branch = then_branch if condition else else_branch + + if_else_wf_name = f"`{context.cursor.workflow}`[{context.cursor.step}].if_else" + if_else_wf_name += ".then" if condition else ".else" + + if_else_task = execution_input.task.model_copy() + if_else_task.workflows = [Workflow(name=if_else_wf_name, steps=[chosen_branch])] + + if_else_execution_input = execution_input.model_copy() + if_else_execution_input.task = if_else_task + + if_else_next_target = TransitionTarget(workflow=if_else_wf_name, step=0) + + return await continue_as_child( + if_else_execution_input, + if_else_next_target, + previous_inputs, + user_state=user_state, + ) + + +async def execute_foreach_step( + context: StepContext, + execution_input: ExecutionInput, + do_step: WorkflowStep, + items: list[Any], + previous_inputs: list[Any], + user_state: dict[str, Any], +) -> Any: + workflow.logger.info(f"Foreach step: Iterating over {len(items)} items") + results = [] + + for i, item in enumerate(items): + foreach_wf_name = ( + f"`{context.cursor.workflow}`[{context.cursor.step}].foreach[{i}]" + ) + foreach_task = execution_input.task.model_copy() + foreach_task.workflows = [Workflow(name=foreach_wf_name, steps=[do_step])] + + foreach_execution_input = execution_input.model_copy() + foreach_execution_input.task = foreach_task + foreach_next_target = TransitionTarget(workflow=foreach_wf_name, step=0) + + result = await continue_as_child( + foreach_execution_input, + foreach_next_target, + previous_inputs + [{"item": item}], + user_state=user_state, + ) + results.append(result) + + return results + + +async def execute_map_reduce_step( + context: StepContext, + execution_input: ExecutionInput, + map_defn: WorkflowStep, + reduce: str, + initial: Any, + items: list[Any], + previous_inputs: list[Any], + user_state: dict[str, Any], +) -> Any: + workflow.logger.info(f"MapReduce step: Processing {len(items)} items") + result = initial or [] + reduce = reduce or "results + [_]" + + for i, item in enumerate(items): + workflow_name = ( + f"`{context.cursor.workflow}`[{context.cursor.step}].mapreduce[{i}]" + ) + map_reduce_task = execution_input.task.model_copy() + map_reduce_task.workflows = [Workflow(name=workflow_name, steps=[map_defn])] + + map_reduce_execution_input = execution_input.model_copy() + map_reduce_execution_input.task = map_reduce_task + map_reduce_next_target = TransitionTarget(workflow=workflow_name, step=0) + + output = await continue_as_child( + map_reduce_execution_input, + map_reduce_next_target, + previous_inputs + [item], + user_state=user_state, + ) + + result = await workflow.execute_activity( + task_steps.base_evaluate, + args=[reduce, {"results": result, "_": output}], + schedule_to_close_timeout=timedelta(seconds=2), + ) + + return result diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index 9cd12bb8f..61c3aba01 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -647,7 +647,7 @@ async def _( mock_run_task_execution_workflow.assert_called_once() result = await handle.result() - 
assert result["hello"] == "world" + assert result[0]["hello"] == "world" @test("workflow: map reduce step") From 1860574d4ef0f5f7d9db1c3938546669575c6464 Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 16:06:44 -0400 Subject: [PATCH 2/8] feat(agents-api): Add parallel flag to map reduce Signed-off-by: Diwank Singh Tomer --- agents-api/agents_api/autogen/Tasks.py | 14 ++++++++++++++ sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts | 7 +++++++ sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts | 7 +++++++ sdks/ts/src/api/models/Tasks_Task.ts | 7 +++++++ sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts | 7 +++++++ .../ts/src/api/schemas/$Tasks_CreateTaskRequest.ts | 5 +++++ sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts | 5 +++++ sdks/ts/src/api/schemas/$Tasks_Task.ts | 5 +++++ .../ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts | 5 +++++ typespec/tasks/steps.tsp | 4 ++++ 10 files changed, 66 insertions(+) diff --git a/agents-api/agents_api/autogen/Tasks.py b/agents-api/agents_api/autogen/Tasks.py index dcd2e920c..5405f4f4a 100644 --- a/agents-api/agents_api/autogen/Tasks.py +++ b/agents-api/agents_api/autogen/Tasks.py @@ -347,6 +347,13 @@ class Main(BaseModel): A special parameter named `results` is the accumulator and `_` is the current value. """ initial: Any = [] + """ + The initial value of the reduce expression + """ + parallel: StrictBool = False + """ + Whether to run the reduce expression in parallel + """ class MainModel(BaseModel): @@ -381,6 +388,13 @@ class MainModel(BaseModel): A special parameter named `results` is the accumulator and `_` is the current value. """ initial: Any = [] + """ + The initial value of the reduce expression + """ + parallel: StrictBool = False + """ + Whether to run the reduce expression in parallel + """ class ParallelStep(BaseModel): diff --git a/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts b/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts index 9590b4304..41bd3aaf1 100644 --- a/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts +++ b/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts @@ -72,7 +72,14 @@ export type Tasks_CreateTaskRequest = Record< * A special parameter named `results` is the accumulator and `_` is the current value. */ reduce?: Common_PyExpression; + /** + * The initial value of the reduce expression + */ initial?: any; + /** + * Whether to run the reduce expression in parallel + */ + parallel?: boolean; }) > >; diff --git a/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts b/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts index e511be553..bfe6d5200 100644 --- a/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts +++ b/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts @@ -71,7 +71,14 @@ export type Tasks_PatchTaskRequest = Record< * A special parameter named `results` is the accumulator and `_` is the current value. */ reduce?: Common_PyExpression; + /** + * The initial value of the reduce expression + */ initial?: any; + /** + * Whether to run the reduce expression in parallel + */ + parallel?: boolean; }) > >; diff --git a/sdks/ts/src/api/models/Tasks_Task.ts b/sdks/ts/src/api/models/Tasks_Task.ts index e155187ae..128383b13 100644 --- a/sdks/ts/src/api/models/Tasks_Task.ts +++ b/sdks/ts/src/api/models/Tasks_Task.ts @@ -72,7 +72,14 @@ export type Tasks_Task = Record< * A special parameter named `results` is the accumulator and `_` is the current value. 
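   * For example, with the executor's fallback `reduce` of `results + [_]` and the
   * default `initial` of `[]`, the step simply collects every mapped output into a list.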
*/ reduce?: Common_PyExpression; + /** + * The initial value of the reduce expression + */ initial?: any; + /** + * Whether to run the reduce expression in parallel + */ + parallel?: boolean; }) > >; diff --git a/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts b/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts index 8f3afc2c0..b68d626c4 100644 --- a/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts +++ b/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts @@ -72,7 +72,14 @@ export type Tasks_UpdateTaskRequest = Record< * A special parameter named `results` is the accumulator and `_` is the current value. */ reduce?: Common_PyExpression; + /** + * The initial value of the reduce expression + */ initial?: any; + /** + * Whether to run the reduce expression in parallel + */ + parallel?: boolean; }) > >; diff --git a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts index 882c1cf13..e8d7cc129 100644 --- a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts @@ -132,8 +132,13 @@ export const $Tasks_CreateTaskRequest = { ], }, initial: { + description: `The initial value of the reduce expression`, properties: {}, }, + parallel: { + type: "boolean", + description: `Whether to run the reduce expression in parallel`, + }, }, }, ], diff --git a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts index 45ad2927f..9d2399916 100644 --- a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts @@ -126,8 +126,13 @@ export const $Tasks_PatchTaskRequest = { ], }, initial: { + description: `The initial value of the reduce expression`, properties: {}, }, + parallel: { + type: "boolean", + description: `Whether to run the reduce expression in parallel`, + }, }, }, ], diff --git a/sdks/ts/src/api/schemas/$Tasks_Task.ts b/sdks/ts/src/api/schemas/$Tasks_Task.ts index c89bade35..ef0f7c590 100644 --- a/sdks/ts/src/api/schemas/$Tasks_Task.ts +++ b/sdks/ts/src/api/schemas/$Tasks_Task.ts @@ -132,8 +132,13 @@ export const $Tasks_Task = { ], }, initial: { + description: `The initial value of the reduce expression`, properties: {}, }, + parallel: { + type: "boolean", + description: `Whether to run the reduce expression in parallel`, + }, }, }, ], diff --git a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts index 84255c8ca..a65329be6 100644 --- a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts @@ -132,8 +132,13 @@ export const $Tasks_UpdateTaskRequest = { ], }, initial: { + description: `The initial value of the reduce expression`, properties: {}, }, + parallel: { + type: "boolean", + description: `Whether to run the reduce expression in parallel`, + }, }, }, ], diff --git a/typespec/tasks/steps.tsp b/typespec/tasks/steps.tsp index 863c39b26..7dea4ab0e 100644 --- a/typespec/tasks/steps.tsp +++ b/typespec/tasks/steps.tsp @@ -247,7 +247,11 @@ model MapReduceStep> extends BaseWor * A special parameter named `results` is the accumulator and `_` is the current value. 
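   * If omitted, the execution engine falls back to `results + [_]`, which appends
   * each mapped value to the accumulator.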
*/ reduce?: ReduceExpression; + /** The initial value of the reduce expression */ initial?: unknown = #[]; + + /** Whether to run the reduce expression in parallel */ + parallel?: boolean = false; } ///////////////////////// From c96e0fb8b6ff0e223039e8dd234f5a2bbe355042 Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 18:26:18 -0400 Subject: [PATCH 3/8] feat(agents-api): Add parallelism option to map-reduce step Signed-off-by: Diwank Singh Tomer --- .../activities/task_steps/base_evaluate.py | 9 +- agents-api/agents_api/activities/utils.py | 17 +++- agents-api/agents_api/autogen/Tasks.py | 8 +- agents-api/agents_api/env.py | 4 + .../workflows/task_execution/__init__.py | 85 ++++++++++------- .../workflows/task_execution/helpers.py | 94 +++++++++++++++++-- agents-api/tests/test_execution_workflow.py | 48 ++++++++++ .../src/api/models/Tasks_CreateTaskRequest.ts | 4 +- .../src/api/models/Tasks_PatchTaskRequest.ts | 4 +- sdks/ts/src/api/models/Tasks_Task.ts | 4 +- .../src/api/models/Tasks_UpdateTaskRequest.ts | 4 +- .../api/schemas/$Tasks_CreateTaskRequest.ts | 7 +- .../api/schemas/$Tasks_PatchTaskRequest.ts | 7 +- sdks/ts/src/api/schemas/$Tasks_Task.ts | 7 +- .../api/schemas/$Tasks_UpdateTaskRequest.ts | 7 +- typespec/tasks/steps.tsp | 4 +- 16 files changed, 243 insertions(+), 70 deletions(-) diff --git a/agents-api/agents_api/activities/task_steps/base_evaluate.py b/agents-api/agents_api/activities/task_steps/base_evaluate.py index 2d8dec0c7..24d2459ec 100644 --- a/agents-api/agents_api/activities/task_steps/base_evaluate.py +++ b/agents-api/agents_api/activities/task_steps/base_evaluate.py @@ -13,10 +13,17 @@ async def base_evaluate( exprs: str | list[str] | dict[str, str], values: dict[str, Any] = {}, + extra_lambda_strs: dict[str, str] | None = None, ) -> Any | list[Any] | dict[str, Any]: input_len = 1 if isinstance(exprs, str) else len(exprs) assert input_len > 0, "exprs must be a non-empty string, list or dict" + extra_lambdas = {} + if extra_lambda_strs: + for k, v in extra_lambda_strs.items(): + assert v.startswith("lambda "), "All extra lambdas must start with 'lambda'" + extra_lambdas[k] = eval(v) + # Turn the nested dict values from pydantic to dicts where possible values = { k: v.model_dump() if isinstance(v, BaseModel) else v for k, v in values.items() @@ -25,7 +32,7 @@ async def base_evaluate( # frozen_box doesn't work coz we need some mutability in the values values = Box(values, frozen_box=False, conversion_box=True) - evaluator = get_evaluator(names=values) + evaluator = get_evaluator(names=values, extra_functions=extra_lambdas) try: match exprs: diff --git a/agents-api/agents_api/activities/utils.py b/agents-api/agents_api/activities/utils.py index 55195ee35..e1ed55d10 100644 --- a/agents-api/agents_api/activities/utils.py +++ b/agents-api/agents_api/activities/utils.py @@ -1,5 +1,7 @@ import json -from typing import Any +from functools import reduce +from itertools import accumulate +from typing import Any, Callable import re2 import yaml @@ -10,6 +12,7 @@ # TODO: We need to make sure that we dont expose any security issues ALLOWED_FUNCTIONS = { "abs": abs, + "accumulate": accumulate, "all": all, "any": any, "bool": bool, @@ -22,9 +25,12 @@ "list": list, "load_json": json.loads, "load_yaml": lambda string: yaml.load(string, Loader=CSafeLoader), + "map": map, "match_regex": lambda pattern, string: bool(re2.fullmatch(pattern, string)), "max": max, "min": min, + "range": range, + "reduce": reduce, "round": round, "search_regex": lambda pattern, string: 
re2.search(pattern, string), "set": set, @@ -36,8 +42,13 @@ @beartype -def get_evaluator(names: dict[str, Any]) -> SimpleEval: - evaluator = EvalWithCompoundTypes(names=names, functions=ALLOWED_FUNCTIONS) +def get_evaluator( + names: dict[str, Any], extra_functions: dict[str, Callable] | None = None +) -> SimpleEval: + evaluator = EvalWithCompoundTypes( + names=names, functions=ALLOWED_FUNCTIONS | (extra_functions or {}) + ) + return evaluator diff --git a/agents-api/agents_api/autogen/Tasks.py b/agents-api/agents_api/autogen/Tasks.py index 5405f4f4a..bac5e45df 100644 --- a/agents-api/agents_api/autogen/Tasks.py +++ b/agents-api/agents_api/autogen/Tasks.py @@ -350,9 +350,9 @@ class Main(BaseModel): """ The initial value of the reduce expression """ - parallel: StrictBool = False + parallelism: int | None = None """ - Whether to run the reduce expression in parallel + Whether to run the reduce expression in parallel and how many items to run in each batch """ @@ -391,9 +391,9 @@ class MainModel(BaseModel): """ The initial value of the reduce expression """ - parallel: StrictBool = False + parallelism: int | None = None """ - Whether to run the reduce expression in parallel + Whether to run the reduce expression in parallel and how many items to run in each batch """ diff --git a/agents-api/agents_api/env.py b/agents-api/agents_api/env.py index 8d0b05cae..fc388687a 100644 --- a/agents-api/agents_api/env.py +++ b/agents-api/agents_api/env.py @@ -20,6 +20,10 @@ api_prefix: str = env.str("AGENTS_API_PREFIX", default="") +# Tasks +# ----- +task_max_parallelism: int = env.int("AGENTS_API_TASK_MAX_PARALLELISM", default=100) + # Debug # ----- debug: bool = env.bool("AGENTS_API_DEBUG", default=False) diff --git a/agents-api/agents_api/workflows/task_execution/__init__.py b/agents-api/agents_api/workflows/task_execution/__init__.py index c30a47ce3..4cbd33556 100644 --- a/agents-api/agents_api/workflows/task_execution/__init__.py +++ b/agents-api/agents_api/workflows/task_execution/__init__.py @@ -42,14 +42,15 @@ StepOutcome, ) from ...env import debug, testing - from .transition import transition from .helpers import ( continue_as_child, - execute_switch_branch, - execute_if_else_branch, execute_foreach_step, + execute_if_else_branch, execute_map_reduce_step, + execute_map_reduce_step_parallel, + execute_switch_branch, ) + from .transition import transition # Supported steps # --------------- @@ -247,12 +248,12 @@ async def run( case SwitchStep(switch=switch), StepOutcome(output=index) if index >= 0: result = await execute_switch_branch( - context, - execution_input, - switch, - index, - previous_inputs, - self.user_state, + context=context, + execution_input=execution_input, + switch=switch, + index=index, + previous_inputs=previous_inputs, + user_state=self.user_state, ) state = PartialTransition(output=result) @@ -264,40 +265,56 @@ async def run( output=condition ): result = await execute_if_else_branch( - context, - execution_input, - then_branch, - else_branch, - condition, - previous_inputs, - self.user_state, + context=context, + execution_input=execution_input, + then_branch=then_branch, + else_branch=else_branch, + condition=condition, + previous_inputs=previous_inputs, + user_state=self.user_state, ) state = PartialTransition(output=result) case ForeachStep(foreach=ForeachDo(do=do_step)), StepOutcome(output=items): result = await execute_foreach_step( - context, - execution_input, - do_step, - items, - previous_inputs, - self.user_state, + context=context, + execution_input=execution_input, 
+ do_step=do_step, + items=items, + previous_inputs=previous_inputs, + user_state=self.user_state, ) state = PartialTransition(output=result) case MapReduceStep( - map=map_defn, reduce=reduce, initial=initial - ), StepOutcome(output=items): + map=map_defn, reduce=reduce, initial=initial, parallelism=parallelism + ), StepOutcome(output=items) if parallelism is None or parallelism == 1: result = await execute_map_reduce_step( - context, - execution_input, - map_defn, - reduce, - initial, - items, - previous_inputs, - self.user_state, + context=context, + execution_input=execution_input, + map_defn=map_defn, + items=items, + reduce=reduce, + initial=initial, + previous_inputs=previous_inputs, + user_state=self.user_state, + ) + state = PartialTransition(output=result) + + case MapReduceStep( + map=map_defn, reduce=reduce, initial=initial, parallelism=parallelism + ), StepOutcome(output=items): + result = await execute_map_reduce_step_parallel( + context=context, + execution_input=execution_input, + map_defn=map_defn, + items=items, + previous_inputs=previous_inputs, + user_state=self.user_state, + initial=initial, + reduce=reduce, + parallelism=parallelism, ) state = PartialTransition(output=result) @@ -351,7 +368,7 @@ async def run( ) result = await continue_as_child( - execution_input=execution_input, + context, start=yield_next_target, previous_inputs=[output], user_state=self.user_state, @@ -459,7 +476,7 @@ async def run( # Continue as a child workflow return await continue_as_child( - execution_input=execution_input, + context, start=final_state.next, previous_inputs=previous_inputs + [final_state.output], user_state=self.user_state, diff --git a/agents-api/agents_api/workflows/task_execution/helpers.py b/agents-api/agents_api/workflows/task_execution/helpers.py index 6323cba40..ae16acca1 100644 --- a/agents-api/agents_api/workflows/task_execution/helpers.py +++ b/agents-api/agents_api/workflows/task_execution/helpers.py @@ -1,7 +1,9 @@ +import asyncio from datetime import timedelta from typing import Any from temporalio import workflow +from temporalio.exceptions import ApplicationError with workflow.unsafe.imports_passed_through(): from ...activities import task_steps @@ -14,6 +16,7 @@ ExecutionInput, StepContext, ) + from ...env import task_max_parallelism async def continue_as_child( @@ -34,12 +37,13 @@ async def continue_as_child( async def execute_switch_branch( + *, context: StepContext, execution_input: ExecutionInput, switch: list, index: int, previous_inputs: list[Any], - user_state: dict[str, Any] = {}, + user_state: dict[str, Any], ) -> Any: workflow.logger.info(f"Switch step: Chose branch {index}") chosen_branch = switch[index] @@ -63,13 +67,14 @@ async def execute_switch_branch( async def execute_if_else_branch( + *, context: StepContext, execution_input: ExecutionInput, then_branch: WorkflowStep, else_branch: WorkflowStep, condition: bool, previous_inputs: list[Any], - user_state: dict[str, Any] = {}, + user_state: dict[str, Any], ) -> Any: workflow.logger.info(f"If-Else step: Condition evaluated to {condition}") chosen_branch = then_branch if condition else else_branch @@ -94,6 +99,7 @@ async def execute_if_else_branch( async def execute_foreach_step( + *, context: StepContext, execution_input: ExecutionInput, do_step: WorkflowStep, @@ -127,18 +133,19 @@ async def execute_foreach_step( async def execute_map_reduce_step( + *, context: StepContext, execution_input: ExecutionInput, map_defn: WorkflowStep, - reduce: str, - initial: Any, items: list[Any], previous_inputs: 
list[Any], user_state: dict[str, Any], + reduce: str | None = None, + initial: Any = [], ) -> Any: workflow.logger.info(f"MapReduce step: Processing {len(items)} items") - result = initial or [] - reduce = reduce or "results + [_]" + result = initial + reduce = "results + [_]" if reduce is None else reduce for i, item in enumerate(items): workflow_name = ( @@ -165,3 +172,78 @@ async def execute_map_reduce_step( ) return result + + +async def execute_map_reduce_step_parallel( + *, + context: StepContext, + execution_input: ExecutionInput, + map_defn: WorkflowStep, + items: list[Any], + previous_inputs: list[Any], + user_state: dict[str, Any], + initial: Any = [], + reduce: str | None = None, + parallelism: int = task_max_parallelism, +) -> Any: + workflow.logger.info(f"MapReduce step: Processing {len(items)} items") + results = initial + + parallelism = min(parallelism, task_max_parallelism) + assert parallelism > 1, "Parallelism must be greater than 1" + + # Modify reduce expression to use reduce function (since we are passing a list) + reduce = "results + [_]" if reduce is None else reduce + reduce = reduce.replace("_", "_item").replace("results", "_result") + + # Explanation: + # - reduce is the reduce expression + # - reducer_lambda is the lambda function that will be used to reduce the results + extra_lambda_strs = dict(reducer_lambda=f"lambda _result, _item: ({reduce})") + + reduce = "reduce(reducer_lambda, _, results)" + + # First create batches of items to run in parallel + batches = [items[i : i + parallelism] for i in range(0, len(items), parallelism)] + + for i, batch in enumerate(batches): + batch_pending = [] + + for j, item in enumerate(batch): + workflow_name = f"`{context.cursor.workflow}`[{context.cursor.step}].mapreduce[{i}][{j}]" + map_reduce_task = execution_input.task.model_copy() + map_reduce_task.workflows = [Workflow(name=workflow_name, steps=[map_defn])] + + map_reduce_execution_input = execution_input.model_copy() + map_reduce_execution_input.task = map_reduce_task + map_reduce_next_target = TransitionTarget(workflow=workflow_name, step=0) + + batch_pending.append( + continue_as_child( + map_reduce_execution_input, + map_reduce_next_target, + previous_inputs + [item], + user_state=user_state, + ) + ) + + # Wait for all the batch items to complete + try: + batch_results = await asyncio.gather(*batch_pending) + + # Reduce the results of the batch + results = await workflow.execute_activity( + task_steps.base_evaluate, + args=[ + reduce, + {"results": results, "_": batch_results}, + extra_lambda_strs, + ], + schedule_to_close_timeout=timedelta(seconds=2), + ) + + except BaseException as e: + workflow.logger.error(f"Error in batch {i}: {e}") + raise ApplicationError(f"Error in batch {i}: {e}") from e + + return results diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index 61c3aba01..60e78ee84 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -697,6 +697,54 @@ async def _( assert [r["res"] for r in result] == ["a", "b", "c"] +@test("workflow: map reduce step parallel (basic)") +async def _( + client=cozo_client, + developer_id=test_developer_id, + agent=test_agent, +): + data = CreateExecutionRequest(input={"test": "input"}) + + map_step = { + "over": "'a b c d e f'.split()", + "map": { + "evaluate": {"res": "_"}, + }, + "parallelism": 2, + } + + task_def = { + "name": "test task", + "description": "test task about", + "input_schema": {"type": "object", 
"additionalProperties": True}, + "main": [map_step], + } + + task = create_task( + developer_id=developer_id, + agent_id=agent.id, + data=CreateTaskRequest(**task_def), + client=client, + ) + + async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): + execution, handle = await start_execution( + developer_id=developer_id, + task_id=task.id, + data=data, + client=client, + ) + + assert handle is not None + assert execution.task_id == task.id + assert execution.input == data.input + + mock_run_task_execution_workflow.assert_called_once() + + result = await handle.result() + assert [r["res"] for r in result] == ["a", "b", "c", "d", "e", "f"] + + @test("workflow: prompt step") async def _( client=cozo_client, diff --git a/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts b/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts index 41bd3aaf1..7920156d5 100644 --- a/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts +++ b/sdks/ts/src/api/models/Tasks_CreateTaskRequest.ts @@ -77,9 +77,9 @@ export type Tasks_CreateTaskRequest = Record< */ initial?: any; /** - * Whether to run the reduce expression in parallel + * Whether to run the reduce expression in parallel and how many items to run in each batch */ - parallel?: boolean; + parallelism?: number; }) > >; diff --git a/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts b/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts index bfe6d5200..68c20b2cb 100644 --- a/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts +++ b/sdks/ts/src/api/models/Tasks_PatchTaskRequest.ts @@ -76,9 +76,9 @@ export type Tasks_PatchTaskRequest = Record< */ initial?: any; /** - * Whether to run the reduce expression in parallel + * Whether to run the reduce expression in parallel and how many items to run in each batch */ - parallel?: boolean; + parallelism?: number; }) > >; diff --git a/sdks/ts/src/api/models/Tasks_Task.ts b/sdks/ts/src/api/models/Tasks_Task.ts index 128383b13..27d25315a 100644 --- a/sdks/ts/src/api/models/Tasks_Task.ts +++ b/sdks/ts/src/api/models/Tasks_Task.ts @@ -77,9 +77,9 @@ export type Tasks_Task = Record< */ initial?: any; /** - * Whether to run the reduce expression in parallel + * Whether to run the reduce expression in parallel and how many items to run in each batch */ - parallel?: boolean; + parallelism?: number; }) > >; diff --git a/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts b/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts index b68d626c4..d0f3d7812 100644 --- a/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts +++ b/sdks/ts/src/api/models/Tasks_UpdateTaskRequest.ts @@ -77,9 +77,9 @@ export type Tasks_UpdateTaskRequest = Record< */ initial?: any; /** - * Whether to run the reduce expression in parallel + * Whether to run the reduce expression in parallel and how many items to run in each batch */ - parallel?: boolean; + parallelism?: number; }) > >; diff --git a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts index e8d7cc129..9e01f9a31 100644 --- a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts @@ -135,9 +135,10 @@ export const $Tasks_CreateTaskRequest = { description: `The initial value of the reduce expression`, properties: {}, }, - parallel: { - type: "boolean", - description: `Whether to run the reduce expression in parallel`, + parallelism: { + type: "number", + description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, + format: "uint16", }, }, }, diff 
--git a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts index 9d2399916..e8da500b5 100644 --- a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts @@ -129,9 +129,10 @@ export const $Tasks_PatchTaskRequest = { description: `The initial value of the reduce expression`, properties: {}, }, - parallel: { - type: "boolean", - description: `Whether to run the reduce expression in parallel`, + parallelism: { + type: "number", + description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, + format: "uint16", }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_Task.ts b/sdks/ts/src/api/schemas/$Tasks_Task.ts index ef0f7c590..c39a3ba75 100644 --- a/sdks/ts/src/api/schemas/$Tasks_Task.ts +++ b/sdks/ts/src/api/schemas/$Tasks_Task.ts @@ -135,9 +135,10 @@ export const $Tasks_Task = { description: `The initial value of the reduce expression`, properties: {}, }, - parallel: { - type: "boolean", - description: `Whether to run the reduce expression in parallel`, + parallelism: { + type: "number", + description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, + format: "uint16", }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts index a65329be6..25a6c98fa 100644 --- a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts @@ -135,9 +135,10 @@ export const $Tasks_UpdateTaskRequest = { description: `The initial value of the reduce expression`, properties: {}, }, - parallel: { - type: "boolean", - description: `Whether to run the reduce expression in parallel`, + parallelism: { + type: "number", + description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, + format: "uint16", }, }, }, diff --git a/typespec/tasks/steps.tsp b/typespec/tasks/steps.tsp index 7dea4ab0e..8fe95a7ea 100644 --- a/typespec/tasks/steps.tsp +++ b/typespec/tasks/steps.tsp @@ -250,8 +250,8 @@ model MapReduceStep> extends BaseWor /** The initial value of the reduce expression */ initial?: unknown = #[]; - /** Whether to run the reduce expression in parallel */ - parallel?: boolean = false; + /** Whether to run the reduce expression in parallel and how many items to run in each batch */ + parallelism?: uint16; } ///////////////////////// From 36526f94bffc4c0e2df93a3aeb496565d6ae2c36 Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 18:33:24 -0400 Subject: [PATCH 4/8] feat: Add more tests for parallel map step Signed-off-by: Diwank Singh Tomer --- agents-api/tests/test_execution_workflow.py | 78 +++++++++++---------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index 60e78ee84..d29001071 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -14,6 +14,7 @@ ) from agents_api.models.task.create_task import create_task from agents_api.routers.tasks.create_task_execution import start_execution +from agents_api.models.task.delete_task import delete_task from .fixtures import cozo_client, test_agent, test_developer_id from .utils import patch_testing_temporal @@ -697,52 +698,55 @@ async def _( assert [r["res"] for r in result] == ["a", "b", "c"] -@test("workflow: map reduce step parallel (basic)") 
-async def _( - client=cozo_client, - developer_id=test_developer_id, - agent=test_agent, -): - data = CreateExecutionRequest(input={"test": "input"}) - - map_step = { - "over": "'a b c d e f'.split()", - "map": { - "evaluate": {"res": "_"}, - }, - "parallelism": 2, - } +for p in range(1, 10): + @test(f"workflow: map reduce step parallel (parallelism={p})") + async def _( + client=cozo_client, + developer_id=test_developer_id, + agent=test_agent, + ): + data = CreateExecutionRequest(input={"test": "input"}) - task_def = { - "name": "test task", - "description": "test task about", - "input_schema": {"type": "object", "additionalProperties": True}, - "main": [map_step], - } + map_step = { + "over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()", + "map": { + "evaluate": {"res": "_"}, + }, + "parallelism": p, + } - task = create_task( - developer_id=developer_id, - agent_id=agent.id, - data=CreateTaskRequest(**task_def), - client=client, - ) + task_def = { + "name": "test task", + "description": "test task about", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [map_step], + } - async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): - execution, handle = await start_execution( + task = create_task( developer_id=developer_id, - task_id=task.id, - data=data, + agent_id=agent.id, + data=CreateTaskRequest(**task_def), client=client, ) - assert handle is not None - assert execution.task_id == task.id - assert execution.input == data.input + async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): + execution, handle = await start_execution( + developer_id=developer_id, + task_id=task.id, + data=data, + client=client, + ) - mock_run_task_execution_workflow.assert_called_once() + assert handle is not None + assert execution.task_id == task.id + assert execution.input == data.input - result = await handle.result() - assert [r["res"] for r in result] == ["a", "b", "c", "d", "e", "f"] + mock_run_task_execution_workflow.assert_called_once() + + result = await handle.result() + assert [r["res"] for r in result] == ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"] + + delete_task(developer_id=developer_id, agent_id=agent.id, task_id=task.id, client=client) @test("workflow: prompt step") From 45106f2102a03bceedd0b9c4ad2214df8a32245f Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 19:23:27 -0400 Subject: [PATCH 5/8] fix: Fix nasty parallelism race condition on cozodb Signed-off-by: Diwank Singh Tomer --- .../activities/task_steps/base_evaluate.py | 3 +- .../execution/create_execution_transition.py | 12 +- agents-api/agents_api/models/utils.py | 2 +- .../workflows/task_execution/helpers.py | 18 +-- agents-api/tests/test_execution_workflow.py | 105 +++++++++++------- 5 files changed, 86 insertions(+), 54 deletions(-) diff --git a/agents-api/agents_api/activities/task_steps/base_evaluate.py b/agents-api/agents_api/activities/task_steps/base_evaluate.py index 24d2459ec..fb9412cd9 100644 --- a/agents-api/agents_api/activities/task_steps/base_evaluate.py +++ b/agents-api/agents_api/activities/task_steps/base_evaluate.py @@ -1,3 +1,4 @@ +import ast from typing import Any from beartype import beartype @@ -22,7 +23,7 @@ async def base_evaluate( if extra_lambda_strs: for k, v in extra_lambda_strs.items(): assert v.startswith("lambda "), "All extra lambdas must start with 'lambda'" - extra_lambdas[k] = eval(v) + 
extra_lambdas[k] = ast.literal_eval(v) # Turn the nested dict values from pydantic to dicts where possible values = { diff --git a/agents-api/agents_api/models/execution/create_execution_transition.py b/agents-api/agents_api/models/execution/create_execution_transition.py index 89e924bf6..79497120b 100644 --- a/agents-api/agents_api/models/execution/create_execution_transition.py +++ b/agents-api/agents_api/models/execution/create_execution_transition.py @@ -82,12 +82,16 @@ def create_execution_transition( # Only required for updating the execution status as well update_execution_status: bool = False, task_id: UUID | None = None, -) -> tuple[list[str], dict]: +) -> tuple[list[str | None], dict]: transition_id = transition_id or uuid4() data.metadata = data.metadata or {} data.execution_id = execution_id + # TODO: This is a hack to make sure the transition is valid + # (parallel transitions are whack, we should do something better) + is_parallel = data.current.workflow.startswith("PAR:") + # Prepare the transition data transition_data = data.model_dump(exclude_unset=True, exclude={"id"}) @@ -184,9 +188,9 @@ def create_execution_transition( execution_id=execution_id, parents=[("agents", "agent_id"), ("tasks", "task_id")], ), - validate_status_query, - update_execution_query, - check_last_transition_query, + validate_status_query if not is_parallel else None, + update_execution_query if not is_parallel else None, + check_last_transition_query if not is_parallel else None, insert_query, ] diff --git a/agents-api/agents_api/models/utils.py b/agents-api/agents_api/models/utils.py index 4613fe7c7..98bf2a590 100644 --- a/agents-api/agents_api/models/utils.py +++ b/agents-api/agents_api/models/utils.py @@ -173,7 +173,7 @@ def make_cozo_json_query(fields): def cozo_query( - func: Callable[P, tuple[str | list[str], dict]] | None = None, + func: Callable[P, tuple[str | list[str | None], dict]] | None = None, debug: bool | None = None, ): def cozo_query_dec(func: Callable[P, tuple[str | list[Any], dict]]): diff --git a/agents-api/agents_api/workflows/task_execution/helpers.py b/agents-api/agents_api/workflows/task_execution/helpers.py index ae16acca1..9bb383299 100644 --- a/agents-api/agents_api/workflows/task_execution/helpers.py +++ b/agents-api/agents_api/workflows/task_execution/helpers.py @@ -210,7 +210,9 @@ async def execute_map_reduce_step_parallel( batch_pending = [] for j, item in enumerate(batch): - workflow_name = f"`{context.cursor.workflow}`[{context.cursor.step}].mapreduce[{i}][{j}]" + # Parallel batch workflow name + # Note: Added PAR: prefix to easily identify parallel batches in logs + workflow_name = f"PAR:`{context.cursor.workflow}`[{context.cursor.step}].mapreduce[{i}][{j}]" map_reduce_task = execution_input.task.model_copy() map_reduce_task.workflows = [Workflow(name=workflow_name, steps=[map_defn])] @@ -219,11 +221,13 @@ async def execute_map_reduce_step_parallel( map_reduce_next_target = TransitionTarget(workflow=workflow_name, step=0) batch_pending.append( - continue_as_child( - map_reduce_execution_input, - map_reduce_next_target, - previous_inputs + [item], - user_state=user_state, + asyncio.create_task( + continue_as_child( + map_reduce_execution_input, + map_reduce_next_target, + previous_inputs + [item], + user_state=user_state, + ) ) ) @@ -239,7 +243,7 @@ async def execute_map_reduce_step_parallel( {"results": results, "_": batch_results}, extra_lambda_strs, ], - schedule_to_close_timeout=timedelta(seconds=2), + schedule_to_close_timeout=timedelta(seconds=5), ) except 
BaseException as e: diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index d29001071..81daa8933 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -14,7 +14,6 @@ ) from agents_api.models.task.create_task import create_task from agents_api.routers.tasks.create_task_execution import start_execution -from agents_api.models.task.delete_task import delete_task from .fixtures import cozo_client, test_agent, test_developer_id from .utils import patch_testing_temporal @@ -698,55 +697,79 @@ async def _( assert [r["res"] for r in result] == ["a", "b", "c"] -for p in range(1, 10): - @test(f"workflow: map reduce step parallel (parallelism={p})") - async def _( - client=cozo_client, - developer_id=test_developer_id, - agent=test_agent, - ): - data = CreateExecutionRequest(input={"test": "input"}) +@test("workflow: map reduce step parallel (parallelism=10)") +async def _( + client=cozo_client, + developer_id=test_developer_id, + agent=test_agent, +): + data = CreateExecutionRequest(input={"test": "input"}) - map_step = { - "over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()", - "map": { - "evaluate": {"res": "_"}, - }, - "parallelism": p, - } + map_step = { + "over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()", + "map": { + "evaluate": {"res": "_ + '!'"}, + }, + "parallelism": 10, + } - task_def = { - "name": "test task", - "description": "test task about", - "input_schema": {"type": "object", "additionalProperties": True}, - "main": [map_step], - } + task_def = { + "name": "test task", + "description": "test task about", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [map_step], + } - task = create_task( + task = create_task( + developer_id=developer_id, + agent_id=agent.id, + data=CreateTaskRequest(**task_def), + client=client, + ) + + async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): + execution, handle = await start_execution( developer_id=developer_id, - agent_id=agent.id, - data=CreateTaskRequest(**task_def), + task_id=task.id, + data=data, client=client, ) - async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): - execution, handle = await start_execution( - developer_id=developer_id, - task_id=task.id, - data=data, - client=client, - ) - - assert handle is not None - assert execution.task_id == task.id - assert execution.input == data.input - - mock_run_task_execution_workflow.assert_called_once() + assert handle is not None + assert execution.task_id == task.id + assert execution.input == data.input - result = await handle.result() - assert [r["res"] for r in result] == ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"] + mock_run_task_execution_workflow.assert_called_once() - delete_task(developer_id=developer_id, agent_id=agent.id, task_id=task.id, client=client) + result = await handle.result() + assert [r["res"] for r in result] == [ + "a!", + "b!", + "c!", + "d!", + "e!", + "f!", + "g!", + "h!", + "i!", + "j!", + "k!", + "l!", + "m!", + "n!", + "o!", + "p!", + "q!", + "r!", + "s!", + "t!", + "u!", + "v!", + "w!", + "x!", + "y!", + "z!", + ] @test("workflow: prompt step") From 50c8e664a982074d748ab3f55269910b55ead196 Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 19:41:25 -0400 Subject: [PATCH 6/8] fix: Minor fix to using eval instead of ast.literal_eval 
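
`ast.literal_eval` only evaluates literal displays (strings, numbers, tuples,
lists, dicts, sets, booleans, None) and raises `ValueError` for any other node,
including lambda expressions, so it can never produce the reducer callables.
The snippet below (illustrative only, not part of the diff) shows the failure
mode and the parse-then-eval approach used here:

    import ast

    src = "lambda _result, _item: (_result + [_item])"

    try:
        ast.literal_eval(src)  # ValueError: malformed node or string
    except ValueError:
        pass

    ast.parse(src)       # syntax check only; raises SyntaxError on bad input
    reducer = eval(src)  # guarded upstream by the `startswith("lambda ")` assert
    assert reducer([1], 2) == [1, 2]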
Signed-off-by: Diwank Singh Tomer --- .../activities/task_steps/base_evaluate.py | 12 +- agents-api/agents_api/autogen/Tasks.py | 4 +- agents-api/tests/test_execution_workflow.py | 107 +++++++----------- .../api/schemas/$Tasks_CreateTaskRequest.ts | 1 + .../api/schemas/$Tasks_PatchTaskRequest.ts | 1 + sdks/ts/src/api/schemas/$Tasks_Task.ts | 1 + .../api/schemas/$Tasks_UpdateTaskRequest.ts | 1 + typespec/tasks/steps.tsp | 2 + 8 files changed, 62 insertions(+), 67 deletions(-) diff --git a/agents-api/agents_api/activities/task_steps/base_evaluate.py b/agents-api/agents_api/activities/task_steps/base_evaluate.py index fb9412cd9..0263345ec 100644 --- a/agents-api/agents_api/activities/task_steps/base_evaluate.py +++ b/agents-api/agents_api/activities/task_steps/base_evaluate.py @@ -22,8 +22,18 @@ async def base_evaluate( extra_lambdas = {} if extra_lambda_strs: for k, v in extra_lambda_strs.items(): + v = v.strip() + + # Check that all extra lambdas are valid assert v.startswith("lambda "), "All extra lambdas must start with 'lambda'" - extra_lambdas[k] = ast.literal_eval(v) + + try: + ast.parse(v) + except Exception as e: + raise ValueError(f"Invalid lambda: {v}") from e + + # Eval the lambda and add it to the extra lambdas + extra_lambdas[k] = eval(v) # Turn the nested dict values from pydantic to dicts where possible values = { diff --git a/agents-api/agents_api/autogen/Tasks.py b/agents-api/agents_api/autogen/Tasks.py index bac5e45df..8c8a22dbb 100644 --- a/agents-api/agents_api/autogen/Tasks.py +++ b/agents-api/agents_api/autogen/Tasks.py @@ -350,7 +350,7 @@ class Main(BaseModel): """ The initial value of the reduce expression """ - parallelism: int | None = None + parallelism: Annotated[int | None, Field(None, ge=1)] """ Whether to run the reduce expression in parallel and how many items to run in each batch """ @@ -391,7 +391,7 @@ class MainModel(BaseModel): """ The initial value of the reduce expression """ - parallelism: int | None = None + parallelism: Annotated[int | None, Field(None, ge=1)] """ Whether to run the reduce expression in parallel and how many items to run in each batch """ diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index 52fc393ca..9517736d3 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -699,79 +699,58 @@ async def _( assert [r["res"] for r in result] == ["a", "b", "c"] -@test("workflow: map reduce step parallel (parallelism=10)") -async def _( - client=cozo_client, - developer_id=test_developer_id, - agent=test_agent, -): - data = CreateExecutionRequest(input={"test": "input"}) - - map_step = { - "over": "'a b c d e f g h i j k l m n o p q r s t u v w x y z'.split()", - "map": { - "evaluate": {"res": "_ + '!'"}, - }, - "parallelism": 10, - } +for p in [1, 3, 5]: + @test(f"workflow: map reduce step parallel (parallelism={p})") + async def _( + client=cozo_client, + developer_id=test_developer_id, + agent=test_agent, + ): + data = CreateExecutionRequest(input={"test": "input"}) - task_def = { - "name": "test task", - "description": "test task about", - "input_schema": {"type": "object", "additionalProperties": True}, - "main": [map_step], - } + map_step = { + "over": "'a b c d'.split()", + "map": { + "evaluate": {"res": "_ + '!'"}, + }, + "parallelism": p, + } - task = create_task( - developer_id=developer_id, - agent_id=agent.id, - data=CreateTaskRequest(**task_def), - client=client, - ) + task_def = { + "name": "test task", + "description": "test 
task about", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [map_step], + } - async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): - execution, handle = await start_execution( + task = create_task( developer_id=developer_id, - task_id=task.id, - data=data, + agent_id=agent.id, + data=CreateTaskRequest(**task_def), client=client, ) - assert handle is not None - assert execution.task_id == task.id - assert execution.input == data.input + async with patch_testing_temporal() as (_, mock_run_task_execution_workflow): + execution, handle = await start_execution( + developer_id=developer_id, + task_id=task.id, + data=data, + client=client, + ) - mock_run_task_execution_workflow.assert_called_once() + assert handle is not None + assert execution.task_id == task.id + assert execution.input == data.input - result = await handle.result() - assert [r["res"] for r in result] == [ - "a!", - "b!", - "c!", - "d!", - "e!", - "f!", - "g!", - "h!", - "i!", - "j!", - "k!", - "l!", - "m!", - "n!", - "o!", - "p!", - "q!", - "r!", - "s!", - "t!", - "u!", - "v!", - "w!", - "x!", - "y!", - "z!", - ] + mock_run_task_execution_workflow.assert_called_once() + + result = await handle.result() + assert [r["res"] for r in result] == [ + "a!", + "b!", + "c!", + "d!", + ] @test("workflow: prompt step") diff --git a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts index 9e01f9a31..0cbdabf41 100644 --- a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts @@ -139,6 +139,7 @@ export const $Tasks_CreateTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts index e8da500b5..3bc182d80 100644 --- a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts @@ -133,6 +133,7 @@ export const $Tasks_PatchTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_Task.ts b/sdks/ts/src/api/schemas/$Tasks_Task.ts index c39a3ba75..35cb45e9d 100644 --- a/sdks/ts/src/api/schemas/$Tasks_Task.ts +++ b/sdks/ts/src/api/schemas/$Tasks_Task.ts @@ -139,6 +139,7 @@ export const $Tasks_Task = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts index 25a6c98fa..6a3869214 100644 --- a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts @@ -139,6 +139,7 @@ export const $Tasks_UpdateTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + minimum: 1, }, }, }, diff --git a/typespec/tasks/steps.tsp b/typespec/tasks/steps.tsp index 8fe95a7ea..d9be46577 100644 --- a/typespec/tasks/steps.tsp +++ b/typespec/tasks/steps.tsp @@ -251,6 +251,8 @@ model MapReduceStep> extends BaseWor initial?: unknown = #[]; /** Whether to run the reduce expression in parallel and how 
many items to run in each batch */ + @minValue(1) + @maxValue(100) parallelism?: uint16; } From c54f17202169a2efe08cd236ddbf7555c305fadb Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Wed, 4 Sep 2024 19:46:30 -0400 Subject: [PATCH 7/8] feat: Add max-value of 100 to parallelism Signed-off-by: Diwank Singh Tomer --- agents-api/agents_api/autogen/Tasks.py | 4 ++-- agents-api/tests/test_execution_workflow.py | 1 + sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts | 1 + sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts | 1 + sdks/ts/src/api/schemas/$Tasks_Task.ts | 1 + sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts | 1 + 6 files changed, 7 insertions(+), 2 deletions(-) diff --git a/agents-api/agents_api/autogen/Tasks.py b/agents-api/agents_api/autogen/Tasks.py index 8c8a22dbb..c1e69d492 100644 --- a/agents-api/agents_api/autogen/Tasks.py +++ b/agents-api/agents_api/autogen/Tasks.py @@ -350,7 +350,7 @@ class Main(BaseModel): """ The initial value of the reduce expression """ - parallelism: Annotated[int | None, Field(None, ge=1)] + parallelism: Annotated[int | None, Field(None, ge=1, le=100)] """ Whether to run the reduce expression in parallel and how many items to run in each batch """ @@ -391,7 +391,7 @@ class MainModel(BaseModel): """ The initial value of the reduce expression """ - parallelism: Annotated[int | None, Field(None, ge=1)] + parallelism: Annotated[int | None, Field(None, ge=1, le=100)] """ Whether to run the reduce expression in parallel and how many items to run in each batch """ diff --git a/agents-api/tests/test_execution_workflow.py b/agents-api/tests/test_execution_workflow.py index 9517736d3..82c4e3814 100644 --- a/agents-api/tests/test_execution_workflow.py +++ b/agents-api/tests/test_execution_workflow.py @@ -700,6 +700,7 @@ async def _( for p in [1, 3, 5]: + @test(f"workflow: map reduce step parallel (parallelism={p})") async def _( client=cozo_client, diff --git a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts index 0cbdabf41..305335b5f 100644 --- a/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_CreateTaskRequest.ts @@ -139,6 +139,7 @@ export const $Tasks_CreateTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + maximum: 100, minimum: 1, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts index 3bc182d80..200efb0e5 100644 --- a/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_PatchTaskRequest.ts @@ -133,6 +133,7 @@ export const $Tasks_PatchTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + maximum: 100, minimum: 1, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_Task.ts b/sdks/ts/src/api/schemas/$Tasks_Task.ts index 35cb45e9d..ba3e63928 100644 --- a/sdks/ts/src/api/schemas/$Tasks_Task.ts +++ b/sdks/ts/src/api/schemas/$Tasks_Task.ts @@ -139,6 +139,7 @@ export const $Tasks_Task = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + maximum: 100, minimum: 1, }, }, diff --git a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts index 6a3869214..297fffd37 100644 --- 
a/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts +++ b/sdks/ts/src/api/schemas/$Tasks_UpdateTaskRequest.ts @@ -139,6 +139,7 @@ export const $Tasks_UpdateTaskRequest = { type: "number", description: `Whether to run the reduce expression in parallel and how many items to run in each batch`, format: "uint16", + maximum: 100, minimum: 1, }, }, From 888887a5eb310564240d872fb9794f5faa43403f Mon Sep 17 00:00:00 2001 From: Diwank Singh Tomer Date: Fri, 6 Sep 2024 22:04:50 -0400 Subject: [PATCH 8/8] feat(llm-proxy): Add support for other claude models + minor fixes Signed-off-by: Diwank Singh Tomer --- agents-api/agents_api/activities/utils.py | 4 ++++ .../agents_api/models/session/get_session.py | 2 +- .../agents_api/models/session/list_sessions.py | 2 +- .../agents_api/models/session/patch_session.py | 2 +- .../models/session/prepare_session_data.py | 2 +- .../models/session/update_session.py | 2 +- agents-api/agents_api/models/task/get_task.py | 2 +- .../agents_api/models/task/list_tasks.py | 2 +- agents-api/agents_api/models/utils.py | 2 +- llm-proxy/litellm-config.yaml | 18 ++++++++++++++++++ memory-store/Dockerfile | 2 +- memory-store/backup.sh | 5 +++-- 12 files changed, 34 insertions(+), 11 deletions(-) diff --git a/agents-api/agents_api/activities/utils.py b/agents-api/agents_api/activities/utils.py index e1ed55d10..f9f7ded12 100644 --- a/agents-api/agents_api/activities/utils.py +++ b/agents-api/agents_api/activities/utils.py @@ -1,6 +1,8 @@ import json from functools import reduce from itertools import accumulate +from random import random +from time import time from typing import Any, Callable import re2 @@ -29,6 +31,7 @@ "match_regex": lambda pattern, string: bool(re2.fullmatch(pattern, string)), "max": max, "min": min, + "random": random, "range": range, "reduce": reduce, "round": round, @@ -36,6 +39,7 @@ "set": set, "str": str, "sum": sum, + "time": time, "tuple": tuple, "zip": zip, } diff --git a/agents-api/agents_api/models/session/get_session.py b/agents-api/agents_api/models/session/get_session.py index 0a365df2f..2d8956eb7 100644 --- a/agents-api/agents_api/models/session/get_session.py +++ b/agents-api/agents_api/models/session/get_session.py @@ -94,7 +94,7 @@ def get_session( render_templates, token_budget, context_overflow, - @ "NOW" + @ "END" }, updated_at = to_int(validity) """ diff --git a/agents-api/agents_api/models/session/list_sessions.py b/agents-api/agents_api/models/session/list_sessions.py index fa1097e5e..14f4cad0d 100644 --- a/agents-api/agents_api/models/session/list_sessions.py +++ b/agents-api/agents_api/models/session/list_sessions.py @@ -101,7 +101,7 @@ def list_sessions( metadata, token_budget, context_overflow, - @ "NOW" + @ "END" }}, users_p[users, id], participants[agents, "agent", id], diff --git a/agents-api/agents_api/models/session/patch_session.py b/agents-api/agents_api/models/session/patch_session.py index e6e0e5750..a9f3121b9 100644 --- a/agents-api/agents_api/models/session/patch_session.py +++ b/agents-api/agents_api/models/session/patch_session.py @@ -95,7 +95,7 @@ def patch_session( input[{session_update_cols}], ids[session_id, developer_id], *sessions{{ - {rest_fields}, metadata: md, @ "NOW" + {rest_fields}, metadata: md, @ "END" }}, updated_at = 'ASSERT', metadata = concat(md, $metadata), diff --git a/agents-api/agents_api/models/session/prepare_session_data.py b/agents-api/agents_api/models/session/prepare_session_data.py index 9a936b183..49cdc8cbc 100644 --- a/agents-api/agents_api/models/session/prepare_session_data.py +++ 
b/agents-api/agents_api/models/session/prepare_session_data.py @@ -159,7 +159,7 @@ def prepare_session_data( render_templates, token_budget, context_overflow, - @ "NOW" + @ "END" }, updated_at = to_int(validity), record = { diff --git a/agents-api/agents_api/models/session/update_session.py b/agents-api/agents_api/models/session/update_session.py index 99688bd98..3d3a685d1 100644 --- a/agents-api/agents_api/models/session/update_session.py +++ b/agents-api/agents_api/models/session/update_session.py @@ -84,7 +84,7 @@ def update_session( input[{session_update_cols}], ids[session_id, developer_id], *sessions{{ - {rest_fields}, @ "NOW" + {rest_fields}, @ "END" }}, updated_at = 'ASSERT' diff --git a/agents-api/agents_api/models/task/get_task.py b/agents-api/agents_api/models/task/get_task.py index 076936b6c..4bb3d06b6 100644 --- a/agents-api/agents_api/models/task/get_task.py +++ b/agents-api/agents_api/models/task/get_task.py @@ -64,7 +64,7 @@ def get_task( workflows, created_at, metadata, - @ 'NOW' + @ 'END' }, updated_at = to_int(updated_at_ms) / 1000 diff --git a/agents-api/agents_api/models/task/list_tasks.py b/agents-api/agents_api/models/task/list_tasks.py index 573c1404e..35c52d184 100644 --- a/agents-api/agents_api/models/task/list_tasks.py +++ b/agents-api/agents_api/models/task/list_tasks.py @@ -70,7 +70,7 @@ def list_tasks( workflows, created_at, metadata, - @ 'NOW' + @ 'END' }}, updated_at = to_int(updated_at_ms) / 1000 diff --git a/agents-api/agents_api/models/utils.py b/agents-api/agents_api/models/utils.py index 98bf2a590..c97d92451 100644 --- a/agents-api/agents_api/models/utils.py +++ b/agents-api/agents_api/models/utils.py @@ -97,7 +97,7 @@ def mark_session_updated_query(developer_id: UUID | str, session_id: UUID | str) render_templates, token_budget, context_overflow, - @ 'NOW' + @ 'END' }}, updated_at = [floor(now()), true] diff --git a/llm-proxy/litellm-config.yaml b/llm-proxy/litellm-config.yaml index fc276ac38..e91087461 100644 --- a/llm-proxy/litellm-config.yaml +++ b/llm-proxy/litellm-config.yaml @@ -38,6 +38,24 @@ model_list: tags: ["paid"] api_key: os.environ/ANTHROPIC_API_KEY +- model_name: "claude-3-opus" + litellm_params: + model: "claude-3-opus-20240229" + tags: ["paid"] + api_key: os.environ/ANTHROPIC_API_KEY + +- model_name: "claude-3-sonnet" + litellm_params: + model: "claude-3-sonnet-20240229" + tags: ["paid"] + api_key: os.environ/ANTHROPIC_API_KEY + +- model_name: "claude-3-haiku" + litellm_params: + model: "claude-3-haiku-20240307" + tags: ["paid"] + api_key: os.environ/ANTHROPIC_API_KEY + # Groq models - model_name: "llama-3.1-70b" litellm_params: diff --git a/memory-store/Dockerfile b/memory-store/Dockerfile index 9f77b660f..af3cb6e42 100644 --- a/memory-store/Dockerfile +++ b/memory-store/Dockerfile @@ -39,7 +39,7 @@ RUN \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* # Set fallback mount directory -ENV COZO_MNT_DIR=/data COZO_BACKUP_DIR=/backup APP_HOME=/app +ENV COZO_MNT_DIR=/data COZO_BACKUP_DIR=/backup APP_HOME=/app COZO_PORT=9070 WORKDIR $APP_HOME # Copy the cozo binary diff --git a/memory-store/backup.sh b/memory-store/backup.sh index 1e9e12e5c..0a4fff0dd 100644 --- a/memory-store/backup.sh +++ b/memory-store/backup.sh @@ -4,11 +4,12 @@ set -eo pipefail # Exit on error set -u # Exit on undefined variable # Ensure environment variables are set -if [ -z "$COZO_PORT" ] || [ -z "$COZO_AUTH_TOKEN" ]; then - echo "COZO_PORT or COZO_AUTH_TOKEN is not set" +if [ -z "$COZO_AUTH_TOKEN" ]; then + echo "COZO_AUTH_TOKEN is not set" exit 1 fi 
+COZO_PORT=${COZO_PORT:-9070}
 COZO_BACKUP_DIR=${COZO_BACKUP_DIR:-/backup}
 TIMESTAMP=$(date +%Y-%m-%d_%H-%M-%S)
 MAX_BACKUPS=${MAX_BACKUPS:-10}
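
Editorial note on the `base_evaluate.py` hunk above: `ast.literal_eval` only accepts literal expressions, so it could never have produced a callable from a lambda string; the patch therefore validates the source with `ast.parse` first and only then evaluates it with `eval`. A minimal standalone sketch of that validate-then-eval pattern (the `build_extra_lambdas` helper and the usage line are illustrative, not part of the codebase):

import ast

def build_extra_lambdas(extra_lambda_strs: dict[str, str]) -> dict[str, object]:
    """Validate lambda source strings, then evaluate them (illustrative helper)."""
    extra_lambdas = {}
    for name, src in extra_lambda_strs.items():
        src = src.strip()

        # Same guard as the patch: only lambda expressions are accepted
        assert src.startswith("lambda "), "All extra lambdas must start with 'lambda'"

        # ast.parse rejects syntactically invalid source before eval ever runs it
        try:
            ast.parse(src)
        except Exception as e:
            raise ValueError(f"Invalid lambda: {src}") from e

        # ast.literal_eval would raise here -- a lambda is not a literal --
        # which is why the patch switches to eval after the parse check
        extra_lambdas[name] = eval(src)

    return extra_lambdas

fns = build_extra_lambdas({"double": "lambda x: x * 2"})
assert fns["double"](3) == 6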
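
One behavioral note on the loop-defined tests in the execution-workflow hunk: the test bodies read `p` when the test runs, not when it is defined, and Python closures capture loop variables by reference, so all three tests may observe the final parallelism value unless `p` is bound eagerly (for example as a default argument). A minimal standalone illustration of the pitfall and the usual fix:

# Loop-defined callables capture the loop variable by reference,
# so all of them see the final value at call time.
makers = [lambda: p for p in [1, 3, 5]]
assert [m() for m in makers] == [5, 5, 5]

# Binding the value as a default argument freezes it per iteration.
makers = [lambda p=p: p for p in [1, 3, 5]]
assert [m() for m in makers] == [1, 3, 5]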
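
Taken together, these patches leave `parallelism` optional but constrained to 1-100 at every layer: the generated pydantic models, the TypeScript SDK schemas, and the typespec source. A minimal sketch of the resulting pydantic behavior, using the same `Annotated`/`Field` pattern as the generated models (the `MapReduceSketch` model name is illustrative):

from typing import Annotated

from pydantic import BaseModel, Field, ValidationError

class MapReduceSketch(BaseModel):
    # Same constraint the patches add to the generated Tasks models:
    # optional, but when present must satisfy 1 <= parallelism <= 100
    parallelism: Annotated[int | None, Field(None, ge=1, le=100)]

MapReduceSketch()                # ok: defaults to None (sequential execution)
MapReduceSketch(parallelism=10)  # ok: within [1, 100]

for bad in (0, 101):
    try:
        MapReduceSketch(parallelism=bad)
        raise AssertionError("should have been rejected")
    except ValidationError:
        pass  # rejected, matching @minValue(1)/@maxValue(100) in the typespec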
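
The final patch also extends the whitelist of callables exposed to task expressions in `activities/utils.py` with `random` and `time`. The sketch below shows the general restricted-evaluation pattern such a whitelist supports; the `safe_eval` helper and the trimmed-down map are illustrative, not the actual module:

from random import random
from time import time

# Trimmed-down version of the whitelist pattern in activities/utils.py:
# expressions only see explicitly exposed callables, so adding `random`
# and `time` to the map is what makes them usable from task expressions.
ALLOWED = {
    "len": len,
    "max": max,
    "min": min,
    "random": random,
    "time": time,
}

def safe_eval(expr: str, **names: object) -> object:
    # Empty __builtins__ so only whitelisted names resolve
    return eval(expr, {"__builtins__": {}}, {**ALLOWED, **names})

assert safe_eval("max(1, 2) + len(xs)", xs=[1, 2, 3]) == 5
assert 0.0 <= safe_eval("random()") < 1.0
assert safe_eval("time()") > 0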