From 17d9cc374e869b84e36550ca0f5a8f0d71a6c281 Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Mon, 6 Jan 2025 08:55:31 -0500 Subject: [PATCH 01/10] Clean up API structure --- eve/api.py | 266 +++----------------------------------------- eve/api/__init__.py | 0 eve/api/handlers.py | 181 ++++++++++++++++++++++++++++++ eve/api/requests.py | 35 ++++++ eve/api/utils.py | 54 +++++++++ eve/deploy.py | 3 +- 6 files changed, 286 insertions(+), 253 deletions(-) create mode 100644 eve/api/__init__.py create mode 100644 eve/api/handlers.py create mode 100644 eve/api/requests.py create mode 100644 eve/api/utils.py diff --git a/eve/api.py b/eve/api.py index a06002b..bff7f7e 100644 --- a/eve/api.py +++ b/eve/api.py @@ -1,37 +1,21 @@ import os -import json import modal from fastapi import FastAPI, Depends, BackgroundTasks -from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader, HTTPBearer -from pydantic import BaseModel, ConfigDict -from typing import Optional -from bson import ObjectId import logging from ably import AblyRealtime from pathlib import Path -import aiohttp -import traceback from eve import auth +from eve.api.handlers import handle_cancel, handle_deploy, handle_stream_chat +from eve.api.requests import CancelRequest, ChatRequest, TaskRequest from eve.deploy import ( - DeployCommand, DeployRequest, authenticate_modal_key, check_environment_exists, create_environment, - create_modal_secrets, - deploy_client, - stop_client, ) -from eve.tool import Tool -from eve.llm import UpdateType, UserMessage, async_prompt_thread, async_title_thread -from eve.thread import Thread -from eve.mongo import serialize_document -from eve.agent import Agent -from eve.user import User -from eve.task import Task from eve import deploy # Config and logging setup @@ -64,261 +48,39 @@ async def startup_event(): web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) -class TaskRequest(BaseModel): - tool: str - args: dict - user_id: str - -class CancelRequest(BaseModel): - task_id: str - user_id: str - -class UpdateConfig(BaseModel): - sub_channel_name: Optional[str] = None - update_endpoint: Optional[str] = None - discord_channel_id: Optional[str] = None - telegram_chat_id: Optional[str] = None - cast_hash: Optional[str] = None - author_fid: Optional[int] = None - message_id: Optional[str] = None - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ChatRequest(BaseModel): - user_id: str - agent_id: str - user_message: UserMessage - thread_id: Optional[str] = None - update_config: Optional[UpdateConfig] = None - force_reply: bool = False - - -async def handle_task(tool: str, user_id: str, args: dict = {}) -> dict: - tool = Tool.load(key=tool) - return await tool.async_start_task( - requester_id=user_id, user_id=user_id, args=args - ) - - -def serialize_for_json(obj): - """Recursively serialize objects for JSON, handling ObjectId and other special types""" - if isinstance(obj, ObjectId): - return str(obj) - elif isinstance(obj, dict): - return {k: serialize_for_json(v) for k, v in obj.items()} - elif isinstance(obj, list): - return [serialize_for_json(item) for item in obj] - return obj - - -async def setup_chat( - request: ChatRequest, background_tasks: BackgroundTasks -) -> tuple[User, Agent, Thread, list[Tool], Optional[AblyRealtime]]: - update_channel = None - if request.update_config and request.update_config.sub_channel_name: - try: - update_channel = web_app.state.ably_client.channels.get( - str(request.update_config.sub_channel_name) - ) - except Exception as e: - logger.error(f"Failed to create Ably channel: {str(e)}") - - user = User.from_mongo(request.user_id) - agent = Agent.from_mongo(request.agent_id, cache=True) - tools = agent.get_tools(cache=True) - - if request.thread_id: - thread = Thread.from_mongo(request.thread_id) - else: - thread = agent.request_thread(user=user.id) - background_tasks.add_task(async_title_thread, thread, request.user_message) - - return user, agent, thread, tools, update_channel - - @web_app.post("/create") -async def task_admin( - request: TaskRequest, - _: dict = Depends(auth.authenticate_admin) -): - result = await handle_task(request.tool, request.user_id, request.args) - return serialize_document(result.model_dump(by_alias=True)) - +async def handle_task(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)): + return await handle_task(request) -async def handle_cancel(task_id: str, user_id: str): - task = Task.from_mongo(task_id) - assert str(task.user) == user_id, "Task user does not match user_id" - if task.status in ["completed", "failed", "cancelled"]: - return {"status": task.status} - tool = Tool.load(key=task.tool) - tool.cancel(task) - return {"status": task.status} - @web_app.post("/cancel") -async def cancel( - request: CancelRequest, - _: dict = Depends(auth.authenticate_admin) -): - result = await handle_cancel(request.task_id, request.user_id) - return result - +async def cancel(request: CancelRequest, _: dict = Depends(auth.authenticate_admin)): + return await handle_cancel(request) + @web_app.post("/chat") async def handle_chat( request: ChatRequest, background_tasks: BackgroundTasks, - auth: dict = Depends(auth.authenticate_admin), + _: dict = Depends(auth.authenticate_admin), ): - try: - user, agent, thread, tools, update_channel = await setup_chat( - request, background_tasks - ) - - async def run_prompt(): - - async for update in async_prompt_thread( - user=user, - agent=agent, - thread=thread, - user_messages=request.user_message, - tools=tools, - force_reply=request.force_reply, - model="claude-3-5-sonnet-20241022", - stream=False, - ): - data = { - "type": update.type.value, - "update_config": request.update_config.model_dump() - if request.update_config - else {}, - } - - if update.type == UpdateType.ASSISTANT_MESSAGE: - data["content"] = update.message.content - elif update.type == UpdateType.TOOL_COMPLETE: - data["tool"] = update.tool_name - data["result"] = serialize_for_json(update.result) - elif update.type == UpdateType.ERROR: - data["error"] = update.error if hasattr(update, "error") else None - - if request.update_config: - if request.update_config.update_endpoint: - async with aiohttp.ClientSession() as session: - try: - async with session.post( - request.update_config.update_endpoint, - json=data, - headers={ - "Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}" - }, - ) as response: - if response.status != 200: - logger.error( - f"Failed to send update to endpoint: {await response.text()}" - ) - except Exception as e: - logger.error( - f"Error sending update to endpoint: {str(e)}" - ) - - elif update_channel: - try: - await update_channel.publish("update", data) - except Exception as e: - logger.error(f"Failed to publish to Ably: {str(e)}") - - background_tasks.add_task(run_prompt) - return {"status": "success", "thread_id": str(thread.id)} - - except Exception as e: - logger.error(f"Error in handle_chat: {str(e)}\n{traceback.format_exc()}") - return {"status": "error", "message": str(e)} + return await handle_chat(request, background_tasks) @web_app.post("/chat/stream") async def stream_chat( request: ChatRequest, - auth: dict = Depends(auth.authenticate_admin), + background_tasks: BackgroundTasks, + _: dict = Depends(auth.authenticate_admin), ): - try: - user, agent, thread, tools, update_channel = await setup_chat( - request, BackgroundTasks() - ) - - async def event_generator(): - async for update in async_prompt_thread( - user=user, - agent=agent, - thread=thread, - user_messages=request.user_message, - tools=tools, - force_reply=request.force_reply, - model="claude-3-5-sonnet-20241022", - stream=True, - ): - data = {"type": update.type} - if update.type == UpdateType.ASSISTANT_TOKEN: - data["text"] = update.text - elif update.type == UpdateType.ASSISTANT_MESSAGE: - data["content"] = update.message.content - if update.message.tool_calls: - data["tool_calls"] = [ - serialize_for_json(t.model_dump()) - for t in update.message.tool_calls - ] - elif update.type == UpdateType.TOOL_COMPLETE: - data["tool"] = update.tool_name - data["result"] = serialize_for_json(update.result) - elif update.type == UpdateType.ERROR: - data["error"] = update.error or "Unknown error occurred" - - yield f"data: {json.dumps({'event': 'update', 'data': data})}\n\n" - - yield f"data: {json.dumps({'event': 'done', 'data': ''})}\n\n" - - return StreamingResponse( - event_generator(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - }, - ) - - except Exception as e: - logger.error(f"Error in stream_chat: {str(e)}\n{traceback.format_exc()}") - return {"status": "error", "message": str(e)} + return await handle_stream_chat(request, background_tasks) @web_app.post("/deployment") async def deploy_handler( - request: DeployRequest, auth: dict = Depends(auth.authenticate_admin) + request: DeployRequest, _: dict = Depends(auth.authenticate_admin) ): - try: - if request.credentials: - create_modal_secrets( - request.credentials, - f"{request.agent_key}-secrets", - ) - - if request.command == DeployCommand.DEPLOY: - deploy_client(request.agent_key, request.platform.value) - return { - "status": "success", - "message": f"Deployed {request.platform.value} client", - } - elif request.command == DeployCommand.STOP: - stop_client(request.agent_key, request.platform.value, db) - return { - "status": "success", - "message": f"Stopped {request.platform.value} client", - } - else: - raise Exception("Invalid command") - - except Exception as e: - return {"status": "error", "message": str(e)} + return await handle_deploy(request) # Modal app setup diff --git a/eve/api/__init__.py b/eve/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eve/api/handlers.py b/eve/api/handlers.py new file mode 100644 index 0000000..2fc79a2 --- /dev/null +++ b/eve/api/handlers.py @@ -0,0 +1,181 @@ +import json +import logging +import os +import traceback +import aiohttp +from fastapi import BackgroundTasks +from fastapi.responses import StreamingResponse +from eve.api.requests import CancelRequest, ChatRequest, TaskRequest +from eve.api.utils import serialize_for_json, setup_chat +from eve.deploy import ( + DeployCommand, + DeployRequest, + create_modal_secrets, + deploy_client, + stop_client, +) +from eve.llm import UpdateType, async_prompt_thread +from eve.mongo import serialize_document +from eve.task import Task +from eve.tool import Tool + +logger = logging.getLogger(__name__) + + +async def handle_task(request: TaskRequest): + result = await handle_task(request.tool, request.user_id, request.args) + return serialize_document(result.model_dump(by_alias=True)) + + +async def handle_cancel(request: CancelRequest): + task = Task.from_mongo(request.task_id) + assert str(task.user) == request.user_id, "Task user does not match user_id" + if task.status in ["completed", "failed", "cancelled"]: + return {"status": task.status} + tool = Tool.load(key=task.tool) + tool.cancel(task) + return {"status": task.status} + + +async def handle_chat(request: ChatRequest, background_tasks: BackgroundTasks): + try: + user, agent, thread, tools, update_channel = await setup_chat( + request, background_tasks + ) + + async def run_prompt(): + async for update in async_prompt_thread( + user=user, + agent=agent, + thread=thread, + user_messages=request.user_message, + tools=tools, + force_reply=request.force_reply, + model="claude-3-5-sonnet-20241022", + stream=False, + ): + data = { + "type": update.type.value, + "update_config": request.update_config.model_dump() + if request.update_config + else {}, + } + + if update.type == UpdateType.ASSISTANT_MESSAGE: + data["content"] = update.message.content + elif update.type == UpdateType.TOOL_COMPLETE: + data["tool"] = update.tool_name + data["result"] = serialize_for_json(update.result) + elif update.type == UpdateType.ERROR: + data["error"] = update.error if hasattr(update, "error") else None + + if request.update_config: + if request.update_config.update_endpoint: + async with aiohttp.ClientSession() as session: + try: + async with session.post( + request.update_config.update_endpoint, + json=data, + headers={ + "Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}" + }, + ) as response: + if response.status != 200: + logger.error( + f"Failed to send update to endpoint: {await response.text()}" + ) + except Exception as e: + logger.error( + f"Error sending update to endpoint: {str(e)}" + ) + + elif update_channel: + try: + await update_channel.publish("update", data) + except Exception as e: + logger.error(f"Failed to publish to Ably: {str(e)}") + + background_tasks.add_task(run_prompt) + return {"status": "success", "thread_id": str(thread.id)} + + except Exception as e: + logger.error(f"Error in handle_chat: {str(e)}\n{traceback.format_exc()}") + return {"status": "error", "message": str(e)} + + +async def handle_stream_chat(request: ChatRequest, background_tasks: BackgroundTasks): + try: + user, agent, thread, tools, update_channel = await setup_chat( + request, background_tasks + ) + + async def event_generator(): + async for update in async_prompt_thread( + user=user, + agent=agent, + thread=thread, + user_messages=request.user_message, + tools=tools, + force_reply=request.force_reply, + model="claude-3-5-sonnet-20241022", + stream=True, + ): + data = {"type": update.type} + if update.type == UpdateType.ASSISTANT_TOKEN: + data["text"] = update.text + elif update.type == UpdateType.ASSISTANT_MESSAGE: + data["content"] = update.message.content + if update.message.tool_calls: + data["tool_calls"] = [ + serialize_for_json(t.model_dump()) + for t in update.message.tool_calls + ] + elif update.type == UpdateType.TOOL_COMPLETE: + data["tool"] = update.tool_name + data["result"] = serialize_for_json(update.result) + elif update.type == UpdateType.ERROR: + data["error"] = update.error or "Unknown error occurred" + + yield f"data: {json.dumps({'event': 'update', 'data': data})}\n\n" + + yield f"data: {json.dumps({'event': 'done', 'data': ''})}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + + except Exception as e: + logger.error(f"Error in stream_chat: {str(e)}\n{traceback.format_exc()}") + return {"status": "error", "message": str(e)} + + +async def handle_deploy(request: DeployRequest): + try: + if request.credentials: + create_modal_secrets( + request.credentials, + f"{request.agent_key}-secrets", + ) + + if request.command == DeployCommand.DEPLOY: + deploy_client(request.agent_key, request.platform.value) + return { + "status": "success", + "message": f"Deployed {request.platform.value} client", + } + elif request.command == DeployCommand.STOP: + stop_client(request.agent_key, request.platform.value) + return { + "status": "success", + "message": f"Stopped {request.platform.value} client", + } + else: + raise Exception("Invalid command") + + except Exception as e: + return {"status": "error", "message": str(e)} diff --git a/eve/api/requests.py b/eve/api/requests.py new file mode 100644 index 0000000..a9047de --- /dev/null +++ b/eve/api/requests.py @@ -0,0 +1,35 @@ +from typing import Optional +from pydantic import BaseModel, ConfigDict + +from eve.thread import UserMessage + + +class TaskRequest(BaseModel): + tool: str + args: dict + user_id: str + + +class CancelRequest(BaseModel): + task_id: str + user_id: str + + +class UpdateConfig(BaseModel): + sub_channel_name: Optional[str] = None + update_endpoint: Optional[str] = None + discord_channel_id: Optional[str] = None + telegram_chat_id: Optional[str] = None + cast_hash: Optional[str] = None + author_fid: Optional[int] = None + message_id: Optional[str] = None + model_config = ConfigDict(arbitrary_types_allowed=True) + + +class ChatRequest(BaseModel): + user_id: str + agent_id: str + user_message: UserMessage + thread_id: Optional[str] = None + update_config: Optional[UpdateConfig] = None + force_reply: bool = False diff --git a/eve/api/utils.py b/eve/api/utils.py new file mode 100644 index 0000000..37edad0 --- /dev/null +++ b/eve/api/utils.py @@ -0,0 +1,54 @@ +import logging +from typing import Optional +from bson import ObjectId +from fastapi import BackgroundTasks, FastAPI +from ably import AblyRealtime + +from eve.tool import Tool +from eve.user import User +from eve.agent import Agent +from eve.thread import Thread +from eve.llm import async_title_thread +from eve.api.requests import ChatRequest + +logger = logging.getLogger(__name__) + + +async def setup_chat( + web_app: FastAPI, request: ChatRequest, background_tasks: BackgroundTasks +) -> tuple[User, Agent, Thread, list[Tool], Optional[AblyRealtime]]: + update_channel = None + if request.update_config and request.update_config.sub_channel_name: + try: + update_channel = web_app.state.ably_client.channels.get( + str(request.update_config.sub_channel_name) + ) + except Exception as e: + logger.error(f"Failed to create Ably channel: {str(e)}") + + user = User.from_mongo(request.user_id) + agent = Agent.from_mongo(request.agent_id, cache=True) + tools = agent.get_tools(cache=True) + + if request.thread_id: + thread = Thread.from_mongo(request.thread_id) + else: + thread = agent.request_thread(user=user.id) + background_tasks.add_task(async_title_thread, thread, request.user_message) + + return user, agent, thread, tools, update_channel + +def serialize_for_json(obj): + """Recursively serialize objects for JSON, handling ObjectId and other special types""" + if isinstance(obj, ObjectId): + return str(obj) + elif isinstance(obj, dict): + return {k: serialize_for_json(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [serialize_for_json(item) for item in obj] + return obj + + +async def handle_task(tool: str, user_id: str, args: dict = {}) -> dict: + tool = Tool.load(key=tool) + return await tool.async_start_task(requester_id=user_id, user_id=user_id, args=args) diff --git a/eve/deploy.py b/eve/deploy.py index 1fef10a..19465c1 100644 --- a/eve/deploy.py +++ b/eve/deploy.py @@ -10,6 +10,7 @@ REPO_URL = "https://github.com/edenartlab/eve.git" REPO_BRANCH = "main" DEPLOYMENT_ENV_NAME = "deployments" +db = os.getenv("DB") class DeployCommand(str, Enum): @@ -123,7 +124,7 @@ def deploy_client(agent_key: str, client_name: str): raise Exception(f"Client modal file not found: {client_path}") -def stop_client(agent_key: str, client_name: str, db: str): +def stop_client(agent_key: str, client_name: str): subprocess.run( [ "modal", From 074a70d8a1ff3e005d6f7d8e3a8933813e0eaaa8 Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Mon, 6 Jan 2025 09:20:43 -0500 Subject: [PATCH 02/10] refactor API code --- eve/api.py | 18 +++++++-- eve/api/handlers.py | 70 ++++++++++++++++---------------- eve/api/helpers.py | 98 +++++++++++++++++++++++++++++++++++++++++++++ eve/api/requests.py | 6 +++ eve/api/utils.py | 54 ------------------------- eve/cron.py | 12 ++++++ 6 files changed, 165 insertions(+), 93 deletions(-) create mode 100644 eve/api/helpers.py delete mode 100644 eve/api/utils.py create mode 100644 eve/cron.py diff --git a/eve/api.py b/eve/api.py index bff7f7e..3174d29 100644 --- a/eve/api.py +++ b/eve/api.py @@ -8,8 +8,13 @@ from pathlib import Path from eve import auth -from eve.api.handlers import handle_cancel, handle_deploy, handle_stream_chat -from eve.api.requests import CancelRequest, ChatRequest, TaskRequest +from eve.api.handlers import ( + handle_cancel, + handle_deploy, + handle_schedule, + handle_stream_chat, +) +from eve.api.requests import CancelRequest, ChatRequest, ScheduleRequest, TaskRequest from eve.deploy import ( DeployRequest, authenticate_modal_key, @@ -64,7 +69,7 @@ async def handle_chat( background_tasks: BackgroundTasks, _: dict = Depends(auth.authenticate_admin), ): - return await handle_chat(request, background_tasks) + return await handle_chat(request, background_tasks, web_app.state.ably_client) @web_app.post("/chat/stream") @@ -83,6 +88,13 @@ async def deploy_handler( return await handle_deploy(request) +@web_app.post("/schedule") +async def schedule_handler( + request: ScheduleRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_schedule(request) + + # Modal app setup app = modal.App( name=app_name, diff --git a/eve/api/handlers.py b/eve/api/handlers.py index 2fc79a2..8e9bd58 100644 --- a/eve/api/handlers.py +++ b/eve/api/handlers.py @@ -2,11 +2,17 @@ import logging import os import traceback -import aiohttp from fastapi import BackgroundTasks from fastapi.responses import StreamingResponse -from eve.api.requests import CancelRequest, ChatRequest, TaskRequest -from eve.api.utils import serialize_for_json, setup_chat +from ably import AblyRealtime + +from eve.api.requests import CancelRequest, ChatRequest, ScheduleRequest, TaskRequest +from eve.api.helpers import ( + emit_update, + get_update_channel, + serialize_for_json, + setup_chat, +) from eve.deploy import ( DeployCommand, DeployRequest, @@ -20,10 +26,14 @@ from eve.tool import Tool logger = logging.getLogger(__name__) +db = os.getenv("DB", "STAGE").upper() async def handle_task(request: TaskRequest): - result = await handle_task(request.tool, request.user_id, request.args) + tool = Tool.load(key=request.tool) + result = await tool.async_start_task( + requester_id=request.user_id, user_id=request.user_id, args=request.args + ) return serialize_document(result.model_dump(by_alias=True)) @@ -37,10 +47,15 @@ async def handle_cancel(request: CancelRequest): return {"status": task.status} -async def handle_chat(request: ChatRequest, background_tasks: BackgroundTasks): +async def handle_chat( + request: ChatRequest, background_tasks: BackgroundTasks, ably_client: AblyRealtime +): try: - user, agent, thread, tools, update_channel = await setup_chat( - request, background_tasks + user, agent, thread, tools = await setup_chat(request, background_tasks) + update_channel = ( + await get_update_channel(request.update_config, ably_client) + if request.update_config and request.update_config.sub_channel_name + else None ) async def run_prompt(): @@ -69,31 +84,7 @@ async def run_prompt(): elif update.type == UpdateType.ERROR: data["error"] = update.error if hasattr(update, "error") else None - if request.update_config: - if request.update_config.update_endpoint: - async with aiohttp.ClientSession() as session: - try: - async with session.post( - request.update_config.update_endpoint, - json=data, - headers={ - "Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}" - }, - ) as response: - if response.status != 200: - logger.error( - f"Failed to send update to endpoint: {await response.text()}" - ) - except Exception as e: - logger.error( - f"Error sending update to endpoint: {str(e)}" - ) - - elif update_channel: - try: - await update_channel.publish("update", data) - except Exception as e: - logger.error(f"Failed to publish to Ably: {str(e)}") + await emit_update(request.update_config, update_channel, data) background_tasks.add_task(run_prompt) return {"status": "success", "thread_id": str(thread.id)} @@ -105,9 +96,7 @@ async def run_prompt(): async def handle_stream_chat(request: ChatRequest, background_tasks: BackgroundTasks): try: - user, agent, thread, tools, update_channel = await setup_chat( - request, background_tasks - ) + user, agent, thread, tools = await setup_chat(request, background_tasks) async def event_generator(): async for update in async_prompt_thread( @@ -159,7 +148,7 @@ async def handle_deploy(request: DeployRequest): if request.credentials: create_modal_secrets( request.credentials, - f"{request.agent_key}-secrets", + f"{request.agent_key}-secrets-{db}", ) if request.command == DeployCommand.DEPLOY: @@ -179,3 +168,12 @@ async def handle_deploy(request: DeployRequest): except Exception as e: return {"status": "error", "message": str(e)} + + +async def handle_schedule(request: ScheduleRequest): + # TODO: Gene, translate natural language instruction into modal-compatible cron? Are we doing this? + + # Schedule the modal cron + + # If successful, save cron to db + pass diff --git a/eve/api/helpers.py b/eve/api/helpers.py new file mode 100644 index 0000000..8efb4d4 --- /dev/null +++ b/eve/api/helpers.py @@ -0,0 +1,98 @@ +import asyncio +import logging +import os +from typing import Optional +import aiohttp +from bson import ObjectId +from fastapi import BackgroundTasks +from ably import AblyRealtime + +from eve.tool import Tool +from eve.user import User +from eve.agent import Agent +from eve.thread import Thread +from eve.llm import async_title_thread +from eve.api.requests import ChatRequest, UpdateConfig + +logger = logging.getLogger(__name__) + + +async def get_update_channel( + update_config: UpdateConfig, ably_client: AblyRealtime +) -> Optional[AblyRealtime]: + return ably_client.channels.get(str(update_config.sub_channel_name)) + + +async def setup_chat( + request: ChatRequest, background_tasks: BackgroundTasks +) -> tuple[User, Agent, Thread, list[Tool], Optional[AblyRealtime]]: + user_task = User.from_mongo(request.user_id) + agent_task = Agent.from_mongo(request.agent_id, cache=True) + + agent = await agent_task + tools_task = agent.get_tools(cache=True) + + thread_task = None + if request.thread_id: + thread_task = Thread.from_mongo(request.thread_id) + else: + user = await user_task + thread = agent.request_thread(user=user.id) + background_tasks.add_task(async_title_thread, thread, request.user_message) + thread_task = thread + + # Wait for all tasks to complete + user, thread, tools = await asyncio.gather( + user_task, + thread_task, + tools_task, + ) + + return user, agent, thread, tools + + +def serialize_for_json(obj): + """Recursively serialize objects for JSON, handling ObjectId and other special types""" + if isinstance(obj, ObjectId): + return str(obj) + elif isinstance(obj, dict): + return {k: serialize_for_json(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [serialize_for_json(item) for item in obj] + return obj + + +async def emit_update( + update_config: UpdateConfig, update_channel: AblyRealtime, data: dict +): + if update_config and update_config.update_endpoint: + raise ValueError("update_endpoint and sub_channel_name cannot be used together") + elif update_config.update_endpoint: + await emit_http_update(update_config, data) + elif update_config.sub_channel_name: + await emit_channel_update(update_channel, data) + else: + raise ValueError("One of update_endpoint or sub_channel_name must be provided") + + +async def emit_http_update(update_config: UpdateConfig, data: dict): + async with aiohttp.ClientSession() as session: + try: + async with session.post( + update_config.update_endpoint, + json=data, + headers={"Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}"}, + ) as response: + if response.status != 200: + logger.error( + f"Failed to send update to endpoint: {await response.text()}" + ) + except Exception as e: + logger.error(f"Error sending update to endpoint: {str(e)}") + + +async def emit_channel_update(update_channel: AblyRealtime, data: dict): + try: + await update_channel.publish("update", data) + except Exception as e: + logger.error(f"Failed to publish to Ably: {str(e)}") diff --git a/eve/api/requests.py b/eve/api/requests.py index a9047de..9cf0b41 100644 --- a/eve/api/requests.py +++ b/eve/api/requests.py @@ -33,3 +33,9 @@ class ChatRequest(BaseModel): thread_id: Optional[str] = None update_config: Optional[UpdateConfig] = None force_reply: bool = False + + +class ScheduleRequest(BaseModel): + agent_id: str + user_id: str + instruction: str diff --git a/eve/api/utils.py b/eve/api/utils.py deleted file mode 100644 index 37edad0..0000000 --- a/eve/api/utils.py +++ /dev/null @@ -1,54 +0,0 @@ -import logging -from typing import Optional -from bson import ObjectId -from fastapi import BackgroundTasks, FastAPI -from ably import AblyRealtime - -from eve.tool import Tool -from eve.user import User -from eve.agent import Agent -from eve.thread import Thread -from eve.llm import async_title_thread -from eve.api.requests import ChatRequest - -logger = logging.getLogger(__name__) - - -async def setup_chat( - web_app: FastAPI, request: ChatRequest, background_tasks: BackgroundTasks -) -> tuple[User, Agent, Thread, list[Tool], Optional[AblyRealtime]]: - update_channel = None - if request.update_config and request.update_config.sub_channel_name: - try: - update_channel = web_app.state.ably_client.channels.get( - str(request.update_config.sub_channel_name) - ) - except Exception as e: - logger.error(f"Failed to create Ably channel: {str(e)}") - - user = User.from_mongo(request.user_id) - agent = Agent.from_mongo(request.agent_id, cache=True) - tools = agent.get_tools(cache=True) - - if request.thread_id: - thread = Thread.from_mongo(request.thread_id) - else: - thread = agent.request_thread(user=user.id) - background_tasks.add_task(async_title_thread, thread, request.user_message) - - return user, agent, thread, tools, update_channel - -def serialize_for_json(obj): - """Recursively serialize objects for JSON, handling ObjectId and other special types""" - if isinstance(obj, ObjectId): - return str(obj) - elif isinstance(obj, dict): - return {k: serialize_for_json(v) for k, v in obj.items()} - elif isinstance(obj, list): - return [serialize_for_json(item) for item in obj] - return obj - - -async def handle_task(tool: str, user_id: str, args: dict = {}) -> dict: - tool = Tool.load(key=tool) - return await tool.async_start_task(requester_id=user_id, user_id=user_id, args=args) diff --git a/eve/cron.py b/eve/cron.py new file mode 100644 index 0000000..80fb582 --- /dev/null +++ b/eve/cron.py @@ -0,0 +1,12 @@ +import modal + +app = modal.App() + +@app.function(schedule=modal.Period(days=1)) +def single_tool_cron(): + pass + + +@app.function(schedule=modal.Period(days=1)) +def multiple_tool_cron(): + pass From 4b6af32c4dd356d90c34727aa2628f3158a6e82f Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Mon, 6 Jan 2025 09:31:17 -0500 Subject: [PATCH 03/10] refactor client file prep to use the same method --- eve/cli/deploy_cli.py | 61 +++++++++++++------------------------------ eve/deploy.py | 33 ++++++++++------------- 2 files changed, 32 insertions(+), 62 deletions(-) diff --git a/eve/cli/deploy_cli.py b/eve/cli/deploy_cli.py index bbf5ec7..78d94cb 100644 --- a/eve/cli/deploy_cli.py +++ b/eve/cli/deploy_cli.py @@ -1,4 +1,3 @@ -import os import sys import yaml import click @@ -6,13 +5,13 @@ import subprocess from pathlib import Path from dotenv import dotenv_values -import tempfile import shutil +from eve.deploy import DEPLOYMENT_ENV_NAME, prepare_client_file + from .. import load_env root_dir = Path(__file__).parent.parent.parent -ENV_NAME = "deployments" def ensure_modal_env_exists(): @@ -25,45 +24,23 @@ def ensure_modal_env_exists(): ) # Check if our environment exists - if ENV_NAME not in result.stdout: - click.echo(click.style(f"Creating Modal environment: {ENV_NAME}", fg="green")) - subprocess.run(["rye", "run", "modal", "environment", "create", ENV_NAME]) + if DEPLOYMENT_ENV_NAME not in result.stdout: + click.echo( + click.style( + f"Creating Modal environment: {DEPLOYMENT_ENV_NAME}", fg="green" + ) + ) + subprocess.run( + ["rye", "run", "modal", "environment", "create", DEPLOYMENT_ENV_NAME] + ) else: click.echo( - click.style(f"Using existing Modal environment: {ENV_NAME}", fg="blue") + click.style( + f"Using existing Modal environment: {DEPLOYMENT_ENV_NAME}", fg="blue" + ) ) -def prepare_client_file(file_path: str, agent_key: str, env: str) -> str: - """Create a temporary copy of the client file with modifications""" - with open(file_path, "r") as f: - content = f.read() - - # Get the repo root directory - repo_root = root_dir.absolute() - pyproject_path = repo_root / "pyproject.toml" - - # Replace the static secret name with the dynamic one - modified_content = content.replace( - 'modal.Secret.from_name("client-secrets")', - f'modal.Secret.from_name("{agent_key}-secrets-{env}")', - ) - - # Fix pyproject.toml path to use absolute path - modified_content = modified_content.replace( - '.pip_install_from_pyproject("pyproject.toml")', - f'.pip_install_from_pyproject("{pyproject_path}")', - ) - - # Create a temporary file with the modified content - temp_dir = tempfile.mkdtemp() - temp_file = Path(temp_dir) / "modal_client.py" - with open(temp_file, "w") as f: - f.write(modified_content) - - return str(temp_file) - - def create_secrets(agent_key: str, secrets_dict: dict, env: str): if not secrets_dict: click.echo(click.style(f"No secrets found for {agent_key}", fg="yellow")) @@ -81,7 +58,7 @@ def create_secrets(agent_key: str, secrets_dict: dict, env: str): if value is not None: value = str(value).strip().strip("'\"") cmd_parts.append(f"{key}={value}") - cmd_parts.extend(["-e", ENV_NAME, "--force"]) + cmd_parts.extend(["-e", DEPLOYMENT_ENV_NAME, "--force"]) subprocess.run(cmd_parts) @@ -105,7 +82,7 @@ def deploy_client(agent_key: str, client_name: str, env: str): app_name, temp_file, "-e", - ENV_NAME, + DEPLOYMENT_ENV_NAME, ] ) finally: @@ -211,10 +188,8 @@ def deploy(agent: str, all: bool, db: str): for agent_name in agents: click.echo(click.style(f"\nProcessing agent: {agent_name}", fg="blue")) - agent_path = ( - root_dir / "eve" / "agents" / env / agent_name / "api.yaml" - ) - process_agent(agent_path, env) + agent_path = root_dir / "eve" / "agents" / env / agent_name / "api.yaml" + process_agent(agent_path) else: if not agent: diff --git a/eve/deploy.py b/eve/deploy.py index 19465c1..493622b 100644 --- a/eve/deploy.py +++ b/eve/deploy.py @@ -1,16 +1,16 @@ import os +from pathlib import Path import subprocess import tempfile from enum import Enum -from pydantic import BaseModel -from typing import Optional, Dict +from typing import Dict -from eve.models import ClientType REPO_URL = "https://github.com/edenartlab/eve.git" REPO_BRANCH = "main" DEPLOYMENT_ENV_NAME = "deployments" -db = os.getenv("DB") +db = os.getenv("DB", "STAGE").upper() +env = "prod" if db == "PROD" else "stage" class DeployCommand(str, Enum): @@ -18,13 +18,6 @@ class DeployCommand(str, Enum): STOP = "stop" -class DeployRequest(BaseModel): - agent_key: str - platform: ClientType - command: DeployCommand - credentials: Optional[Dict[str, str]] = None - - def authenticate_modal_key() -> bool: token_id = os.getenv("MODAL_DEPLOYER_TOKEN_ID") token_secret = os.getenv("MODAL_DEPLOYER_TOKEN_SECRET") @@ -78,21 +71,19 @@ def clone_repo(temp_dir: str): ) -def modify_client_file(file_path: str, agent_key: str) -> None: +def prepare_client_file(file_path: str, agent_key: str) -> None: """Modify the client file to use correct secret name and fix pyproject path""" with open(file_path, "r") as f: content = f.read() # Get the repo root directory (three levels up from the client file) - repo_root = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.dirname(file_path))) - ) - pyproject_path = os.path.join(repo_root, "pyproject.toml") + repo_root = Path(__file__).parent.parent.parent + pyproject_path = repo_root / "pyproject.toml" # Replace the static secret name with the dynamic one modified_content = content.replace( 'modal.Secret.from_name("client-secrets")', - f'modal.Secret.from_name("{agent_key}-secrets")', + f'modal.Secret.from_name("{agent_key}-secrets-{db}")', ) # Fix pyproject.toml path to use absolute path @@ -101,9 +92,13 @@ def modify_client_file(file_path: str, agent_key: str) -> None: f'.pip_install_from_pyproject("{pyproject_path}")', ) - with open(file_path, "w") as f: + temp_dir = tempfile.mkdtemp() + temp_file = Path(temp_dir) / "modal_client.py" + with open(temp_file, "w") as f: f.write(modified_content) + return str(temp_file) + def deploy_client(agent_key: str, client_name: str): with tempfile.TemporaryDirectory() as temp_dir: @@ -116,7 +111,7 @@ def deploy_client(agent_key: str, client_name: str): ) if os.path.exists(client_path): # Modify the client file to use the correct secret name - modify_client_file(client_path, agent_key) + prepare_client_file(client_path, agent_key) subprocess.run( ["modal", "deploy", client_path, "-e", DEPLOYMENT_ENV_NAME], check=True ) From 9d40a5082a5a1cc0909006d51d69f179833a165e Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Mon, 6 Jan 2025 12:05:43 -0500 Subject: [PATCH 04/10] more refactor --- .github/workflows/prod.yaml | 2 +- .github/workflows/staging.yaml | 2 +- eve/__init__.py | 2 +- eve/api.py | 133 +---------------------------- eve/api/api.py | 148 +++++++++++++++++++++++++++++++++ eve/api/handlers.py | 15 ++-- eve/api/requests.py | 11 ++- eve/cli/start_cli.py | 13 +-- eve/cron.py | 46 ++++++++-- pyproject.toml | 1 + requirements-dev.lock | 4 + requirements.lock | 4 + 12 files changed, 228 insertions(+), 153 deletions(-) create mode 100644 eve/api/api.py diff --git a/.github/workflows/prod.yaml b/.github/workflows/prod.yaml index c2c2a41..358f5af 100644 --- a/.github/workflows/prod.yaml +++ b/.github/workflows/prod.yaml @@ -46,4 +46,4 @@ jobs: - name: Deploy to Modal working-directory: eve - run: rye run modal deploy ./eve/api.py + run: rye run modal deploy ./eve/api/api.py diff --git a/.github/workflows/staging.yaml b/.github/workflows/staging.yaml index 71eccf9..5d18264 100644 --- a/.github/workflows/staging.yaml +++ b/.github/workflows/staging.yaml @@ -46,4 +46,4 @@ jobs: - name: Deploy to Modal working-directory: eve - run: rye run modal deploy ./eve/api.py + run: rye run modal deploy ./eve/api/api.py diff --git a/eve/__init__.py b/eve/__init__.py index f3f7feb..440a90c 100644 --- a/eve/__init__.py +++ b/eve/__init__.py @@ -16,7 +16,7 @@ def load_env(db): db = db.upper() if db not in ["STAGE", "PROD"]: raise ValueError(f"Invalid database: {db}") - + os.environ["DB"] = db # First try ~/.eve diff --git a/eve/api.py b/eve/api.py index 3174d29..0519ecb 100644 --- a/eve/api.py +++ b/eve/api.py @@ -1,132 +1 @@ -import os -import modal -from fastapi import FastAPI, Depends, BackgroundTasks -from fastapi.middleware.cors import CORSMiddleware -from fastapi.security import APIKeyHeader, HTTPBearer -import logging -from ably import AblyRealtime -from pathlib import Path - -from eve import auth -from eve.api.handlers import ( - handle_cancel, - handle_deploy, - handle_schedule, - handle_stream_chat, -) -from eve.api.requests import CancelRequest, ChatRequest, ScheduleRequest, TaskRequest -from eve.deploy import ( - DeployRequest, - authenticate_modal_key, - check_environment_exists, - create_environment, -) -from eve import deploy - -# Config and logging setup -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -db = os.getenv("DB", "STAGE").upper() -if db not in ["PROD", "STAGE"]: - raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") -app_name = "api-prod" if db == "PROD" else "api-stage" - -# FastAPI setup -web_app = FastAPI() -web_app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) -bearer_scheme = HTTPBearer(auto_error=False) -background_tasks: BackgroundTasks = BackgroundTasks() - - -# Store ably client at app state level -@web_app.on_event("startup") -async def startup_event(): - web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) - - -@web_app.post("/create") -async def handle_task(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)): - return await handle_task(request) - - -@web_app.post("/cancel") -async def cancel(request: CancelRequest, _: dict = Depends(auth.authenticate_admin)): - return await handle_cancel(request) - - -@web_app.post("/chat") -async def handle_chat( - request: ChatRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), -): - return await handle_chat(request, background_tasks, web_app.state.ably_client) - - -@web_app.post("/chat/stream") -async def stream_chat( - request: ChatRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), -): - return await handle_stream_chat(request, background_tasks) - - -@web_app.post("/deployment") -async def deploy_handler( - request: DeployRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_deploy(request) - - -@web_app.post("/schedule") -async def schedule_handler( - request: ScheduleRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_schedule(request) - - -# Modal app setup -app = modal.App( - name=app_name, - secrets=[ - modal.Secret.from_name("eve-secrets"), - modal.Secret.from_name(f"eve-secrets-{db}"), - ], -) - -root_dir = Path(__file__).parent.parent -workflows_dir = root_dir / ".." / "workflows" - -image = ( - modal.Image.debian_slim(python_version="3.11") - .env({"DB": db, "MODAL_SERVE": os.getenv("MODAL_SERVE")}) - .apt_install("git", "libmagic1", "ffmpeg", "wget") - .pip_install_from_pyproject(str(root_dir / "pyproject.toml")) - .run_commands(["playwright install"]) - .copy_local_dir(str(workflows_dir), "/workflows") -) - - -@app.function( - image=image, - keep_warm=1, - concurrency_limit=10, - container_idle_timeout=60, - timeout=3600, -) -@modal.asgi_app() -def fastapi_app(): - authenticate_modal_key() - if not check_environment_exists(deploy.DEPLOYMENT_ENV_NAME): - create_environment(deploy.DEPLOYMENT_ENV_NAME) - return web_app + \ No newline at end of file diff --git a/eve/api/api.py b/eve/api/api.py new file mode 100644 index 0000000..a11ad58 --- /dev/null +++ b/eve/api/api.py @@ -0,0 +1,148 @@ +from contextlib import asynccontextmanager +import os +import modal +from fastapi import FastAPI, Depends, BackgroundTasks +from fastapi.middleware.cors import CORSMiddleware +from fastapi.security import APIKeyHeader, HTTPBearer +import logging +from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +from pathlib import Path + +from eve import auth +from eve.api.handlers import ( + handle_cancel, + handle_chat, + handle_create, + handle_deployment, + handle_schedule, + handle_stream_chat, +) +from eve.api.requests import ( + CancelRequest, + ChatRequest, + ScheduleRequest, + TaskRequest, + DeployRequest, +) +from eve.deploy import ( + authenticate_modal_key, + check_environment_exists, + create_environment, +) +from eve import deploy + +# Config and logging setup +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +db = os.getenv("DB", "STAGE").upper() +if db not in ["PROD", "STAGE"]: + raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") +app_name = "api-prod" if db == "PROD" else "api-stage" + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Setup + app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) + yield + # Cleanup + if hasattr(app.state, "ably_client"): + app.state.ably_client.close() + + +# FastAPI setup +web_app = FastAPI(lifespan=lifespan) +web_app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) +scheduler = BackgroundScheduler() +scheduler.start() + +api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) +bearer_scheme = HTTPBearer(auto_error=False) +background_tasks: BackgroundTasks = BackgroundTasks() + + +@web_app.post("/create") +async def create(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)): + return await handle_create(request) + + +@web_app.post("/cancel") +async def cancel(request: CancelRequest, _: dict = Depends(auth.authenticate_admin)): + return await handle_cancel(request) + + +@web_app.post("/chat") +async def chat( + request: ChatRequest, + background_tasks: BackgroundTasks, + _: dict = Depends(auth.authenticate_admin), +): + return await handle_chat(request, background_tasks, web_app.state.ably_client) + + +@web_app.post("/chat/stream") +async def stream_chat( + request: ChatRequest, + background_tasks: BackgroundTasks, + _: dict = Depends(auth.authenticate_admin), +): + return await handle_stream_chat(request, background_tasks) + + +@web_app.post("/deployment") +async def deployment( + request: DeployRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_deployment(request) + + +@web_app.post("/schedule") +async def schedule( + request: ScheduleRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_schedule(request) + + +# Modal app setup +app = modal.App( + name=app_name, + secrets=[ + modal.Secret.from_name("eve-secrets"), + modal.Secret.from_name(f"eve-secrets-{db}"), + ], +) + +root_dir = Path(__file__).parent.parent +workflows_dir = root_dir / ".." / "workflows" + +image = ( + modal.Image.debian_slim(python_version="3.11") + .env({"DB": db, "MODAL_SERVE": os.getenv("MODAL_SERVE")}) + .apt_install("git", "libmagic1", "ffmpeg", "wget") + .pip_install_from_pyproject(str(root_dir / "pyproject.toml")) + .run_commands(["playwright install"]) + .copy_local_dir(str(workflows_dir), "/workflows") +) + + +@app.function( + image=image, + keep_warm=1, + concurrency_limit=10, + container_idle_timeout=60, + timeout=3600, +) +@modal.asgi_app() +def fastapi_app(): + authenticate_modal_key() + if not check_environment_exists(deploy.DEPLOYMENT_ENV_NAME): + create_environment(deploy.DEPLOYMENT_ENV_NAME) + return web_app diff --git a/eve/api/handlers.py b/eve/api/handlers.py index 8e9bd58..6bfd2e2 100644 --- a/eve/api/handlers.py +++ b/eve/api/handlers.py @@ -6,7 +6,13 @@ from fastapi.responses import StreamingResponse from ably import AblyRealtime -from eve.api.requests import CancelRequest, ChatRequest, ScheduleRequest, TaskRequest +from eve.api.requests import ( + CancelRequest, + ChatRequest, + DeployRequest, + ScheduleRequest, + TaskRequest, +) from eve.api.helpers import ( emit_update, get_update_channel, @@ -15,7 +21,6 @@ ) from eve.deploy import ( DeployCommand, - DeployRequest, create_modal_secrets, deploy_client, stop_client, @@ -29,7 +34,7 @@ db = os.getenv("DB", "STAGE").upper() -async def handle_task(request: TaskRequest): +async def handle_create(request: TaskRequest): tool = Tool.load(key=request.tool) result = await tool.async_start_task( requester_id=request.user_id, user_id=request.user_id, args=request.args @@ -143,7 +148,7 @@ async def event_generator(): return {"status": "error", "message": str(e)} -async def handle_deploy(request: DeployRequest): +async def handle_deployment(request: DeployRequest): try: if request.credentials: create_modal_secrets( @@ -173,7 +178,7 @@ async def handle_deploy(request: DeployRequest): async def handle_schedule(request: ScheduleRequest): # TODO: Gene, translate natural language instruction into modal-compatible cron? Are we doing this? - # Schedule the modal cron + # Prepare the modal cron # If successful, save cron to db pass diff --git a/eve/api/requests.py b/eve/api/requests.py index 9cf0b41..c4a67aa 100644 --- a/eve/api/requests.py +++ b/eve/api/requests.py @@ -1,6 +1,8 @@ -from typing import Optional +from typing import Dict, Optional from pydantic import BaseModel, ConfigDict +from eve.deploy import DeployCommand +from eve.models import ClientType from eve.thread import UserMessage @@ -39,3 +41,10 @@ class ScheduleRequest(BaseModel): agent_id: str user_id: str instruction: str + + +class DeployRequest(BaseModel): + agent_key: str + platform: ClientType + command: DeployCommand + credentials: Optional[Dict[str, str]] = None diff --git a/eve/cli/start_cli.py b/eve/cli/start_cli.py index cc54811..8cae650 100644 --- a/eve/cli/start_cli.py +++ b/eve/cli/start_cli.py @@ -126,7 +126,7 @@ def start(agent: str, db: str, platforms: tuple, local: bool): @click.command() @click.option( "--host", - default="0.0.0.0", + default="127.0.0.1", help="Host to bind the server to", ) @click.option( @@ -138,7 +138,7 @@ def start(agent: str, db: str, platforms: tuple, local: bool): @click.option( "--reload", is_flag=True, - default=False, + default=True, help="Enable auto-reload on code changes", ) @click.option( @@ -150,15 +150,16 @@ def start(agent: str, db: str, platforms: tuple, local: bool): def api(host: str, port: int, reload: bool, db: str): """Start the Eve API server""" import uvicorn - import os load_env(db) - - click.echo(click.style(f"Starting API server on {host}:{port} with DB={db}...", fg="blue")) + + click.echo( + click.style(f"Starting API server on {host}:{port} with DB={db}...", fg="blue") + ) # Adjusted the import path to look one directory up uvicorn.run( - "eve.api:web_app", + "eve.api.api:app", host=host, port=port, reload=reload, diff --git a/eve/cron.py b/eve/cron.py index 80fb582..57bc818 100644 --- a/eve/cron.py +++ b/eve/cron.py @@ -1,12 +1,46 @@ +import os +import aiohttp import modal +import logging -app = modal.App() +logger = logging.getLogger(__name__) + +db = os.getenv("DB", "STAGE").upper() +if db not in ["PROD", "STAGE"]: + raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") +API_URL = os.getenv("EDEN_API_URL") -@app.function(schedule=modal.Period(days=1)) -def single_tool_cron(): - pass + +app = modal.App( + name=f"[AGENT_USERNAME]-cron-[CRON_ID]-{db}", + secrets=[ + modal.Secret.from_name("eve-secrets", environment_name="main"), + modal.Secret.from_name(f"eve-secrets-{db}", environment_name="main"), + ], +) @app.function(schedule=modal.Period(days=1)) -def multiple_tool_cron(): - pass +async def chat_cron(): + async with aiohttp.ClientSession() as session: + request_data = { + "user_id": "[USER_ID]", + "agent_id": "[AGENT_ID]", + "thread_id": "[THREAD_ID]", + "force_reply": True, + "user_message": { + "content": "[CONTENT]", + "name": "[NAME]", + "attachments": "[ATTACHMENTS]", + }, + "update_config": "[UPDATE_CONFIG]", + } + + print(f"Sending request: {request_data}") + async with session.post( + f"{API_URL}/chat", + json=request_data, + headers={"Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}"}, + ) as response: + if response.status != 200: + logger.error(f"Failed to send request: {response.status}") diff --git a/pyproject.toml b/pyproject.toml index affcebb..e7a0c53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dependencies = [ "colorama>=0.4.6", "web3<7.6.1", "playwright<1.49", + "apscheduler>=3.11.0", ] [build-system] diff --git a/requirements-dev.lock b/requirements-dev.lock index 1301343..453fc2f 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -34,6 +34,8 @@ anyio==4.7.0 # via runwayml # via starlette # via watchfiles +apscheduler==3.11.0 + # via eve attrs==24.2.0 # via aiohttp # via jsonschema @@ -480,6 +482,8 @@ typing-extensions==4.12.2 # via web3 typing-inspect==0.9.0 # via clerk-backend-api +tzlocal==5.2 + # via apscheduler urllib3==2.2.3 # via botocore # via requests diff --git a/requirements.lock b/requirements.lock index 8165085..d08cb28 100644 --- a/requirements.lock +++ b/requirements.lock @@ -34,6 +34,8 @@ anyio==4.7.0 # via runwayml # via starlette # via watchfiles +apscheduler==3.11.0 + # via eve attrs==24.2.0 # via aiohttp # via jsonschema @@ -451,6 +453,8 @@ typing-extensions==4.12.2 # via web3 typing-inspect==0.9.0 # via clerk-backend-api +tzlocal==5.2 + # via apscheduler urllib3==2.2.3 # via botocore # via requests From ffc46a7d235e915ef3743d753b6a78f922b5f7c0 Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Mon, 6 Jan 2025 22:15:25 -0500 Subject: [PATCH 05/10] maybe it works? --- eve/api/api.py | 18 +++++++++++---- eve/api/handlers.py | 55 ++++++++++++++++++++++++++++++++++++++++----- eve/api/requests.py | 33 ++++++++++++++++++++++++++- 3 files changed, 96 insertions(+), 10 deletions(-) diff --git a/eve/api/api.py b/eve/api/api.py index a11ad58..c48123a 100644 --- a/eve/api/api.py +++ b/eve/api/api.py @@ -45,11 +45,16 @@ @asynccontextmanager async def lifespan(app: FastAPI): # Setup + scheduler = BackgroundScheduler() + scheduler.start() + app.state.scheduler = scheduler app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) yield # Cleanup if hasattr(app.state, "ably_client"): app.state.ably_client.close() + if hasattr(app.state, "scheduler"): + app.state.scheduler.shutdown() # FastAPI setup @@ -61,8 +66,6 @@ async def lifespan(app: FastAPI): allow_methods=["*"], allow_headers=["*"], ) -scheduler = BackgroundScheduler() -scheduler.start() api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) bearer_scheme = HTTPBearer(auto_error=False) @@ -106,9 +109,16 @@ async def deployment( @web_app.post("/schedule") async def schedule( - request: ScheduleRequest, _: dict = Depends(auth.authenticate_admin) + request: ScheduleRequest, + background_tasks: BackgroundTasks, + _: dict = Depends(auth.authenticate_admin), ): - return await handle_schedule(request) + return await handle_schedule( + request=request, + background_tasks=background_tasks, + scheduler=web_app.state.scheduler, + ably_client=web_app.state.ably_client, + ) # Modal app setup diff --git a/eve/api/handlers.py b/eve/api/handlers.py index 6bfd2e2..8ce7469 100644 --- a/eve/api/handlers.py +++ b/eve/api/handlers.py @@ -5,6 +5,8 @@ from fastapi import BackgroundTasks from fastapi.responses import StreamingResponse from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger from eve.api.requests import ( CancelRequest, @@ -29,6 +31,7 @@ from eve.mongo import serialize_document from eve.task import Task from eve.tool import Tool +from eve.thread import UserMessage logger = logging.getLogger(__name__) db = os.getenv("DB", "STAGE").upper() @@ -175,10 +178,52 @@ async def handle_deployment(request: DeployRequest): return {"status": "error", "message": str(e)} -async def handle_schedule(request: ScheduleRequest): - # TODO: Gene, translate natural language instruction into modal-compatible cron? Are we doing this? +async def handle_schedule( + request: ScheduleRequest, + background_tasks: BackgroundTasks, + scheduler: BackgroundScheduler, + ably_client: AblyRealtime, +): + try: + # Create the cron trigger using the validated schedule + trigger = CronTrigger(**request.schedule.to_cron_dict()) + + # Create a function that will create a new thread and call handle_chat + async def run_scheduled_chat(): + # Create a new ChatRequest for each scheduled run + chat_request = ChatRequest( + user_id=request.user_id, + agent_id=request.agent_id, + user_message=UserMessage( + content=request.message_content, + ), + update_config=request.update_config, + force_reply=True, # Force reply since this is automated + ) - # Prepare the modal cron + # Call handle_chat with the new request + await handle_chat( + request=chat_request, + background_tasks=background_tasks, + ably_client=ably_client, + ) - # If successful, save cron to db - pass + # Add job to scheduler + job = scheduler.add_job( + run_scheduled_chat, + trigger=trigger, + id=f"{request.user_id}_{request.tool_id}", + # Mark as async job + misfire_grace_time=None, + coalesce=True, + ) + + return { + "status": "success", + "job_id": job.id, + "next_run_time": str(job.next_run_time), + } + + except Exception as e: + logger.error(f"Error scheduling task: {str(e)}\n{traceback.format_exc()}") + return {"status": "error", "message": str(e)} diff --git a/eve/api/requests.py b/eve/api/requests.py index c4a67aa..baf5af7 100644 --- a/eve/api/requests.py +++ b/eve/api/requests.py @@ -1,9 +1,12 @@ +from dataclasses import Field from typing import Dict, Optional from pydantic import BaseModel, ConfigDict from eve.deploy import DeployCommand from eve.models import ClientType from eve.thread import UserMessage +from datetime import datetime +from zoneinfo import ZoneInfo class TaskRequest(BaseModel): @@ -37,10 +40,38 @@ class ChatRequest(BaseModel): force_reply: bool = False +class CronSchedule(BaseModel): + year: Optional[int | str] = Field(None, description="4-digit year") + month: Optional[int | str] = Field(None, description="month (1-12)") + day: Optional[int | str] = Field(None, description="day of month (1-31)") + week: Optional[int | str] = Field(None, description="ISO week (1-53)") + day_of_week: Optional[int | str] = Field( + None, + description="number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)", + ) + hour: Optional[int | str] = Field(None, description="hour (0-23)") + minute: Optional[int | str] = Field(None, description="minute (0-59)") + second: Optional[int | str] = Field(None, description="second (0-59)") + start_date: Optional[datetime] = Field( + None, description="earliest possible date/time to trigger on (inclusive)" + ) + end_date: Optional[datetime] = Field( + None, description="latest possible date/time to trigger on (inclusive)" + ) + timezone: Optional[str] = Field( + None, description="time zone to use for the date/time calculations" + ) + + def to_cron_dict(self) -> dict: + return {k: v for k, v in self.model_dump().items() if v is not None} + + class ScheduleRequest(BaseModel): agent_id: str user_id: str - instruction: str + message_content: str + schedule: CronSchedule + update_config: Optional[UpdateConfig] = None class DeployRequest(BaseModel): From 0cd3c04adba1c1247c295eba583f84f79d62d1cb Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Mon, 6 Jan 2025 22:40:13 -0500 Subject: [PATCH 06/10] it seems to work --- eve/api/api.py | 39 +++++++++++------------ eve/api/handlers.py | 73 +++++++++++++++++++++++++++++--------------- eve/api/helpers.py | 22 +++---------- eve/api/requests.py | 6 ++-- eve/cli/start_cli.py | 3 +- eve/cron.py | 1 - 6 files changed, 76 insertions(+), 68 deletions(-) diff --git a/eve/api/api.py b/eve/api/api.py index c48123a..8a3b7f0 100644 --- a/eve/api/api.py +++ b/eve/api/api.py @@ -1,4 +1,3 @@ -from contextlib import asynccontextmanager import os import modal from fastapi import FastAPI, Depends, BackgroundTasks @@ -42,23 +41,8 @@ app_name = "api-prod" if db == "PROD" else "api-stage" -@asynccontextmanager -async def lifespan(app: FastAPI): - # Setup - scheduler = BackgroundScheduler() - scheduler.start() - app.state.scheduler = scheduler - app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) - yield - # Cleanup - if hasattr(app.state, "ably_client"): - app.state.ably_client.close() - if hasattr(app.state, "scheduler"): - app.state.scheduler.shutdown() - - # FastAPI setup -web_app = FastAPI(lifespan=lifespan) +web_app = FastAPI() web_app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -67,6 +51,25 @@ async def lifespan(app: FastAPI): allow_headers=["*"], ) + +@web_app.on_event("startup") +async def startup_event(): + # Setup + scheduler = BackgroundScheduler() + scheduler.start() + web_app.state.scheduler = scheduler + web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) + + +@web_app.on_event("shutdown") +async def shutdown_event(): + # Cleanup + if hasattr(web_app.state, "ably_client"): + web_app.state.ably_client.close() + if hasattr(web_app.state, "scheduler"): + web_app.state.scheduler.shutdown() + + api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) bearer_scheme = HTTPBearer(auto_error=False) background_tasks: BackgroundTasks = BackgroundTasks() @@ -110,12 +113,10 @@ async def deployment( @web_app.post("/schedule") async def schedule( request: ScheduleRequest, - background_tasks: BackgroundTasks, _: dict = Depends(auth.authenticate_admin), ): return await handle_schedule( request=request, - background_tasks=background_tasks, scheduler=web_app.state.scheduler, ably_client=web_app.state.ably_client, ) diff --git a/eve/api/handlers.py b/eve/api/handlers.py index 8ce7469..1d27cf5 100644 --- a/eve/api/handlers.py +++ b/eve/api/handlers.py @@ -1,7 +1,9 @@ import json import logging import os +import time import traceback +import asyncio from fastapi import BackgroundTasks from fastapi.responses import StreamingResponse from ably import AblyRealtime @@ -180,40 +182,61 @@ async def handle_deployment(request: DeployRequest): async def handle_schedule( request: ScheduleRequest, - background_tasks: BackgroundTasks, scheduler: BackgroundScheduler, ably_client: AblyRealtime, ): try: - # Create the cron trigger using the validated schedule trigger = CronTrigger(**request.schedule.to_cron_dict()) - # Create a function that will create a new thread and call handle_chat - async def run_scheduled_chat(): - # Create a new ChatRequest for each scheduled run - chat_request = ChatRequest( - user_id=request.user_id, - agent_id=request.agent_id, - user_message=UserMessage( - content=request.message_content, - ), - update_config=request.update_config, - force_reply=True, # Force reply since this is automated - ) - - # Call handle_chat with the new request - await handle_chat( - request=chat_request, - background_tasks=background_tasks, - ably_client=ably_client, - ) + def run_scheduled_task(): + logger.info(f"Running scheduled chat for user {request.user_id}") + + # Create new event loop for this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + # Create new background tasks for this run + background_tasks = BackgroundTasks() + + # Create the chat request + chat_request = ChatRequest( + user_id=request.user_id, + agent_id=request.agent_id, + user_message=UserMessage( + content=request.message_content, + ), + update_config=request.update_config, + force_reply=True, + ) + + # Run the async chat handler and its background tasks + result = loop.run_until_complete( + handle_chat( + request=chat_request, + background_tasks=background_tasks, + ably_client=ably_client, + ) + ) + + # Execute any background tasks that were created + loop.run_until_complete(background_tasks()) + + logger.info( + f"Completed scheduled chat for user {request.user_id}: {result}" + ) + + except Exception as e: + logger.error( + f"Error in scheduled chat: {str(e)}\n{traceback.format_exc()}" + ) + finally: + loop.close() - # Add job to scheduler job = scheduler.add_job( - run_scheduled_chat, + run_scheduled_task, trigger=trigger, - id=f"{request.user_id}_{request.tool_id}", - # Mark as async job + id=f"{request.user_id}_{request.agent_id}_{time.time()}", misfire_grace_time=None, coalesce=True, ) diff --git a/eve/api/helpers.py b/eve/api/helpers.py index 8efb4d4..75cc1f1 100644 --- a/eve/api/helpers.py +++ b/eve/api/helpers.py @@ -25,28 +25,16 @@ async def get_update_channel( async def setup_chat( request: ChatRequest, background_tasks: BackgroundTasks -) -> tuple[User, Agent, Thread, list[Tool], Optional[AblyRealtime]]: - user_task = User.from_mongo(request.user_id) - agent_task = Agent.from_mongo(request.agent_id, cache=True) +) -> tuple[User, Agent, Thread, list[Tool]]: + user = User.from_mongo(request.user_id) + agent = Agent.from_mongo(request.agent_id, cache=True) + tools = agent.get_tools(cache=True) - agent = await agent_task - tools_task = agent.get_tools(cache=True) - - thread_task = None if request.thread_id: - thread_task = Thread.from_mongo(request.thread_id) + thread = Thread.from_mongo(request.thread_id) else: - user = await user_task thread = agent.request_thread(user=user.id) background_tasks.add_task(async_title_thread, thread, request.user_message) - thread_task = thread - - # Wait for all tasks to complete - user, thread, tools = await asyncio.gather( - user_task, - thread_task, - tools_task, - ) return user, agent, thread, tools diff --git a/eve/api/requests.py b/eve/api/requests.py index baf5af7..6165db4 100644 --- a/eve/api/requests.py +++ b/eve/api/requests.py @@ -1,12 +1,10 @@ -from dataclasses import Field from typing import Dict, Optional -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field +from datetime import datetime from eve.deploy import DeployCommand from eve.models import ClientType from eve.thread import UserMessage -from datetime import datetime -from zoneinfo import ZoneInfo class TaskRequest(BaseModel): diff --git a/eve/cli/start_cli.py b/eve/cli/start_cli.py index 8cae650..5371a15 100644 --- a/eve/cli/start_cli.py +++ b/eve/cli/start_cli.py @@ -157,9 +157,8 @@ def api(host: str, port: int, reload: bool, db: str): click.style(f"Starting API server on {host}:{port} with DB={db}...", fg="blue") ) - # Adjusted the import path to look one directory up uvicorn.run( - "eve.api.api:app", + "eve.api.api:web_app", host=host, port=port, reload=reload, diff --git a/eve/cron.py b/eve/cron.py index 57bc818..13cc9ba 100644 --- a/eve/cron.py +++ b/eve/cron.py @@ -10,7 +10,6 @@ raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") API_URL = os.getenv("EDEN_API_URL") - app = modal.App( name=f"[AGENT_USERNAME]-cron-[CRON_ID]-{db}", secrets=[ From 17152aa21b710fe8519193ea70234623c97c3e1f Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Mon, 6 Jan 2025 22:41:25 -0500 Subject: [PATCH 07/10] remove unnecessary cron file --- eve/cron.py | 45 --------------------------------------------- 1 file changed, 45 deletions(-) delete mode 100644 eve/cron.py diff --git a/eve/cron.py b/eve/cron.py deleted file mode 100644 index 13cc9ba..0000000 --- a/eve/cron.py +++ /dev/null @@ -1,45 +0,0 @@ -import os -import aiohttp -import modal -import logging - -logger = logging.getLogger(__name__) - -db = os.getenv("DB", "STAGE").upper() -if db not in ["PROD", "STAGE"]: - raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") -API_URL = os.getenv("EDEN_API_URL") - -app = modal.App( - name=f"[AGENT_USERNAME]-cron-[CRON_ID]-{db}", - secrets=[ - modal.Secret.from_name("eve-secrets", environment_name="main"), - modal.Secret.from_name(f"eve-secrets-{db}", environment_name="main"), - ], -) - - -@app.function(schedule=modal.Period(days=1)) -async def chat_cron(): - async with aiohttp.ClientSession() as session: - request_data = { - "user_id": "[USER_ID]", - "agent_id": "[AGENT_ID]", - "thread_id": "[THREAD_ID]", - "force_reply": True, - "user_message": { - "content": "[CONTENT]", - "name": "[NAME]", - "attachments": "[ATTACHMENTS]", - }, - "update_config": "[UPDATE_CONFIG]", - } - - print(f"Sending request: {request_data}") - async with session.post( - f"{API_URL}/chat", - json=request_data, - headers={"Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}"}, - ) as response: - if response.status != 200: - logger.error(f"Failed to send request: {response.status}") From 5d3139646b6ad8cc5d8d882f3de0d804f463ade4 Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Tue, 7 Jan 2025 10:20:11 -0500 Subject: [PATCH 08/10] Add ably heartbeat --- eve/api.py | 43 ++++++++++++++++++++++++++--------- eve/api/api.py | 2 ++ eve/clients/discord/client.py | 11 ++++----- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/eve/api.py b/eve/api.py index ff40201..7566ad7 100644 --- a/eve/api.py +++ b/eve/api.py @@ -1,13 +1,14 @@ +import logging import os import threading import modal from fastapi import FastAPI, Depends, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader, HTTPBearer -import logging from ably import AblyRealtime from apscheduler.schedulers.background import BackgroundScheduler from pathlib import Path +from contextlib import asynccontextmanager from eve import auth from eve.api.handlers import ( @@ -33,17 +34,44 @@ from eve import deploy from eve.tools.comfyui_tool import convert_tasks2_to_tasks3 -# Config and logging setup + logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logging.getLogger("ably").setLevel(logging.WARNING) + db = os.getenv("DB", "STAGE").upper() if db not in ["PROD", "STAGE"]: raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") app_name = "api-prod" if db == "PROD" else "api-stage" + # FastAPI setup -web_app = FastAPI() +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) + watch_thread.start() + app.state.watch_thread = watch_thread + + app.state.ably_client = AblyRealtime( + os.getenv("ABLY_PUBLISHER_KEY"), + options={ + "heartbeat_interval": 15000, + "connection_state_ttl": 60000, + "disconnected_retry_timeout": 15000, + }, + ) + yield + # Shutdown + if hasattr(app.state, "watch_thread"): + app.state.watch_thread.join(timeout=5) + if hasattr(app.state, "scheduler"): + app.state.scheduler.shutdown(wait=True) + if hasattr(app.state, "ably_client"): + await app.state.ably_client.close() + + +web_app = FastAPI(lifespan=lifespan) web_app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -59,13 +87,6 @@ background_tasks: BackgroundTasks = BackgroundTasks() -@web_app.on_event("startup") -async def startup_event(): - watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) - watch_thread.start() - web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) - - @web_app.post("/create") async def create(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)): return await handle_create(request) diff --git a/eve/api/api.py b/eve/api/api.py index 8a3b7f0..3aa87f7 100644 --- a/eve/api/api.py +++ b/eve/api/api.py @@ -113,10 +113,12 @@ async def deployment( @web_app.post("/schedule") async def schedule( request: ScheduleRequest, + background_tasks: BackgroundTasks, _: dict = Depends(auth.authenticate_admin), ): return await handle_schedule( request=request, + background_tasks=background_tasks, scheduler=web_app.state.scheduler, ably_client=web_app.state.ably_client, ) diff --git a/eve/clients/discord/client.py b/eve/clients/discord/client.py index 7c2cd45..b953e35 100644 --- a/eve/clients/discord/client.py +++ b/eve/clients/discord/client.py @@ -53,7 +53,7 @@ def __init__( if local: self.api_url = "http://localhost:8000" else: - self.api_url = os.getenv(f"EDEN_API_URL") + self.api_url = os.getenv("EDEN_API_URL") self.channel_name = common.get_ably_channel_name( agent.username, ClientType.DISCORD ) @@ -169,16 +169,13 @@ async def on_message(self, message: discord.Message) -> None: # Lookup thread if thread_key not in self.known_threads: - self.known_threads[thread_key] = self.agent.request_thread( - key=thread_key - ) + self.known_threads[thread_key] = self.agent.request_thread(key=thread_key) thread = self.known_threads[thread_key] # Lookup user if message.author.id not in self.known_users: self.known_users[message.author.id] = User.from_discord( - message.author.id, - message.author.name + message.author.id, message.author.name ) user = self.known_users[message.author.id] @@ -335,6 +332,6 @@ def start( ) parser.add_argument("--local", help="Run locally", action="store_true") args = parser.parse_args() - + load_env(args.db) start(args.env, args.agent, args.local) From fb8deb196fa6bae14e1c66984c3d39b5abadd548 Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Tue, 7 Jan 2025 13:08:01 -0500 Subject: [PATCH 09/10] add trigger model --- eve/api.py | 44 +++++++++++++++++++++++------- eve/api/handlers.py | 66 ++++++++++++++++++++++++++++++++------------- eve/api/requests.py | 17 ++++++++---- eve/deploy.py | 6 ----- eve/trigger.py | 20 ++++++++++++++ 5 files changed, 113 insertions(+), 40 deletions(-) create mode 100644 eve/trigger.py diff --git a/eve/api.py b/eve/api.py index 7566ad7..c271aee 100644 --- a/eve/api.py +++ b/eve/api.py @@ -15,13 +15,23 @@ handle_cancel, handle_chat, handle_create, - handle_deployment, - handle_schedule, + handle_deployment_create, + handle_deployment_delete, + handle_schedule_create, + handle_schedule_delete, handle_stream_chat, + handle_trigger_create, + handle_trigger_delete, ) from eve.api.requests import ( CancelRequest, ChatRequest, + CreateDeploymentRequest, + CreateScheduleRequest, + CreateTriggerRequest, + DeleteDeploymentRequest, + DeleteScheduleRequest, + DeleteTriggerRequest, ScheduleRequest, TaskRequest, DeployRequest, @@ -115,18 +125,32 @@ async def stream_chat( return await handle_stream_chat(request, background_tasks) -@web_app.post("/deployment") -async def deployment( - request: DeployRequest, _: dict = Depends(auth.authenticate_admin) +@web_app.post("/deployments/create") +async def deployment_create( + request: CreateDeploymentRequest, _: dict = Depends(auth.authenticate_admin) ): - return await handle_deployment(request) + return await handle_deployment_create(request) -@web_app.post("/schedule") -async def schedule( - request: ScheduleRequest, _: dict = Depends(auth.authenticate_admin) +@web_app.post("/deployments/delete") +async def deployment_delete( + request: DeleteDeploymentRequest, _: dict = Depends(auth.authenticate_admin) ): - return await handle_schedule(request) + return await handle_deployment_delete(request) + + +@web_app.post("/triggers/create") +async def trigger_create( + request: CreateTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_create(request, scheduler, web_app.state.ably_client) + + +@web_app.post("/trigger/delete") +async def trigger_delete( + request: DeleteTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_delete(request, scheduler) # Modal app setup diff --git a/eve/api/handlers.py b/eve/api/handlers.py index 1d27cf5..1885280 100644 --- a/eve/api/handlers.py +++ b/eve/api/handlers.py @@ -4,6 +4,7 @@ import time import traceback import asyncio +from bson import ObjectId from fastapi import BackgroundTasks from fastapi.responses import StreamingResponse from ably import AblyRealtime @@ -13,8 +14,10 @@ from eve.api.requests import ( CancelRequest, ChatRequest, - DeployRequest, - ScheduleRequest, + CreateDeploymentRequest, + CreateTriggerRequest, + DeleteDeploymentRequest, + DeleteTriggerRequest, TaskRequest, ) from eve.api.helpers import ( @@ -23,8 +26,8 @@ serialize_for_json, setup_chat, ) +from eve.trigger import Cron, Trigger from eve.deploy import ( - DeployCommand, create_modal_secrets, deploy_client, stop_client, @@ -153,35 +156,35 @@ async def event_generator(): return {"status": "error", "message": str(e)} -async def handle_deployment(request: DeployRequest): +async def handle_deployment_create(request: CreateDeploymentRequest): try: if request.credentials: create_modal_secrets( request.credentials, f"{request.agent_key}-secrets-{db}", ) - - if request.command == DeployCommand.DEPLOY: deploy_client(request.agent_key, request.platform.value) return { "status": "success", "message": f"Deployed {request.platform.value} client", } - elif request.command == DeployCommand.STOP: - stop_client(request.agent_key, request.platform.value) - return { - "status": "success", - "message": f"Stopped {request.platform.value} client", - } - else: - raise Exception("Invalid command") + except Exception as e: + return {"status": "error", "message": str(e)} + +async def handle_deployment_delete(request: DeleteDeploymentRequest): + try: + stop_client(request.agent_key, request.platform.value) + return { + "status": "success", + "message": f"Stopped {request.platform.value} client", + } except Exception as e: return {"status": "error", "message": str(e)} -async def handle_schedule( - request: ScheduleRequest, +async def handle_trigger_create( + request: CreateTriggerRequest, scheduler: BackgroundScheduler, ably_client: AblyRealtime, ): @@ -204,7 +207,7 @@ def run_scheduled_task(): user_id=request.user_id, agent_id=request.agent_id, user_message=UserMessage( - content=request.message_content, + content=request.message, ), update_config=request.update_config, force_reply=True, @@ -233,16 +236,29 @@ def run_scheduled_task(): finally: loop.close() + trigger_id = f"{request.user_id}_{request.agent_id}_{time.time()}" job = scheduler.add_job( run_scheduled_task, trigger=trigger, - id=f"{request.user_id}_{request.agent_id}_{time.time()}", + id=trigger_id, misfire_grace_time=None, coalesce=True, ) + trigger = Trigger( + trigger_id=trigger_id, + user=ObjectId(request.user_id), + agent=ObjectId(request.agent_id), + schedule=request.schedule.to_cron_dict(), + message=request.message, + update_config=request.update_config.model_dump() + if request.update_config + else {}, + ) + trigger.save() + return { - "status": "success", + "id": str(trigger.id), "job_id": job.id, "next_run_time": str(job.next_run_time), } @@ -250,3 +266,15 @@ def run_scheduled_task(): except Exception as e: logger.error(f"Error scheduling task: {str(e)}\n{traceback.format_exc()}") return {"status": "error", "message": str(e)} + + +async def handle_trigger_delete( + request: DeleteTriggerRequest, scheduler: BackgroundScheduler +): + try: + cron = Cron.from_mongo(request.id) + scheduler.remove_job(cron.job_id) + cron.delete() + return {"status": "success", "message": f"Deleted job {request.job_id}"} + except Exception as e: + return {"status": "error", "message": str(e)} diff --git a/eve/api/requests.py b/eve/api/requests.py index 6165db4..dbfa0cb 100644 --- a/eve/api/requests.py +++ b/eve/api/requests.py @@ -2,7 +2,6 @@ from pydantic import BaseModel, ConfigDict, Field from datetime import datetime -from eve.deploy import DeployCommand from eve.models import ClientType from eve.thread import UserMessage @@ -64,16 +63,24 @@ def to_cron_dict(self) -> dict: return {k: v for k, v in self.model_dump().items() if v is not None} -class ScheduleRequest(BaseModel): +class CreateTriggerRequest(BaseModel): agent_id: str user_id: str - message_content: str + message: str schedule: CronSchedule update_config: Optional[UpdateConfig] = None -class DeployRequest(BaseModel): +class DeleteTriggerRequest(BaseModel): + id: str + + +class CreateDeploymentRequest(BaseModel): agent_key: str platform: ClientType - command: DeployCommand credentials: Optional[Dict[str, str]] = None + + +class DeleteDeploymentRequest(BaseModel): + agent_key: str + platform: ClientType diff --git a/eve/deploy.py b/eve/deploy.py index 493622b..aec4166 100644 --- a/eve/deploy.py +++ b/eve/deploy.py @@ -2,7 +2,6 @@ from pathlib import Path import subprocess import tempfile -from enum import Enum from typing import Dict @@ -13,11 +12,6 @@ env = "prod" if db == "PROD" else "stage" -class DeployCommand(str, Enum): - DEPLOY = "deploy" - STOP = "stop" - - def authenticate_modal_key() -> bool: token_id = os.getenv("MODAL_DEPLOYER_TOKEN_ID") token_secret = os.getenv("MODAL_DEPLOYER_TOKEN_SECRET") diff --git a/eve/trigger.py b/eve/trigger.py new file mode 100644 index 0000000..1c9e450 --- /dev/null +++ b/eve/trigger.py @@ -0,0 +1,20 @@ +from typing import Dict, Any +from bson import ObjectId +from eve.mongo import Collection, Document + + +@Collection("triggers") +class Trigger(Document): + trigger_id: str + user: ObjectId + agent: ObjectId + schedule: Dict[str, Any] + message: str + update_config: Dict[str, Any] + + def __init__(self, **data): + if isinstance(data.get("user"), str): + data["user"] = ObjectId(data["user"]) + if isinstance(data.get("agent"), str): + data["agent"] = ObjectId(data["agent"]) + super().__init__(**data) From bf9fea9220e0bf505329f5f2518808a8d23a3050 Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Tue, 7 Jan 2025 13:42:35 -0500 Subject: [PATCH 10/10] okay it appears to all work --- eve/agent.py | 1 - eve/api.py | 190 -------------------------------------------- eve/api/api.py | 110 +++++++++++++++---------- eve/api/handlers.py | 81 +++++-------------- eve/api/helpers.py | 40 +++++++++- eve/trigger.py | 67 ++++++++++++++++ 6 files changed, 193 insertions(+), 296 deletions(-) delete mode 100644 eve/api.py diff --git a/eve/agent.py b/eve/agent.py index f7de278..575fa7a 100644 --- a/eve/agent.py +++ b/eve/agent.py @@ -1,5 +1,4 @@ import os -import yaml import time import json import traceback diff --git a/eve/api.py b/eve/api.py deleted file mode 100644 index c271aee..0000000 --- a/eve/api.py +++ /dev/null @@ -1,190 +0,0 @@ -import logging -import os -import threading -import modal -from fastapi import FastAPI, Depends, BackgroundTasks -from fastapi.middleware.cors import CORSMiddleware -from fastapi.security import APIKeyHeader, HTTPBearer -from ably import AblyRealtime -from apscheduler.schedulers.background import BackgroundScheduler -from pathlib import Path -from contextlib import asynccontextmanager - -from eve import auth -from eve.api.handlers import ( - handle_cancel, - handle_chat, - handle_create, - handle_deployment_create, - handle_deployment_delete, - handle_schedule_create, - handle_schedule_delete, - handle_stream_chat, - handle_trigger_create, - handle_trigger_delete, -) -from eve.api.requests import ( - CancelRequest, - ChatRequest, - CreateDeploymentRequest, - CreateScheduleRequest, - CreateTriggerRequest, - DeleteDeploymentRequest, - DeleteScheduleRequest, - DeleteTriggerRequest, - ScheduleRequest, - TaskRequest, - DeployRequest, -) -from eve.deploy import ( - authenticate_modal_key, - check_environment_exists, - create_environment, -) -from eve import deploy -from eve.tools.comfyui_tool import convert_tasks2_to_tasks3 - - -logging.basicConfig(level=logging.INFO) -logging.getLogger("ably").setLevel(logging.WARNING) - - -db = os.getenv("DB", "STAGE").upper() -if db not in ["PROD", "STAGE"]: - raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") -app_name = "api-prod" if db == "PROD" else "api-stage" - - -# FastAPI setup -@asynccontextmanager -async def lifespan(app: FastAPI): - # Startup - watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) - watch_thread.start() - app.state.watch_thread = watch_thread - - app.state.ably_client = AblyRealtime( - os.getenv("ABLY_PUBLISHER_KEY"), - options={ - "heartbeat_interval": 15000, - "connection_state_ttl": 60000, - "disconnected_retry_timeout": 15000, - }, - ) - yield - # Shutdown - if hasattr(app.state, "watch_thread"): - app.state.watch_thread.join(timeout=5) - if hasattr(app.state, "scheduler"): - app.state.scheduler.shutdown(wait=True) - if hasattr(app.state, "ably_client"): - await app.state.ably_client.close() - - -web_app = FastAPI(lifespan=lifespan) -web_app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) -scheduler = BackgroundScheduler() -scheduler.start() - -api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) -bearer_scheme = HTTPBearer(auto_error=False) -background_tasks: BackgroundTasks = BackgroundTasks() - - -@web_app.post("/create") -async def create(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)): - return await handle_create(request) - - -@web_app.post("/cancel") -async def cancel(request: CancelRequest, _: dict = Depends(auth.authenticate_admin)): - return await handle_cancel(request) - - -@web_app.post("/chat") -async def chat( - request: ChatRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), -): - return await handle_chat(request, background_tasks, web_app.state.ably_client) - - -@web_app.post("/chat/stream") -async def stream_chat( - request: ChatRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), -): - return await handle_stream_chat(request, background_tasks) - - -@web_app.post("/deployments/create") -async def deployment_create( - request: CreateDeploymentRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_deployment_create(request) - - -@web_app.post("/deployments/delete") -async def deployment_delete( - request: DeleteDeploymentRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_deployment_delete(request) - - -@web_app.post("/triggers/create") -async def trigger_create( - request: CreateTriggerRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_trigger_create(request, scheduler, web_app.state.ably_client) - - -@web_app.post("/trigger/delete") -async def trigger_delete( - request: DeleteTriggerRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_trigger_delete(request, scheduler) - - -# Modal app setup -app = modal.App( - name=app_name, - secrets=[ - modal.Secret.from_name("eve-secrets"), - modal.Secret.from_name(f"eve-secrets-{db}"), - ], -) - -root_dir = Path(__file__).parent.parent -workflows_dir = root_dir / ".." / "workflows" - -image = ( - modal.Image.debian_slim(python_version="3.11") - .env({"DB": db, "MODAL_SERVE": os.getenv("MODAL_SERVE")}) - .apt_install("git", "libmagic1", "ffmpeg", "wget") - .pip_install_from_pyproject(str(root_dir / "pyproject.toml")) - .run_commands(["playwright install"]) - .copy_local_dir(str(workflows_dir), "/workflows") -) - - -@app.function( - image=image, - keep_warm=1, - concurrency_limit=10, - container_idle_timeout=60, - timeout=3600, -) -@modal.asgi_app() -def fastapi_app(): - authenticate_modal_key() - if not check_environment_exists(deploy.DEPLOYMENT_ENV_NAME): - create_environment(deploy.DEPLOYMENT_ENV_NAME) - return web_app diff --git a/eve/api/api.py b/eve/api/api.py index 3aa87f7..a563afa 100644 --- a/eve/api/api.py +++ b/eve/api/api.py @@ -1,28 +1,35 @@ +import logging import os +import threading import modal from fastapi import FastAPI, Depends, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader, HTTPBearer -import logging from ably import AblyRealtime from apscheduler.schedulers.background import BackgroundScheduler from pathlib import Path +from contextlib import asynccontextmanager from eve import auth from eve.api.handlers import ( handle_cancel, handle_chat, handle_create, - handle_deployment, - handle_schedule, + handle_deployment_create, + handle_deployment_delete, handle_stream_chat, + handle_trigger_create, + handle_trigger_delete, ) +from eve.api.helpers import load_existing_triggers from eve.api.requests import ( CancelRequest, ChatRequest, - ScheduleRequest, + CreateDeploymentRequest, + CreateTriggerRequest, + DeleteDeploymentRequest, + DeleteTriggerRequest, TaskRequest, - DeployRequest, ) from eve.deploy import ( authenticate_modal_key, @@ -30,19 +37,48 @@ create_environment, ) from eve import deploy +from eve.tools.comfyui_tool import convert_tasks2_to_tasks3 + -# Config and logging setup logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) + db = os.getenv("DB", "STAGE").upper() if db not in ["PROD", "STAGE"]: raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") app_name = "api-prod" if db == "PROD" else "api-stage" +logging.getLogger("ably").setLevel(logging.INFO if db != "PROD" else logging.WARNING) + # FastAPI setup -web_app = FastAPI() +@asynccontextmanager +async def lifespan(app: FastAPI): + from eve.api.handlers import handle_chat + + # Startup + watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) + watch_thread.start() + app.state.watch_thread = watch_thread + + app.state.ably_client = AblyRealtime( + key=os.getenv("ABLY_PUBLISHER_KEY"), + ) + + # Load existing triggers + await load_existing_triggers(scheduler, app.state.ably_client, handle_chat) + + yield + # Shutdown + if hasattr(app.state, "watch_thread"): + app.state.watch_thread.join(timeout=5) + if hasattr(app.state, "scheduler"): + app.state.scheduler.shutdown(wait=True) + if hasattr(app.state, "ably_client"): + await app.state.ably_client.close() + + +web_app = FastAPI(lifespan=lifespan) web_app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -50,25 +86,8 @@ allow_methods=["*"], allow_headers=["*"], ) - - -@web_app.on_event("startup") -async def startup_event(): - # Setup - scheduler = BackgroundScheduler() - scheduler.start() - web_app.state.scheduler = scheduler - web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) - - -@web_app.on_event("shutdown") -async def shutdown_event(): - # Cleanup - if hasattr(web_app.state, "ably_client"): - web_app.state.ably_client.close() - if hasattr(web_app.state, "scheduler"): - web_app.state.scheduler.shutdown() - +scheduler = BackgroundScheduler() +scheduler.start() api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) bearer_scheme = HTTPBearer(auto_error=False) @@ -103,25 +122,32 @@ async def stream_chat( return await handle_stream_chat(request, background_tasks) -@web_app.post("/deployment") -async def deployment( - request: DeployRequest, _: dict = Depends(auth.authenticate_admin) +@web_app.post("/deployments/create") +async def deployment_create( + request: CreateDeploymentRequest, _: dict = Depends(auth.authenticate_admin) ): - return await handle_deployment(request) + return await handle_deployment_create(request) -@web_app.post("/schedule") -async def schedule( - request: ScheduleRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), +@web_app.post("/deployments/delete") +async def deployment_delete( + request: DeleteDeploymentRequest, _: dict = Depends(auth.authenticate_admin) ): - return await handle_schedule( - request=request, - background_tasks=background_tasks, - scheduler=web_app.state.scheduler, - ably_client=web_app.state.ably_client, - ) + return await handle_deployment_delete(request) + + +@web_app.post("/triggers/create") +async def trigger_create( + request: CreateTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_create(request, scheduler, web_app.state.ably_client) + + +@web_app.post("/triggers/delete") +async def trigger_delete( + request: DeleteTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_delete(request, scheduler) # Modal app setup diff --git a/eve/api/handlers.py b/eve/api/handlers.py index 1885280..b7dc6b2 100644 --- a/eve/api/handlers.py +++ b/eve/api/handlers.py @@ -3,13 +3,11 @@ import os import time import traceback -import asyncio from bson import ObjectId from fastapi import BackgroundTasks from fastapi.responses import StreamingResponse from ably import AblyRealtime from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.cron import CronTrigger from eve.api.requests import ( CancelRequest, @@ -26,7 +24,7 @@ serialize_for_json, setup_chat, ) -from eve.trigger import Cron, Trigger +from eve.trigger import Trigger from eve.deploy import ( create_modal_secrets, deploy_client, @@ -36,7 +34,6 @@ from eve.mongo import serialize_document from eve.task import Task from eve.tool import Tool -from eve.thread import UserMessage logger = logging.getLogger(__name__) db = os.getenv("DB", "STAGE").upper() @@ -189,60 +186,20 @@ async def handle_trigger_create( ably_client: AblyRealtime, ): try: - trigger = CronTrigger(**request.schedule.to_cron_dict()) - - def run_scheduled_task(): - logger.info(f"Running scheduled chat for user {request.user_id}") - - # Create new event loop for this thread - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - # Create new background tasks for this run - background_tasks = BackgroundTasks() - - # Create the chat request - chat_request = ChatRequest( - user_id=request.user_id, - agent_id=request.agent_id, - user_message=UserMessage( - content=request.message, - ), - update_config=request.update_config, - force_reply=True, - ) - - # Run the async chat handler and its background tasks - result = loop.run_until_complete( - handle_chat( - request=chat_request, - background_tasks=background_tasks, - ably_client=ably_client, - ) - ) - - # Execute any background tasks that were created - loop.run_until_complete(background_tasks()) - - logger.info( - f"Completed scheduled chat for user {request.user_id}: {result}" - ) - - except Exception as e: - logger.error( - f"Error in scheduled chat: {str(e)}\n{traceback.format_exc()}" - ) - finally: - loop.close() - - trigger_id = f"{request.user_id}_{request.agent_id}_{time.time()}" - job = scheduler.add_job( - run_scheduled_task, - trigger=trigger, - id=trigger_id, - misfire_grace_time=None, - coalesce=True, + from eve.trigger import create_chat_trigger + + trigger_id = f"{request.user_id}_{request.agent_id}_{int(time.time())}" + + job = await create_chat_trigger( + user_id=request.user_id, + agent_id=request.agent_id, + message=request.message, + schedule=request.schedule.to_cron_dict(), + update_config=request.update_config, + scheduler=scheduler, + ably_client=ably_client, + trigger_id=trigger_id, + handle_chat_fn=handle_chat, ) trigger = Trigger( @@ -272,9 +229,9 @@ async def handle_trigger_delete( request: DeleteTriggerRequest, scheduler: BackgroundScheduler ): try: - cron = Cron.from_mongo(request.id) - scheduler.remove_job(cron.job_id) - cron.delete() - return {"status": "success", "message": f"Deleted job {request.job_id}"} + trigger = Trigger.from_mongo(request.id) + scheduler.remove_job(trigger.trigger_id) + trigger.delete() + return {"status": "success", "message": f"Deleted job {trigger.trigger_id}"} except Exception as e: return {"status": "error", "message": str(e)} diff --git a/eve/api/helpers.py b/eve/api/helpers.py index 75cc1f1..5f50d15 100644 --- a/eve/api/helpers.py +++ b/eve/api/helpers.py @@ -1,4 +1,3 @@ -import asyncio import logging import os from typing import Optional @@ -6,6 +5,8 @@ from bson import ObjectId from fastapi import BackgroundTasks from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +import traceback from eve.tool import Tool from eve.user import User @@ -13,6 +14,8 @@ from eve.thread import Thread from eve.llm import async_title_thread from eve.api.requests import ChatRequest, UpdateConfig +from eve.trigger import Trigger +from eve.mongo import get_collection logger = logging.getLogger(__name__) @@ -84,3 +87,38 @@ async def emit_channel_update(update_channel: AblyRealtime, data: dict): await update_channel.publish("update", data) except Exception as e: logger.error(f"Failed to publish to Ably: {str(e)}") + + +async def load_existing_triggers( + scheduler: BackgroundScheduler, ably_client: AblyRealtime, handle_chat_fn +): + """Load all existing triggers from the database and add them to the scheduler""" + from eve.trigger import create_chat_trigger + + triggers_collection = get_collection(Trigger.collection_name) + + for trigger_doc in triggers_collection.find({}): + try: + # Convert mongo doc to Trigger object + trigger = Trigger.convert_from_mongo(trigger_doc) + trigger = Trigger.from_schema(trigger) + + await create_chat_trigger( + user_id=str(trigger.user), + agent_id=str(trigger.agent), + message=trigger.message, + schedule=trigger.schedule, + update_config=UpdateConfig(**trigger.update_config) + if trigger.update_config + else None, + scheduler=scheduler, + ably_client=ably_client, + trigger_id=trigger.trigger_id, + handle_chat_fn=handle_chat_fn, + ) + logger.info(f"Loaded trigger {trigger.trigger_id}") + + except Exception as e: + logger.error( + f"Error loading trigger {trigger_doc.get('trigger_id', 'unknown')}: {str(e)}\n{traceback.format_exc()}" + ) diff --git a/eve/trigger.py b/eve/trigger.py index 1c9e450..6fe7f9a 100644 --- a/eve/trigger.py +++ b/eve/trigger.py @@ -1,6 +1,18 @@ from typing import Dict, Any from bson import ObjectId from eve.mongo import Collection, Document +import asyncio +import logging +from typing import Optional +from fastapi import BackgroundTasks +from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger + +from eve.api.requests import ChatRequest, UpdateConfig +from eve.thread import UserMessage + +logger = logging.getLogger(__name__) @Collection("triggers") @@ -18,3 +30,58 @@ def __init__(self, **data): if isinstance(data.get("agent"), str): data["agent"] = ObjectId(data["agent"]) super().__init__(**data) + + +async def create_chat_trigger( + user_id: str, + agent_id: str, + message: str, + schedule: dict, + update_config: Optional[UpdateConfig], + scheduler: BackgroundScheduler, + ably_client: AblyRealtime, + trigger_id: str, + handle_chat_fn, +): + """Creates and adds a scheduled chat job to the scheduler""" + + def run_scheduled_task(): + logger.info(f"Running scheduled chat for user {user_id}") + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + background_tasks = BackgroundTasks() + chat_request = ChatRequest( + user_id=user_id, + agent_id=agent_id, + user_message=UserMessage(content=message), + update_config=update_config, + force_reply=True, + ) + + result = loop.run_until_complete( + handle_chat_fn( + request=chat_request, + background_tasks=background_tasks, + ably_client=ably_client, + ) + ) + loop.run_until_complete(background_tasks()) + logger.info(f"Completed scheduled chat for trigger {trigger_id}: {result}") + + except Exception as e: + logger.error(f"Error in scheduled chat: {str(e)}") + finally: + loop.close() + + job = scheduler.add_job( + run_scheduled_task, + trigger=CronTrigger(**schedule), + id=trigger_id, + misfire_grace_time=None, + coalesce=True, + ) + + return job