Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
HamadaSalhab committed Oct 21, 2024
1 parent a6626a7 commit 67cee60
Showing 1 changed file with 61 additions and 235 deletions.
296 changes: 61 additions & 235 deletions agents-api/agents_api/autogen/openapi_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,257 +289,83 @@ def validate_reduce_expression(cls, v):

_CreateTaskRequest = CreateTaskRequest

CreateTaskRequest.model_config = ConfigDict(
**{
**_CreateTaskRequest.model_config,
"extra": "allow",
}
)


@model_validator(mode="before")
def validate_task_request(self):
workflows = {
k: v
for k, v in self.model_dump().items()
if k not in _CreateTaskRequest.model_fields or k == "main"
}

for workflow_name, workflow_definition in workflows.items():
try:
validate_workflow_steps(workflow_definition)
setattr(self, workflow_name, WorkflowType(workflow_definition))
except ValueError as e:
raise ValueError(f"Invalid workflow '{
workflow_name}': {str(e)}")
return self


CreateTaskRequest.validate_task_request = validate_task_request
class CreateTaskRequest(_CreateTaskRequest):
@model_validator(mode="after")
def validate_task_request(self) -> 'CreateTaskRequest':
workflows = {
k: v
for k, v in self.model_dump().items()
if k not in _CreateTaskRequest.model_fields or k == "main"
}

for workflow_name, workflow_definition in workflows.items():
try:
print("VALIDATING WORKFLOW STEPS")
validate_workflow_steps(workflow_definition)
print("VALIDATED WORKFLOW STEPS")
except ValueError as e:
raise ValueError(f"Invalid workflow '{
workflow_name}': {str(e)}")
return self
pass

def validate_workflow_steps(steps):
for index, step in enumerate(steps):
if not isinstance(step, dict) or len(step) != 1:
raise ValueError(f"Invalid step format at index {index}")

print("STARTED VALIDATING STEP")
step_type, step_content = next(iter(step.items()))
print(f"STEP TYPE: {step_type}")
print(f"STEP CONTENT: {step_content}")
print("STEP ITEMS ARE:")
step_type = step.get('kind_')
step_content = step[step_type]
for k, v in step.items():
print(f"{k}: {v}")
try:
validate_step(step_type, step_content, index)
print(f"VALIDATING STEP {step_type}")
validate_step(step_type, step_content)
print(f"VALIDATED STEP {step_type}")
except ValueError as e:
raise ValueError(f"Error in step {index}: {str(e)}")


def validate_evaluate_step(content):
if not isinstance(content, dict):
raise ValueError("'evaluate' step content must be a dictionary")
for key, value in content.items():
if not isinstance(value, str):
raise ValueError(f"In 'evaluate' step, the value for key '{
key}' must be a string (Python expression)")


def validate_prompt_step(content):
if isinstance(content, str):
is_valid, error = validate_jinja_template(content)
if not is_valid:
raise ValueError(f"Invalid Jinja template in prompt: {error}")
elif isinstance(content, list):
for item in content:
if (
not isinstance(item, dict)
or "role" not in item
or "content" not in item
):
raise ValueError(
"Each prompt message must be a dictionary with 'role' and 'content' keys"
)
is_valid, error = validate_jinja_template(item["content"])
if not is_valid:
raise ValueError(f"Invalid Jinja template in prompt content: {error}")
else:
raise ValueError("Prompt content must be either a string or a list of messages")


def validate_tool_call_step(content):
if not isinstance(content, dict):
raise ValueError("'tool' step content must be a dictionary")
if "tool" not in content:
raise ValueError("'tool' step must contain a 'tool' key with the tool name")
if "arguments" in content:
if not isinstance(content["arguments"], dict):
raise ValueError("'arguments' in tool call step must be a dictionary")
for key, value in content["arguments"].items():
if isinstance(value, str):
is_valid, error = validate_python_expression(value)
if not is_valid:
raise ValueError(
f"Invalid Python expression in tool argument '{key}': {error}"
)


def validate_get_step(content):
if not isinstance(content, dict):
raise ValueError("'get' step content must be a dictionary")
for key, value in content.items():
if not isinstance(value, str):
raise ValueError(f"In 'get' step, the value for key '{
key}' must be a string (variable name)")


def validate_set_step(content):
if not isinstance(content, dict):
raise ValueError("'set' step content must be a dictionary")
for key, value in content.items():
is_valid, error = validate_python_expression(value)
if not is_valid:
raise ValueError(
f"Invalid Python expression in 'set' step for key '{key}': {error}"
)


def validate_sleep_step(content):
if not isinstance(content, (int, float)) or content < 0:
raise ValueError("'sleep' step content must be a non-negative number")


def validate_yield_step(content):
if not isinstance(content, dict):
raise ValueError("'yield' step content must be a dictionary")
if "workflow" not in content:
raise ValueError(
"'yield' step must contain a 'workflow' key with the workflow name"
)
if "arguments" in content:
if not isinstance(content["arguments"], dict):
raise ValueError("'arguments' in yield step must be a dictionary")
for key, value in content["arguments"].items():
is_valid, error = validate_python_expression(value)
if not is_valid:
raise ValueError(
f"Invalid Python expression in yield argument '{key}': {error}"
)


def validate_if_else_step(content):
if not isinstance(content, dict):
raise ValueError("'if' step content must be a dictionary")
if "if" not in content:
raise ValueError("'if' step must contain an 'if' key with the condition")
if "then" not in content:
raise ValueError(
"'if' step must contain a 'then' key with the steps to execute if condition is true"
)

is_valid, error = validate_python_expression(content["if"])
if not is_valid:
raise ValueError(f"Invalid Python expression in 'if' condition: {error}")

validate_workflow_steps(content["then"])

if "else" in content:
validate_workflow_steps(content["else"])


def validate_switch_step(content, index, workflow_name):
if not isinstance(content, list):
raise ValueError("'switch' step content must be a list of case-then pairs")
for case_index, case in enumerate(content):
if not isinstance(case, dict) or "case" not in case or "then" not in case:
raise ValueError(
f"Each case in 'switch' step must be a dictionary with 'case' and 'then' keys"
)
if case["case"] != "_":
is_valid, error = validate_python_expression(case["case"])
if not is_valid:
raise ValueError(f"Invalid Python expression in 'switch' case {
case_index}: {error}")
validate_workflow_steps([case["then"]])


def validate_foreach_step(content, index, workflow_name):
if not isinstance(content, dict):
raise ValueError("'foreach' step content must be a dictionary")
if "in" not in content:
raise ValueError("'foreach' step must contain an 'in' key with the iterable")
if "do" not in content:
raise ValueError(
"'foreach' step must contain a 'do' key with the steps to execute for each iteration"
)

is_valid, error = validate_python_expression(content["in"])
if not is_valid:
raise ValueError(f"Invalid Python expression in 'foreach' iterable: {error}")

validate_workflow_steps([content["do"]])


def validate_parallel_step(content, index, workflow_name):
if not isinstance(content, list):
raise ValueError(
"'parallel' step content must be a list of steps to execute in parallel"
)
for parallel_index, parallel_step in enumerate(content):
validate_workflow_steps(
[parallel_step], f"{workflow_name}.parallel{parallel_index}"
)


def validate_map_reduce_step(content):
if not isinstance(content, dict):
raise ValueError("'map_reduce' step content must be a dictionary")
if "over" not in content:
raise ValueError(
"'map_reduce' step must contain an 'over' key with the iterable"
)
if "map" not in content:
raise ValueError(
"'map_reduce' step must contain a 'map' key with the mapping function"
)

is_valid, error = validate_python_expression(content["over"])
if not is_valid:
raise ValueError(f"Invalid Python expression in 'map_reduce' iterable: {error}")

validate_workflow_steps([content["map"]], f"{workflow_name}.map")

if "reduce" in content:
is_valid, error = validate_python_expression(content["reduce"])
if not is_valid:
raise ValueError(
f"Invalid Python expression in 'map_reduce' reduce function: {error}"
)


# Update the step_validators dictionary
step_validators = {
"evaluate": validate_evaluate_step,
"prompt": validate_prompt_step,
"tool": validate_tool_call_step,
"get": validate_get_step,
"set": validate_set_step,
"log": validate_log_template, # This function is already defined
"return": validate_return_expressions, # This function is already defined
"sleep": validate_sleep_step,
# No additional validation needed for error step
"error": lambda content, index, workflow_name: None,
"yield": validate_yield_step,
"if": validate_if_else_step,
"switch": validate_switch_step,
"foreach": validate_foreach_step,
"parallel": validate_parallel_step,
"map_reduce": validate_map_reduce_step,
print("FINISHED VALIDATING STEP")

# Mapping between step types and their corresponding classes
step_type_to_class = {
"evaluate": EvaluateStep,
"tool_call": ToolCallStep,
"prompt": PromptStep,
"get": GetStep,
"set": SetStep,
"log": LogStep,
"return": ReturnStep,
"sleep": SleepStep,
"error": ErrorWorkflowStep,
"yield": YieldStep,
"wait_for_input": WaitForInputStep,
"if_else": IfElseWorkflowStep,
"switch": SwitchStep,
"foreach": ForeachStep,
"parallel": ParallelStep,
"map": MapReduceStep
}


# Update the validate_step function to use this mapping
def validate_step(step_type, step_content):
validator = step_validators.get(step_type)
if validator:
validator(step_content)
step_class = step_type_to_class.get(step_type)
if step_class:
try:
# TODO: Add support for other fields (eg. map step needs additional 'over' field)
step_class(**{
step_type: step_content
})
except ValidationError as e:
raise ValueError(f"Validation error for '{step_type}' step: {str(e)}")
else:
raise ValueError(f"Unknown step type '{step_type}'")




# Custom types (not generated correctly)
# --------------------------------------

Expand Down

0 comments on commit 67cee60

Please sign in to comment.