diff --git a/.github/workflows/prod.yaml b/.github/workflows/prod.yaml index c2c2a41..358f5af 100644 --- a/.github/workflows/prod.yaml +++ b/.github/workflows/prod.yaml @@ -46,4 +46,4 @@ jobs: - name: Deploy to Modal working-directory: eve - run: rye run modal deploy ./eve/api.py + run: rye run modal deploy ./eve/api/api.py diff --git a/.github/workflows/staging.yaml b/.github/workflows/staging.yaml index 71eccf9..5d18264 100644 --- a/.github/workflows/staging.yaml +++ b/.github/workflows/staging.yaml @@ -46,4 +46,4 @@ jobs: - name: Deploy to Modal working-directory: eve - run: rye run modal deploy ./eve/api.py + run: rye run modal deploy ./eve/api/api.py diff --git a/eve/__init__.py b/eve/__init__.py index f3f7feb..440a90c 100644 --- a/eve/__init__.py +++ b/eve/__init__.py @@ -16,7 +16,7 @@ def load_env(db): db = db.upper() if db not in ["STAGE", "PROD"]: raise ValueError(f"Invalid database: {db}") - + os.environ["DB"] = db # First try ~/.eve diff --git a/eve/agent.py b/eve/agent.py index f7de278..575fa7a 100644 --- a/eve/agent.py +++ b/eve/agent.py @@ -1,5 +1,4 @@ import os -import yaml import time import json import traceback diff --git a/eve/api.py b/eve/api.py deleted file mode 100644 index 60d2fc8..0000000 --- a/eve/api.py +++ /dev/null @@ -1,363 +0,0 @@ -import os -import json -import modal -import threading -from fastapi import FastAPI, Depends, BackgroundTasks -from fastapi.responses import StreamingResponse -from fastapi.middleware.cors import CORSMiddleware -from fastapi.security import APIKeyHeader, HTTPBearer -from pydantic import BaseModel, ConfigDict -from typing import Optional -from bson import ObjectId -import logging -from ably import AblyRealtime -from pathlib import Path -import aiohttp -import traceback - -from eve import auth -from eve.deploy import ( - DeployCommand, - DeployRequest, - authenticate_modal_key, - check_environment_exists, - create_environment, - create_modal_secrets, - deploy_client, - stop_client, -) -from eve.tool import Tool -from eve.llm import UpdateType, UserMessage, async_prompt_thread, async_title_thread -from eve.thread import Thread -from eve.mongo import serialize_document -from eve.agent import Agent -from eve.user import User -from eve.task import Task -from eve.tools.comfyui_tool import convert_tasks2_to_tasks3 -from eve import deploy - -# Config and logging setup -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -db = os.getenv("DB", "STAGE").upper() -if db not in ["PROD", "STAGE"]: - raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") -app_name = "api-prod" if db == "PROD" else "api-stage" - -# FastAPI setup -web_app = FastAPI() -web_app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) -bearer_scheme = HTTPBearer(auto_error=False) -background_tasks: BackgroundTasks = BackgroundTasks() - - -# Store ably client at app state level -@web_app.on_event("startup") -async def startup_event(): - web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) - watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) - watch_thread.start() - logger.info("Started tasks2 watch thread.") - -class TaskRequest(BaseModel): - tool: str - args: dict - user_id: str - -class CancelRequest(BaseModel): - task_id: str - user_id: str - -class UpdateConfig(BaseModel): - sub_channel_name: Optional[str] = None - update_endpoint: Optional[str] = None - discord_channel_id: Optional[str] = None - telegram_chat_id: Optional[str] = None - cast_hash: Optional[str] = None - author_fid: Optional[int] = None - message_id: Optional[str] = None - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ChatRequest(BaseModel): - user_id: str - agent_id: str - user_message: UserMessage - thread_id: Optional[str] = None - update_config: Optional[UpdateConfig] = None - force_reply: bool = False - - -async def handle_task(tool: str, user_id: str, args: dict = {}) -> dict: - tool = Tool.load(key=tool) - return await tool.async_start_task( - requester_id=user_id, user_id=user_id, args=args - ) - - -def serialize_for_json(obj): - """Recursively serialize objects for JSON, handling ObjectId and other special types""" - if isinstance(obj, ObjectId): - return str(obj) - elif isinstance(obj, dict): - return {k: serialize_for_json(v) for k, v in obj.items()} - elif isinstance(obj, list): - return [serialize_for_json(item) for item in obj] - return obj - - -async def setup_chat( - request: ChatRequest, background_tasks: BackgroundTasks -) -> tuple[User, Agent, Thread, list[Tool], Optional[AblyRealtime]]: - update_channel = None - if request.update_config and request.update_config.sub_channel_name: - try: - update_channel = web_app.state.ably_client.channels.get( - str(request.update_config.sub_channel_name) - ) - except Exception as e: - logger.error(f"Failed to create Ably channel: {str(e)}") - - user = User.from_mongo(request.user_id) - agent = Agent.from_mongo(request.agent_id, cache=True) - tools = agent.get_tools(cache=True) - - if request.thread_id: - thread = Thread.from_mongo(request.thread_id) - else: - thread = agent.request_thread(user=user.id) - background_tasks.add_task(async_title_thread, thread, request.user_message) - - return user, agent, thread, tools, update_channel - - -@web_app.post("/create") -async def task_admin( - request: TaskRequest, - _: dict = Depends(auth.authenticate_admin) -): - print("===== this is the request", request) - result = await handle_task(request.tool, request.user_id, request.args) - return serialize_document(result.model_dump(by_alias=True)) - - -async def handle_cancel(task_id: str, user_id: str): - task = Task.from_mongo(task_id) - assert str(task.user) == user_id, "Task user does not match user_id" - if task.status in ["completed", "failed", "cancelled"]: - return {"status": task.status} - tool = Tool.load(key=task.tool) - tool.cancel(task) - return {"status": task.status} - - -@web_app.post("/cancel") -async def cancel( - request: CancelRequest, - _: dict = Depends(auth.authenticate_admin) -): - result = await handle_cancel(request.task_id, request.user_id) - return result - - -@web_app.post("/chat") -async def handle_chat( - request: ChatRequest, - background_tasks: BackgroundTasks, - auth: dict = Depends(auth.authenticate_admin), -): - try: - user, agent, thread, tools, update_channel = await setup_chat( - request, background_tasks - ) - - async def run_prompt(): - - async for update in async_prompt_thread( - user=user, - agent=agent, - thread=thread, - user_messages=request.user_message, - tools=tools, - force_reply=request.force_reply, - model="claude-3-5-sonnet-20241022", - stream=False, - ): - data = { - "type": update.type.value, - "update_config": request.update_config.model_dump() - if request.update_config - else {}, - } - - if update.type == UpdateType.ASSISTANT_MESSAGE: - data["content"] = update.message.content - elif update.type == UpdateType.TOOL_COMPLETE: - data["tool"] = update.tool_name - data["result"] = serialize_for_json(update.result) - elif update.type == UpdateType.ERROR: - data["error"] = update.error if hasattr(update, "error") else None - - if request.update_config: - if request.update_config.update_endpoint: - async with aiohttp.ClientSession() as session: - try: - async with session.post( - request.update_config.update_endpoint, - json=data, - headers={ - "Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}" - }, - ) as response: - if response.status != 200: - logger.error( - f"Failed to send update to endpoint: {await response.text()}" - ) - except Exception as e: - logger.error( - f"Error sending update to endpoint: {str(e)}" - ) - - elif update_channel: - try: - await update_channel.publish("update", data) - except Exception as e: - logger.error(f"Failed to publish to Ably: {str(e)}") - - background_tasks.add_task(run_prompt) - return {"status": "success", "thread_id": str(thread.id)} - - except Exception as e: - logger.error(f"Error in handle_chat: {str(e)}\n{traceback.format_exc()}") - return {"status": "error", "message": str(e)} - - -@web_app.post("/chat/stream") -async def stream_chat( - request: ChatRequest, - auth: dict = Depends(auth.authenticate_admin), -): - try: - user, agent, thread, tools, update_channel = await setup_chat( - request, BackgroundTasks() - ) - - async def event_generator(): - async for update in async_prompt_thread( - user=user, - agent=agent, - thread=thread, - user_messages=request.user_message, - tools=tools, - force_reply=request.force_reply, - model="claude-3-5-sonnet-20241022", - stream=True, - ): - data = {"type": update.type} - if update.type == UpdateType.ASSISTANT_TOKEN: - data["text"] = update.text - elif update.type == UpdateType.ASSISTANT_MESSAGE: - data["content"] = update.message.content - if update.message.tool_calls: - data["tool_calls"] = [ - serialize_for_json(t.model_dump()) - for t in update.message.tool_calls - ] - elif update.type == UpdateType.TOOL_COMPLETE: - data["tool"] = update.tool_name - data["result"] = serialize_for_json(update.result) - elif update.type == UpdateType.ERROR: - data["error"] = update.error or "Unknown error occurred" - - yield f"data: {json.dumps({'event': 'update', 'data': data})}\n\n" - - yield f"data: {json.dumps({'event': 'done', 'data': ''})}\n\n" - - return StreamingResponse( - event_generator(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - }, - ) - - except Exception as e: - logger.error(f"Error in stream_chat: {str(e)}\n{traceback.format_exc()}") - return {"status": "error", "message": str(e)} - - -@web_app.post("/deployment") -async def deploy_handler( - request: DeployRequest, auth: dict = Depends(auth.authenticate_admin) -): - try: - if request.credentials: - create_modal_secrets( - request.credentials, - f"{request.agent_key}-secrets", - ) - - if request.command == DeployCommand.DEPLOY: - deploy_client(request.agent_key, request.platform.value) - return { - "status": "success", - "message": f"Deployed {request.platform.value} client", - } - elif request.command == DeployCommand.STOP: - stop_client(request.agent_key, request.platform.value, db) - return { - "status": "success", - "message": f"Stopped {request.platform.value} client", - } - else: - raise Exception("Invalid command") - - except Exception as e: - return {"status": "error", "message": str(e)} - - -# Modal app setup -app = modal.App( - name=app_name, - secrets=[ - modal.Secret.from_name("eve-secrets"), - modal.Secret.from_name(f"eve-secrets-{db}"), - ], -) - -root_dir = Path(__file__).parent.parent -workflows_dir = root_dir / ".." / "workflows" - -image = ( - modal.Image.debian_slim(python_version="3.11") - .env({"DB": db, "MODAL_SERVE": os.getenv("MODAL_SERVE")}) - .apt_install("git", "libmagic1", "ffmpeg", "wget") - .pip_install_from_pyproject(str(root_dir / "pyproject.toml")) - .run_commands(["playwright install"]) - .copy_local_dir(str(workflows_dir), "/workflows") -) - - -@app.function( - image=image, - keep_warm=1, - concurrency_limit=10, - container_idle_timeout=60, - timeout=3600, -) -@modal.asgi_app() -def fastapi_app(): - authenticate_modal_key() - if not check_environment_exists(deploy.DEPLOYMENT_ENV_NAME): - create_environment(deploy.DEPLOYMENT_ENV_NAME) - return web_app diff --git a/eve/api/__init__.py b/eve/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eve/api/api.py b/eve/api/api.py new file mode 100644 index 0000000..a563afa --- /dev/null +++ b/eve/api/api.py @@ -0,0 +1,187 @@ +import logging +import os +import threading +import modal +from fastapi import FastAPI, Depends, BackgroundTasks +from fastapi.middleware.cors import CORSMiddleware +from fastapi.security import APIKeyHeader, HTTPBearer +from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +from pathlib import Path +from contextlib import asynccontextmanager + +from eve import auth +from eve.api.handlers import ( + handle_cancel, + handle_chat, + handle_create, + handle_deployment_create, + handle_deployment_delete, + handle_stream_chat, + handle_trigger_create, + handle_trigger_delete, +) +from eve.api.helpers import load_existing_triggers +from eve.api.requests import ( + CancelRequest, + ChatRequest, + CreateDeploymentRequest, + CreateTriggerRequest, + DeleteDeploymentRequest, + DeleteTriggerRequest, + TaskRequest, +) +from eve.deploy import ( + authenticate_modal_key, + check_environment_exists, + create_environment, +) +from eve import deploy +from eve.tools.comfyui_tool import convert_tasks2_to_tasks3 + + +logging.basicConfig(level=logging.INFO) + + +db = os.getenv("DB", "STAGE").upper() +if db not in ["PROD", "STAGE"]: + raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") +app_name = "api-prod" if db == "PROD" else "api-stage" + +logging.getLogger("ably").setLevel(logging.INFO if db != "PROD" else logging.WARNING) + + +# FastAPI setup +@asynccontextmanager +async def lifespan(app: FastAPI): + from eve.api.handlers import handle_chat + + # Startup + watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) + watch_thread.start() + app.state.watch_thread = watch_thread + + app.state.ably_client = AblyRealtime( + key=os.getenv("ABLY_PUBLISHER_KEY"), + ) + + # Load existing triggers + await load_existing_triggers(scheduler, app.state.ably_client, handle_chat) + + yield + # Shutdown + if hasattr(app.state, "watch_thread"): + app.state.watch_thread.join(timeout=5) + if hasattr(app.state, "scheduler"): + app.state.scheduler.shutdown(wait=True) + if hasattr(app.state, "ably_client"): + await app.state.ably_client.close() + + +web_app = FastAPI(lifespan=lifespan) +web_app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) +scheduler = BackgroundScheduler() +scheduler.start() + +api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) +bearer_scheme = HTTPBearer(auto_error=False) +background_tasks: BackgroundTasks = BackgroundTasks() + + +@web_app.post("/create") +async def create(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)): + return await handle_create(request) + + +@web_app.post("/cancel") +async def cancel(request: CancelRequest, _: dict = Depends(auth.authenticate_admin)): + return await handle_cancel(request) + + +@web_app.post("/chat") +async def chat( + request: ChatRequest, + background_tasks: BackgroundTasks, + _: dict = Depends(auth.authenticate_admin), +): + return await handle_chat(request, background_tasks, web_app.state.ably_client) + + +@web_app.post("/chat/stream") +async def stream_chat( + request: ChatRequest, + background_tasks: BackgroundTasks, + _: dict = Depends(auth.authenticate_admin), +): + return await handle_stream_chat(request, background_tasks) + + +@web_app.post("/deployments/create") +async def deployment_create( + request: CreateDeploymentRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_deployment_create(request) + + +@web_app.post("/deployments/delete") +async def deployment_delete( + request: DeleteDeploymentRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_deployment_delete(request) + + +@web_app.post("/triggers/create") +async def trigger_create( + request: CreateTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_create(request, scheduler, web_app.state.ably_client) + + +@web_app.post("/triggers/delete") +async def trigger_delete( + request: DeleteTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_delete(request, scheduler) + + +# Modal app setup +app = modal.App( + name=app_name, + secrets=[ + modal.Secret.from_name("eve-secrets"), + modal.Secret.from_name(f"eve-secrets-{db}"), + ], +) + +root_dir = Path(__file__).parent.parent +workflows_dir = root_dir / ".." / "workflows" + +image = ( + modal.Image.debian_slim(python_version="3.11") + .env({"DB": db, "MODAL_SERVE": os.getenv("MODAL_SERVE")}) + .apt_install("git", "libmagic1", "ffmpeg", "wget") + .pip_install_from_pyproject(str(root_dir / "pyproject.toml")) + .run_commands(["playwright install"]) + .copy_local_dir(str(workflows_dir), "/workflows") +) + + +@app.function( + image=image, + keep_warm=1, + concurrency_limit=10, + container_idle_timeout=60, + timeout=3600, +) +@modal.asgi_app() +def fastapi_app(): + authenticate_modal_key() + if not check_environment_exists(deploy.DEPLOYMENT_ENV_NAME): + create_environment(deploy.DEPLOYMENT_ENV_NAME) + return web_app diff --git a/eve/api/handlers.py b/eve/api/handlers.py new file mode 100644 index 0000000..b7dc6b2 --- /dev/null +++ b/eve/api/handlers.py @@ -0,0 +1,237 @@ +import json +import logging +import os +import time +import traceback +from bson import ObjectId +from fastapi import BackgroundTasks +from fastapi.responses import StreamingResponse +from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler + +from eve.api.requests import ( + CancelRequest, + ChatRequest, + CreateDeploymentRequest, + CreateTriggerRequest, + DeleteDeploymentRequest, + DeleteTriggerRequest, + TaskRequest, +) +from eve.api.helpers import ( + emit_update, + get_update_channel, + serialize_for_json, + setup_chat, +) +from eve.trigger import Trigger +from eve.deploy import ( + create_modal_secrets, + deploy_client, + stop_client, +) +from eve.llm import UpdateType, async_prompt_thread +from eve.mongo import serialize_document +from eve.task import Task +from eve.tool import Tool + +logger = logging.getLogger(__name__) +db = os.getenv("DB", "STAGE").upper() + + +async def handle_create(request: TaskRequest): + tool = Tool.load(key=request.tool) + result = await tool.async_start_task( + requester_id=request.user_id, user_id=request.user_id, args=request.args + ) + return serialize_document(result.model_dump(by_alias=True)) + + +async def handle_cancel(request: CancelRequest): + task = Task.from_mongo(request.task_id) + assert str(task.user) == request.user_id, "Task user does not match user_id" + if task.status in ["completed", "failed", "cancelled"]: + return {"status": task.status} + tool = Tool.load(key=task.tool) + tool.cancel(task) + return {"status": task.status} + + +async def handle_chat( + request: ChatRequest, background_tasks: BackgroundTasks, ably_client: AblyRealtime +): + try: + user, agent, thread, tools = await setup_chat(request, background_tasks) + update_channel = ( + await get_update_channel(request.update_config, ably_client) + if request.update_config and request.update_config.sub_channel_name + else None + ) + + async def run_prompt(): + async for update in async_prompt_thread( + user=user, + agent=agent, + thread=thread, + user_messages=request.user_message, + tools=tools, + force_reply=request.force_reply, + model="claude-3-5-sonnet-20241022", + stream=False, + ): + data = { + "type": update.type.value, + "update_config": request.update_config.model_dump() + if request.update_config + else {}, + } + + if update.type == UpdateType.ASSISTANT_MESSAGE: + data["content"] = update.message.content + elif update.type == UpdateType.TOOL_COMPLETE: + data["tool"] = update.tool_name + data["result"] = serialize_for_json(update.result) + elif update.type == UpdateType.ERROR: + data["error"] = update.error if hasattr(update, "error") else None + + await emit_update(request.update_config, update_channel, data) + + background_tasks.add_task(run_prompt) + return {"status": "success", "thread_id": str(thread.id)} + + except Exception as e: + logger.error(f"Error in handle_chat: {str(e)}\n{traceback.format_exc()}") + return {"status": "error", "message": str(e)} + + +async def handle_stream_chat(request: ChatRequest, background_tasks: BackgroundTasks): + try: + user, agent, thread, tools = await setup_chat(request, background_tasks) + + async def event_generator(): + async for update in async_prompt_thread( + user=user, + agent=agent, + thread=thread, + user_messages=request.user_message, + tools=tools, + force_reply=request.force_reply, + model="claude-3-5-sonnet-20241022", + stream=True, + ): + data = {"type": update.type} + if update.type == UpdateType.ASSISTANT_TOKEN: + data["text"] = update.text + elif update.type == UpdateType.ASSISTANT_MESSAGE: + data["content"] = update.message.content + if update.message.tool_calls: + data["tool_calls"] = [ + serialize_for_json(t.model_dump()) + for t in update.message.tool_calls + ] + elif update.type == UpdateType.TOOL_COMPLETE: + data["tool"] = update.tool_name + data["result"] = serialize_for_json(update.result) + elif update.type == UpdateType.ERROR: + data["error"] = update.error or "Unknown error occurred" + + yield f"data: {json.dumps({'event': 'update', 'data': data})}\n\n" + + yield f"data: {json.dumps({'event': 'done', 'data': ''})}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + + except Exception as e: + logger.error(f"Error in stream_chat: {str(e)}\n{traceback.format_exc()}") + return {"status": "error", "message": str(e)} + + +async def handle_deployment_create(request: CreateDeploymentRequest): + try: + if request.credentials: + create_modal_secrets( + request.credentials, + f"{request.agent_key}-secrets-{db}", + ) + deploy_client(request.agent_key, request.platform.value) + return { + "status": "success", + "message": f"Deployed {request.platform.value} client", + } + except Exception as e: + return {"status": "error", "message": str(e)} + + +async def handle_deployment_delete(request: DeleteDeploymentRequest): + try: + stop_client(request.agent_key, request.platform.value) + return { + "status": "success", + "message": f"Stopped {request.platform.value} client", + } + except Exception as e: + return {"status": "error", "message": str(e)} + + +async def handle_trigger_create( + request: CreateTriggerRequest, + scheduler: BackgroundScheduler, + ably_client: AblyRealtime, +): + try: + from eve.trigger import create_chat_trigger + + trigger_id = f"{request.user_id}_{request.agent_id}_{int(time.time())}" + + job = await create_chat_trigger( + user_id=request.user_id, + agent_id=request.agent_id, + message=request.message, + schedule=request.schedule.to_cron_dict(), + update_config=request.update_config, + scheduler=scheduler, + ably_client=ably_client, + trigger_id=trigger_id, + handle_chat_fn=handle_chat, + ) + + trigger = Trigger( + trigger_id=trigger_id, + user=ObjectId(request.user_id), + agent=ObjectId(request.agent_id), + schedule=request.schedule.to_cron_dict(), + message=request.message, + update_config=request.update_config.model_dump() + if request.update_config + else {}, + ) + trigger.save() + + return { + "id": str(trigger.id), + "job_id": job.id, + "next_run_time": str(job.next_run_time), + } + + except Exception as e: + logger.error(f"Error scheduling task: {str(e)}\n{traceback.format_exc()}") + return {"status": "error", "message": str(e)} + + +async def handle_trigger_delete( + request: DeleteTriggerRequest, scheduler: BackgroundScheduler +): + try: + trigger = Trigger.from_mongo(request.id) + scheduler.remove_job(trigger.trigger_id) + trigger.delete() + return {"status": "success", "message": f"Deleted job {trigger.trigger_id}"} + except Exception as e: + return {"status": "error", "message": str(e)} diff --git a/eve/api/helpers.py b/eve/api/helpers.py new file mode 100644 index 0000000..5f50d15 --- /dev/null +++ b/eve/api/helpers.py @@ -0,0 +1,124 @@ +import logging +import os +from typing import Optional +import aiohttp +from bson import ObjectId +from fastapi import BackgroundTasks +from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +import traceback + +from eve.tool import Tool +from eve.user import User +from eve.agent import Agent +from eve.thread import Thread +from eve.llm import async_title_thread +from eve.api.requests import ChatRequest, UpdateConfig +from eve.trigger import Trigger +from eve.mongo import get_collection + +logger = logging.getLogger(__name__) + + +async def get_update_channel( + update_config: UpdateConfig, ably_client: AblyRealtime +) -> Optional[AblyRealtime]: + return ably_client.channels.get(str(update_config.sub_channel_name)) + + +async def setup_chat( + request: ChatRequest, background_tasks: BackgroundTasks +) -> tuple[User, Agent, Thread, list[Tool]]: + user = User.from_mongo(request.user_id) + agent = Agent.from_mongo(request.agent_id, cache=True) + tools = agent.get_tools(cache=True) + + if request.thread_id: + thread = Thread.from_mongo(request.thread_id) + else: + thread = agent.request_thread(user=user.id) + background_tasks.add_task(async_title_thread, thread, request.user_message) + + return user, agent, thread, tools + + +def serialize_for_json(obj): + """Recursively serialize objects for JSON, handling ObjectId and other special types""" + if isinstance(obj, ObjectId): + return str(obj) + elif isinstance(obj, dict): + return {k: serialize_for_json(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [serialize_for_json(item) for item in obj] + return obj + + +async def emit_update( + update_config: UpdateConfig, update_channel: AblyRealtime, data: dict +): + if update_config and update_config.update_endpoint: + raise ValueError("update_endpoint and sub_channel_name cannot be used together") + elif update_config.update_endpoint: + await emit_http_update(update_config, data) + elif update_config.sub_channel_name: + await emit_channel_update(update_channel, data) + else: + raise ValueError("One of update_endpoint or sub_channel_name must be provided") + + +async def emit_http_update(update_config: UpdateConfig, data: dict): + async with aiohttp.ClientSession() as session: + try: + async with session.post( + update_config.update_endpoint, + json=data, + headers={"Authorization": f"Bearer {os.getenv('EDEN_ADMIN_KEY')}"}, + ) as response: + if response.status != 200: + logger.error( + f"Failed to send update to endpoint: {await response.text()}" + ) + except Exception as e: + logger.error(f"Error sending update to endpoint: {str(e)}") + + +async def emit_channel_update(update_channel: AblyRealtime, data: dict): + try: + await update_channel.publish("update", data) + except Exception as e: + logger.error(f"Failed to publish to Ably: {str(e)}") + + +async def load_existing_triggers( + scheduler: BackgroundScheduler, ably_client: AblyRealtime, handle_chat_fn +): + """Load all existing triggers from the database and add them to the scheduler""" + from eve.trigger import create_chat_trigger + + triggers_collection = get_collection(Trigger.collection_name) + + for trigger_doc in triggers_collection.find({}): + try: + # Convert mongo doc to Trigger object + trigger = Trigger.convert_from_mongo(trigger_doc) + trigger = Trigger.from_schema(trigger) + + await create_chat_trigger( + user_id=str(trigger.user), + agent_id=str(trigger.agent), + message=trigger.message, + schedule=trigger.schedule, + update_config=UpdateConfig(**trigger.update_config) + if trigger.update_config + else None, + scheduler=scheduler, + ably_client=ably_client, + trigger_id=trigger.trigger_id, + handle_chat_fn=handle_chat_fn, + ) + logger.info(f"Loaded trigger {trigger.trigger_id}") + + except Exception as e: + logger.error( + f"Error loading trigger {trigger_doc.get('trigger_id', 'unknown')}: {str(e)}\n{traceback.format_exc()}" + ) diff --git a/eve/api/requests.py b/eve/api/requests.py new file mode 100644 index 0000000..dbfa0cb --- /dev/null +++ b/eve/api/requests.py @@ -0,0 +1,86 @@ +from typing import Dict, Optional +from pydantic import BaseModel, ConfigDict, Field +from datetime import datetime + +from eve.models import ClientType +from eve.thread import UserMessage + + +class TaskRequest(BaseModel): + tool: str + args: dict + user_id: str + + +class CancelRequest(BaseModel): + task_id: str + user_id: str + + +class UpdateConfig(BaseModel): + sub_channel_name: Optional[str] = None + update_endpoint: Optional[str] = None + discord_channel_id: Optional[str] = None + telegram_chat_id: Optional[str] = None + cast_hash: Optional[str] = None + author_fid: Optional[int] = None + message_id: Optional[str] = None + model_config = ConfigDict(arbitrary_types_allowed=True) + + +class ChatRequest(BaseModel): + user_id: str + agent_id: str + user_message: UserMessage + thread_id: Optional[str] = None + update_config: Optional[UpdateConfig] = None + force_reply: bool = False + + +class CronSchedule(BaseModel): + year: Optional[int | str] = Field(None, description="4-digit year") + month: Optional[int | str] = Field(None, description="month (1-12)") + day: Optional[int | str] = Field(None, description="day of month (1-31)") + week: Optional[int | str] = Field(None, description="ISO week (1-53)") + day_of_week: Optional[int | str] = Field( + None, + description="number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)", + ) + hour: Optional[int | str] = Field(None, description="hour (0-23)") + minute: Optional[int | str] = Field(None, description="minute (0-59)") + second: Optional[int | str] = Field(None, description="second (0-59)") + start_date: Optional[datetime] = Field( + None, description="earliest possible date/time to trigger on (inclusive)" + ) + end_date: Optional[datetime] = Field( + None, description="latest possible date/time to trigger on (inclusive)" + ) + timezone: Optional[str] = Field( + None, description="time zone to use for the date/time calculations" + ) + + def to_cron_dict(self) -> dict: + return {k: v for k, v in self.model_dump().items() if v is not None} + + +class CreateTriggerRequest(BaseModel): + agent_id: str + user_id: str + message: str + schedule: CronSchedule + update_config: Optional[UpdateConfig] = None + + +class DeleteTriggerRequest(BaseModel): + id: str + + +class CreateDeploymentRequest(BaseModel): + agent_key: str + platform: ClientType + credentials: Optional[Dict[str, str]] = None + + +class DeleteDeploymentRequest(BaseModel): + agent_key: str + platform: ClientType diff --git a/eve/cli/deploy_cli.py b/eve/cli/deploy_cli.py index bbf5ec7..78d94cb 100644 --- a/eve/cli/deploy_cli.py +++ b/eve/cli/deploy_cli.py @@ -1,4 +1,3 @@ -import os import sys import yaml import click @@ -6,13 +5,13 @@ import subprocess from pathlib import Path from dotenv import dotenv_values -import tempfile import shutil +from eve.deploy import DEPLOYMENT_ENV_NAME, prepare_client_file + from .. import load_env root_dir = Path(__file__).parent.parent.parent -ENV_NAME = "deployments" def ensure_modal_env_exists(): @@ -25,45 +24,23 @@ def ensure_modal_env_exists(): ) # Check if our environment exists - if ENV_NAME not in result.stdout: - click.echo(click.style(f"Creating Modal environment: {ENV_NAME}", fg="green")) - subprocess.run(["rye", "run", "modal", "environment", "create", ENV_NAME]) + if DEPLOYMENT_ENV_NAME not in result.stdout: + click.echo( + click.style( + f"Creating Modal environment: {DEPLOYMENT_ENV_NAME}", fg="green" + ) + ) + subprocess.run( + ["rye", "run", "modal", "environment", "create", DEPLOYMENT_ENV_NAME] + ) else: click.echo( - click.style(f"Using existing Modal environment: {ENV_NAME}", fg="blue") + click.style( + f"Using existing Modal environment: {DEPLOYMENT_ENV_NAME}", fg="blue" + ) ) -def prepare_client_file(file_path: str, agent_key: str, env: str) -> str: - """Create a temporary copy of the client file with modifications""" - with open(file_path, "r") as f: - content = f.read() - - # Get the repo root directory - repo_root = root_dir.absolute() - pyproject_path = repo_root / "pyproject.toml" - - # Replace the static secret name with the dynamic one - modified_content = content.replace( - 'modal.Secret.from_name("client-secrets")', - f'modal.Secret.from_name("{agent_key}-secrets-{env}")', - ) - - # Fix pyproject.toml path to use absolute path - modified_content = modified_content.replace( - '.pip_install_from_pyproject("pyproject.toml")', - f'.pip_install_from_pyproject("{pyproject_path}")', - ) - - # Create a temporary file with the modified content - temp_dir = tempfile.mkdtemp() - temp_file = Path(temp_dir) / "modal_client.py" - with open(temp_file, "w") as f: - f.write(modified_content) - - return str(temp_file) - - def create_secrets(agent_key: str, secrets_dict: dict, env: str): if not secrets_dict: click.echo(click.style(f"No secrets found for {agent_key}", fg="yellow")) @@ -81,7 +58,7 @@ def create_secrets(agent_key: str, secrets_dict: dict, env: str): if value is not None: value = str(value).strip().strip("'\"") cmd_parts.append(f"{key}={value}") - cmd_parts.extend(["-e", ENV_NAME, "--force"]) + cmd_parts.extend(["-e", DEPLOYMENT_ENV_NAME, "--force"]) subprocess.run(cmd_parts) @@ -105,7 +82,7 @@ def deploy_client(agent_key: str, client_name: str, env: str): app_name, temp_file, "-e", - ENV_NAME, + DEPLOYMENT_ENV_NAME, ] ) finally: @@ -211,10 +188,8 @@ def deploy(agent: str, all: bool, db: str): for agent_name in agents: click.echo(click.style(f"\nProcessing agent: {agent_name}", fg="blue")) - agent_path = ( - root_dir / "eve" / "agents" / env / agent_name / "api.yaml" - ) - process_agent(agent_path, env) + agent_path = root_dir / "eve" / "agents" / env / agent_name / "api.yaml" + process_agent(agent_path) else: if not agent: diff --git a/eve/cli/start_cli.py b/eve/cli/start_cli.py index cc54811..5371a15 100644 --- a/eve/cli/start_cli.py +++ b/eve/cli/start_cli.py @@ -126,7 +126,7 @@ def start(agent: str, db: str, platforms: tuple, local: bool): @click.command() @click.option( "--host", - default="0.0.0.0", + default="127.0.0.1", help="Host to bind the server to", ) @click.option( @@ -138,7 +138,7 @@ def start(agent: str, db: str, platforms: tuple, local: bool): @click.option( "--reload", is_flag=True, - default=False, + default=True, help="Enable auto-reload on code changes", ) @click.option( @@ -150,15 +150,15 @@ def start(agent: str, db: str, platforms: tuple, local: bool): def api(host: str, port: int, reload: bool, db: str): """Start the Eve API server""" import uvicorn - import os load_env(db) - - click.echo(click.style(f"Starting API server on {host}:{port} with DB={db}...", fg="blue")) - # Adjusted the import path to look one directory up + click.echo( + click.style(f"Starting API server on {host}:{port} with DB={db}...", fg="blue") + ) + uvicorn.run( - "eve.api:web_app", + "eve.api.api:web_app", host=host, port=port, reload=reload, diff --git a/eve/clients/discord/client.py b/eve/clients/discord/client.py index 7c2cd45..b953e35 100644 --- a/eve/clients/discord/client.py +++ b/eve/clients/discord/client.py @@ -53,7 +53,7 @@ def __init__( if local: self.api_url = "http://localhost:8000" else: - self.api_url = os.getenv(f"EDEN_API_URL") + self.api_url = os.getenv("EDEN_API_URL") self.channel_name = common.get_ably_channel_name( agent.username, ClientType.DISCORD ) @@ -169,16 +169,13 @@ async def on_message(self, message: discord.Message) -> None: # Lookup thread if thread_key not in self.known_threads: - self.known_threads[thread_key] = self.agent.request_thread( - key=thread_key - ) + self.known_threads[thread_key] = self.agent.request_thread(key=thread_key) thread = self.known_threads[thread_key] # Lookup user if message.author.id not in self.known_users: self.known_users[message.author.id] = User.from_discord( - message.author.id, - message.author.name + message.author.id, message.author.name ) user = self.known_users[message.author.id] @@ -335,6 +332,6 @@ def start( ) parser.add_argument("--local", help="Run locally", action="store_true") args = parser.parse_args() - + load_env(args.db) start(args.env, args.agent, args.local) diff --git a/eve/deploy.py b/eve/deploy.py index 1fef10a..aec4166 100644 --- a/eve/deploy.py +++ b/eve/deploy.py @@ -1,27 +1,15 @@ import os +from pathlib import Path import subprocess import tempfile -from enum import Enum -from pydantic import BaseModel -from typing import Optional, Dict +from typing import Dict -from eve.models import ClientType REPO_URL = "https://github.com/edenartlab/eve.git" REPO_BRANCH = "main" DEPLOYMENT_ENV_NAME = "deployments" - - -class DeployCommand(str, Enum): - DEPLOY = "deploy" - STOP = "stop" - - -class DeployRequest(BaseModel): - agent_key: str - platform: ClientType - command: DeployCommand - credentials: Optional[Dict[str, str]] = None +db = os.getenv("DB", "STAGE").upper() +env = "prod" if db == "PROD" else "stage" def authenticate_modal_key() -> bool: @@ -77,21 +65,19 @@ def clone_repo(temp_dir: str): ) -def modify_client_file(file_path: str, agent_key: str) -> None: +def prepare_client_file(file_path: str, agent_key: str) -> None: """Modify the client file to use correct secret name and fix pyproject path""" with open(file_path, "r") as f: content = f.read() # Get the repo root directory (three levels up from the client file) - repo_root = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.dirname(file_path))) - ) - pyproject_path = os.path.join(repo_root, "pyproject.toml") + repo_root = Path(__file__).parent.parent.parent + pyproject_path = repo_root / "pyproject.toml" # Replace the static secret name with the dynamic one modified_content = content.replace( 'modal.Secret.from_name("client-secrets")', - f'modal.Secret.from_name("{agent_key}-secrets")', + f'modal.Secret.from_name("{agent_key}-secrets-{db}")', ) # Fix pyproject.toml path to use absolute path @@ -100,9 +86,13 @@ def modify_client_file(file_path: str, agent_key: str) -> None: f'.pip_install_from_pyproject("{pyproject_path}")', ) - with open(file_path, "w") as f: + temp_dir = tempfile.mkdtemp() + temp_file = Path(temp_dir) / "modal_client.py" + with open(temp_file, "w") as f: f.write(modified_content) + return str(temp_file) + def deploy_client(agent_key: str, client_name: str): with tempfile.TemporaryDirectory() as temp_dir: @@ -115,7 +105,7 @@ def deploy_client(agent_key: str, client_name: str): ) if os.path.exists(client_path): # Modify the client file to use the correct secret name - modify_client_file(client_path, agent_key) + prepare_client_file(client_path, agent_key) subprocess.run( ["modal", "deploy", client_path, "-e", DEPLOYMENT_ENV_NAME], check=True ) @@ -123,7 +113,7 @@ def deploy_client(agent_key: str, client_name: str): raise Exception(f"Client modal file not found: {client_path}") -def stop_client(agent_key: str, client_name: str, db: str): +def stop_client(agent_key: str, client_name: str): subprocess.run( [ "modal", diff --git a/eve/trigger.py b/eve/trigger.py new file mode 100644 index 0000000..6fe7f9a --- /dev/null +++ b/eve/trigger.py @@ -0,0 +1,87 @@ +from typing import Dict, Any +from bson import ObjectId +from eve.mongo import Collection, Document +import asyncio +import logging +from typing import Optional +from fastapi import BackgroundTasks +from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger + +from eve.api.requests import ChatRequest, UpdateConfig +from eve.thread import UserMessage + +logger = logging.getLogger(__name__) + + +@Collection("triggers") +class Trigger(Document): + trigger_id: str + user: ObjectId + agent: ObjectId + schedule: Dict[str, Any] + message: str + update_config: Dict[str, Any] + + def __init__(self, **data): + if isinstance(data.get("user"), str): + data["user"] = ObjectId(data["user"]) + if isinstance(data.get("agent"), str): + data["agent"] = ObjectId(data["agent"]) + super().__init__(**data) + + +async def create_chat_trigger( + user_id: str, + agent_id: str, + message: str, + schedule: dict, + update_config: Optional[UpdateConfig], + scheduler: BackgroundScheduler, + ably_client: AblyRealtime, + trigger_id: str, + handle_chat_fn, +): + """Creates and adds a scheduled chat job to the scheduler""" + + def run_scheduled_task(): + logger.info(f"Running scheduled chat for user {user_id}") + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + background_tasks = BackgroundTasks() + chat_request = ChatRequest( + user_id=user_id, + agent_id=agent_id, + user_message=UserMessage(content=message), + update_config=update_config, + force_reply=True, + ) + + result = loop.run_until_complete( + handle_chat_fn( + request=chat_request, + background_tasks=background_tasks, + ably_client=ably_client, + ) + ) + loop.run_until_complete(background_tasks()) + logger.info(f"Completed scheduled chat for trigger {trigger_id}: {result}") + + except Exception as e: + logger.error(f"Error in scheduled chat: {str(e)}") + finally: + loop.close() + + job = scheduler.add_job( + run_scheduled_task, + trigger=CronTrigger(**schedule), + id=trigger_id, + misfire_grace_time=None, + coalesce=True, + ) + + return job diff --git a/pyproject.toml b/pyproject.toml index affcebb..e7a0c53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dependencies = [ "colorama>=0.4.6", "web3<7.6.1", "playwright<1.49", + "apscheduler>=3.11.0", ] [build-system] diff --git a/requirements-dev.lock b/requirements-dev.lock index 1301343..453fc2f 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -34,6 +34,8 @@ anyio==4.7.0 # via runwayml # via starlette # via watchfiles +apscheduler==3.11.0 + # via eve attrs==24.2.0 # via aiohttp # via jsonschema @@ -480,6 +482,8 @@ typing-extensions==4.12.2 # via web3 typing-inspect==0.9.0 # via clerk-backend-api +tzlocal==5.2 + # via apscheduler urllib3==2.2.3 # via botocore # via requests diff --git a/requirements.lock b/requirements.lock index 8165085..d08cb28 100644 --- a/requirements.lock +++ b/requirements.lock @@ -34,6 +34,8 @@ anyio==4.7.0 # via runwayml # via starlette # via watchfiles +apscheduler==3.11.0 + # via eve attrs==24.2.0 # via aiohttp # via jsonschema @@ -451,6 +453,8 @@ typing-extensions==4.12.2 # via web3 typing-inspect==0.9.0 # via clerk-backend-api +tzlocal==5.2 + # via apscheduler urllib3==2.2.3 # via botocore # via requests