Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agents-api): Fix tool conversion issues #789

Merged
merged 7 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 62 additions & 207 deletions agents-api/agents_api/activities/execute_system.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Any
from uuid import UUID

Expand All @@ -16,32 +17,7 @@
from ..common.protocol.tasks import StepContext
from ..common.storage_handler import auto_blob_store
from ..env import testing
from ..models.agent.create_agent import create_agent as create_agent_query
from ..models.agent.delete_agent import delete_agent as delete_agent_query
from ..models.agent.get_agent import get_agent as get_agent_query
from ..models.agent.list_agents import list_agents as list_agents_query
from ..models.agent.update_agent import update_agent as update_agent_query
from ..models.docs.delete_doc import delete_doc as delete_doc_query
from ..models.docs.list_docs import list_docs as list_docs_query
from ..models.session.create_session import create_session as create_session_query
from ..models.session.delete_session import delete_session as delete_session_query
from ..models.session.get_session import get_session as get_session_query
from ..models.session.list_sessions import list_sessions as list_sessions_query
from ..models.session.update_session import update_session as update_session_query
from ..models.task.create_task import create_task as create_task_query
from ..models.task.delete_task import delete_task as delete_task_query
from ..models.task.get_task import get_task as get_task_query
from ..models.task.list_tasks import list_tasks as list_tasks_query
from ..models.task.update_task import update_task as update_task_query
from ..models.user.create_user import create_user as create_user_query
from ..models.user.delete_user import delete_user as delete_user_query
from ..models.user.get_user import get_user as get_user_query
from ..models.user.list_users import list_users as list_users_query
from ..models.user.update_user import update_user as update_user_query
from ..routers.docs.create_doc import create_agent_doc, create_user_doc
from ..routers.docs.search_docs import search_agent_docs, search_user_docs

# FIXME: This is a total mess. Should be refactored.
from .utils import get_handler


@auto_blob_store
Expand All @@ -50,6 +26,7 @@ async def execute_system(
context: StepContext,
system: SystemDef,
) -> Any:
"""Execute a system call with the appropriate handler and transformed arguments."""
arguments: dict[str, Any] = system.arguments or {}
arguments["developer_id"] = context.execution_input.developer_id

Expand All @@ -61,198 +38,76 @@ async def execute_system(
arguments[key] = value.to_list()

# Convert all UUIDs to UUID objects
if "agent_id" in arguments:
arguments["agent_id"] = UUID(arguments["agent_id"])
if "user_id" in arguments:
arguments["user_id"] = UUID(arguments["user_id"])
if "task_id" in arguments:
arguments["task_id"] = UUID(arguments["task_id"])
if "session_id" in arguments:
arguments["session_id"] = UUID(arguments["session_id"])
if "doc_id" in arguments:
arguments["doc_id"] = UUID(arguments["doc_id"])
uuid_fields = ["agent_id", "user_id", "task_id", "session_id", "doc_id"]
for field in uuid_fields:
if field in arguments:
arguments[field] = UUID(arguments[field])

# FIXME: This is a total mess. Should be refactored.
try:
# AGENTS
if system.resource == "agent":
# DOCS SUBRESOURCE
if system.subresource == "doc":
# Define the arguments for the agent doc queries
agent_doc_args = {
**{
"owner_type": "agent",
"owner_id": arguments["agent_id"],
},
**arguments,
}
agent_doc_args.pop("agent_id")

if system.operation == "list":
return list_docs_query(**agent_doc_args)

elif system.operation == "create":
# The `create_agent_doc` function requires `x_developer_id` instead of `developer_id`.
arguments["x_developer_id"] = arguments.pop("developer_id")
return await create_agent_doc(
data=CreateDocRequest(**arguments.pop("data")),
background_tasks=BackgroundTasks(),
**arguments,
)

elif system.operation == "delete":
return delete_doc_query(**agent_doc_args)

elif system.operation == "search":
# The `search_agent_docs` function requires `x_developer_id` instead of `developer_id`.
arguments["x_developer_id"] = arguments.pop("developer_id")

if "text" in arguments and "vector" in arguments:
search_params = HybridDocSearchRequest(
text=arguments.pop("text"),
vector=arguments.pop("vector"),
alpha=arguments.pop("alpha", 0.75),
confidence=arguments.pop("confidence", 0.5),
limit=arguments.get("limit", 10),
)

elif "text" in arguments:
search_params = TextOnlyDocSearchRequest(
text=arguments.pop("text"),
limit=arguments.get("limit", 10),
)
elif "vector" in arguments:
search_params = VectorDocSearchRequest(
vector=arguments.pop("vector"),
confidence=arguments.pop("confidence", 0.7),
limit=arguments.get("limit", 10),
)

return await search_agent_docs(
search_params=search_params,
**arguments,
)

# NO SUBRESOURCE
elif system.subresource is None:
if system.operation == "list":
return list_agents_query(**arguments)
elif system.operation == "get":
return get_agent_query(**arguments)
elif system.operation == "create":
return create_agent_query(**arguments)
elif system.operation == "update":
return update_agent_query(**arguments)
elif system.operation == "delete":
return delete_agent_query(**arguments)

# USERS
elif system.resource == "user":
# DOCS SUBRESOURCE
if system.subresource == "doc":
# Define the arguments for the user doc queries
user_doc_args = {
**{
"owner_type": "user",
"owner_id": arguments["user_id"],
},
handler = get_handler(system)

# Transform arguments for doc-related operations
if system.subresource == "doc":
owner_id_field = f"{system.resource}_id"
if owner_id_field in arguments:
doc_args = {
"owner_type": system.resource,
"owner_id": arguments[owner_id_field],
**arguments,
}
user_doc_args.pop("user_id")

if system.operation == "list":
return list_docs_query(**user_doc_args)

elif system.operation == "create":
# The `create_user_doc` function requires `x_developer_id` instead of `developer_id`.
arguments["x_developer_id"] = arguments.pop("developer_id")
return await create_user_doc(
data=CreateDocRequest(**arguments.pop("data")),
background_tasks=BackgroundTasks(),
**arguments,
)

elif system.operation == "delete":
return delete_doc_query(**user_doc_args)

elif system.operation == "search":
# The `search_user_docs` function requires `x_developer_id` instead of `developer_id`.
arguments["x_developer_id"] = arguments.pop("developer_id")

if "text" in arguments and "vector" in arguments:
search_params = HybridDocSearchRequest(
text=arguments.pop("text"),
vector=arguments.pop("vector"),
limit=arguments.get("limit", 10),
)

elif "text" in arguments:
search_params = TextOnlyDocSearchRequest(
text=arguments.pop("text"),
limit=arguments.get("limit", 10),
)
elif "vector" in arguments:
search_params = VectorDocSearchRequest(
vector=arguments.pop("vector"),
limit=arguments.get("limit", 10),
)

return await search_user_docs(
search_params=search_params,
**arguments,
)

# NO SUBRESOURCE
elif system.subresource is None:
if system.operation == "list":
return list_users_query(**arguments)
elif system.operation == "get":
return get_user_query(**arguments)
elif system.operation == "create":
return create_user_query(**arguments)
elif system.operation == "update":
return update_user_query(**arguments)
elif system.operation == "delete":
return delete_user_query(**arguments)

# SESSIONS
elif system.resource == "session":
if system.operation == "list":
return list_sessions_query(**arguments)
elif system.operation == "get":
return get_session_query(**arguments)
elif system.operation == "create":
return create_session_query(**arguments)
elif system.operation == "update":
return update_session_query(**arguments)
# FIXME: @hamada needs to be fixed
# elif system.operation == "delete":
# return update_session_query(**arguments)
elif system.operation == "delete":
return delete_session_query(**arguments)
# TASKS
elif system.resource == "task":
if system.operation == "list":
return list_tasks_query(**arguments)
elif system.operation == "get":
return get_task_query(**arguments)
elif system.operation == "create":
return create_task_query(**arguments)
elif system.operation == "update":
return update_task_query(**arguments)
elif system.operation == "delete":
return delete_task_query(**arguments)

raise NotImplementedError(f"System call not implemented for {
system.resource}.{system.operation}")
doc_args.pop(owner_id_field)
arguments = doc_args

# Handle special cases for doc operations
if system.operation == "create" and system.subresource == "doc":
arguments["x_developer_id"] = arguments.pop("developer_id")
return await handler(
data=CreateDocRequest(**arguments.pop("data")),
background_tasks=BackgroundTasks(),
**arguments,
)

# Handle search operations
if system.operation == "search" and system.subresource == "doc":
arguments["x_developer_id"] = arguments.pop("developer_id")
search_params = _create_search_request(arguments)
return await handler(search_params=search_params, **arguments)

# Handle regular operations
if asyncio.iscoroutinefunction(handler):
return await handler(**arguments)
return handler(**arguments)

except BaseException as e:
if activity.in_activity():
activity.logger.error(f"Error in execute_system_call: {e}")
raise


# Mock and activity definition
def _create_search_request(arguments: dict) -> Any:
"""Create appropriate search request based on available parameters."""
if "text" in arguments and "vector" in arguments:
return HybridDocSearchRequest(
text=arguments.pop("text"),
vector=arguments.pop("vector"),
alpha=arguments.pop("alpha", 0.75),
confidence=arguments.pop("confidence", 0.5),
limit=arguments.get("limit", 10),
)
elif "text" in arguments:
return TextOnlyDocSearchRequest(
text=arguments.pop("text"),
limit=arguments.get("limit", 10),
)
elif "vector" in arguments:
return VectorDocSearchRequest(
vector=arguments.pop("vector"),
confidence=arguments.pop("confidence", 0.7),
limit=arguments.get("limit", 10),
)


# Keep the existing mock and activity definition
mock_execute_system = execute_system

execute_system = activity.defn(name="execute_system")(
Expand Down
Loading
Loading