From bf9fea9220e0bf505329f5f2518808a8d23a3050 Mon Sep 17 00:00:00 2001 From: Jonathan Miller Date: Tue, 7 Jan 2025 13:42:35 -0500 Subject: [PATCH] okay it appears to all work --- eve/agent.py | 1 - eve/api.py | 190 -------------------------------------------- eve/api/api.py | 110 +++++++++++++++---------- eve/api/handlers.py | 81 +++++-------------- eve/api/helpers.py | 40 +++++++++- eve/trigger.py | 67 ++++++++++++++++ 6 files changed, 193 insertions(+), 296 deletions(-) delete mode 100644 eve/api.py diff --git a/eve/agent.py b/eve/agent.py index f7de278..575fa7a 100644 --- a/eve/agent.py +++ b/eve/agent.py @@ -1,5 +1,4 @@ import os -import yaml import time import json import traceback diff --git a/eve/api.py b/eve/api.py deleted file mode 100644 index c271aee..0000000 --- a/eve/api.py +++ /dev/null @@ -1,190 +0,0 @@ -import logging -import os -import threading -import modal -from fastapi import FastAPI, Depends, BackgroundTasks -from fastapi.middleware.cors import CORSMiddleware -from fastapi.security import APIKeyHeader, HTTPBearer -from ably import AblyRealtime -from apscheduler.schedulers.background import BackgroundScheduler -from pathlib import Path -from contextlib import asynccontextmanager - -from eve import auth -from eve.api.handlers import ( - handle_cancel, - handle_chat, - handle_create, - handle_deployment_create, - handle_deployment_delete, - handle_schedule_create, - handle_schedule_delete, - handle_stream_chat, - handle_trigger_create, - handle_trigger_delete, -) -from eve.api.requests import ( - CancelRequest, - ChatRequest, - CreateDeploymentRequest, - CreateScheduleRequest, - CreateTriggerRequest, - DeleteDeploymentRequest, - DeleteScheduleRequest, - DeleteTriggerRequest, - ScheduleRequest, - TaskRequest, - DeployRequest, -) -from eve.deploy import ( - authenticate_modal_key, - check_environment_exists, - create_environment, -) -from eve import deploy -from eve.tools.comfyui_tool import convert_tasks2_to_tasks3 - - -logging.basicConfig(level=logging.INFO) -logging.getLogger("ably").setLevel(logging.WARNING) - - -db = os.getenv("DB", "STAGE").upper() -if db not in ["PROD", "STAGE"]: - raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") -app_name = "api-prod" if db == "PROD" else "api-stage" - - -# FastAPI setup -@asynccontextmanager -async def lifespan(app: FastAPI): - # Startup - watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) - watch_thread.start() - app.state.watch_thread = watch_thread - - app.state.ably_client = AblyRealtime( - os.getenv("ABLY_PUBLISHER_KEY"), - options={ - "heartbeat_interval": 15000, - "connection_state_ttl": 60000, - "disconnected_retry_timeout": 15000, - }, - ) - yield - # Shutdown - if hasattr(app.state, "watch_thread"): - app.state.watch_thread.join(timeout=5) - if hasattr(app.state, "scheduler"): - app.state.scheduler.shutdown(wait=True) - if hasattr(app.state, "ably_client"): - await app.state.ably_client.close() - - -web_app = FastAPI(lifespan=lifespan) -web_app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) -scheduler = BackgroundScheduler() -scheduler.start() - -api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) -bearer_scheme = HTTPBearer(auto_error=False) -background_tasks: BackgroundTasks = BackgroundTasks() - - -@web_app.post("/create") -async def create(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)): - return await handle_create(request) - - -@web_app.post("/cancel") -async def cancel(request: CancelRequest, _: dict = Depends(auth.authenticate_admin)): - return await handle_cancel(request) - - -@web_app.post("/chat") -async def chat( - request: ChatRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), -): - return await handle_chat(request, background_tasks, web_app.state.ably_client) - - -@web_app.post("/chat/stream") -async def stream_chat( - request: ChatRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), -): - return await handle_stream_chat(request, background_tasks) - - -@web_app.post("/deployments/create") -async def deployment_create( - request: CreateDeploymentRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_deployment_create(request) - - -@web_app.post("/deployments/delete") -async def deployment_delete( - request: DeleteDeploymentRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_deployment_delete(request) - - -@web_app.post("/triggers/create") -async def trigger_create( - request: CreateTriggerRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_trigger_create(request, scheduler, web_app.state.ably_client) - - -@web_app.post("/trigger/delete") -async def trigger_delete( - request: DeleteTriggerRequest, _: dict = Depends(auth.authenticate_admin) -): - return await handle_trigger_delete(request, scheduler) - - -# Modal app setup -app = modal.App( - name=app_name, - secrets=[ - modal.Secret.from_name("eve-secrets"), - modal.Secret.from_name(f"eve-secrets-{db}"), - ], -) - -root_dir = Path(__file__).parent.parent -workflows_dir = root_dir / ".." / "workflows" - -image = ( - modal.Image.debian_slim(python_version="3.11") - .env({"DB": db, "MODAL_SERVE": os.getenv("MODAL_SERVE")}) - .apt_install("git", "libmagic1", "ffmpeg", "wget") - .pip_install_from_pyproject(str(root_dir / "pyproject.toml")) - .run_commands(["playwright install"]) - .copy_local_dir(str(workflows_dir), "/workflows") -) - - -@app.function( - image=image, - keep_warm=1, - concurrency_limit=10, - container_idle_timeout=60, - timeout=3600, -) -@modal.asgi_app() -def fastapi_app(): - authenticate_modal_key() - if not check_environment_exists(deploy.DEPLOYMENT_ENV_NAME): - create_environment(deploy.DEPLOYMENT_ENV_NAME) - return web_app diff --git a/eve/api/api.py b/eve/api/api.py index 3aa87f7..a563afa 100644 --- a/eve/api/api.py +++ b/eve/api/api.py @@ -1,28 +1,35 @@ +import logging import os +import threading import modal from fastapi import FastAPI, Depends, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader, HTTPBearer -import logging from ably import AblyRealtime from apscheduler.schedulers.background import BackgroundScheduler from pathlib import Path +from contextlib import asynccontextmanager from eve import auth from eve.api.handlers import ( handle_cancel, handle_chat, handle_create, - handle_deployment, - handle_schedule, + handle_deployment_create, + handle_deployment_delete, handle_stream_chat, + handle_trigger_create, + handle_trigger_delete, ) +from eve.api.helpers import load_existing_triggers from eve.api.requests import ( CancelRequest, ChatRequest, - ScheduleRequest, + CreateDeploymentRequest, + CreateTriggerRequest, + DeleteDeploymentRequest, + DeleteTriggerRequest, TaskRequest, - DeployRequest, ) from eve.deploy import ( authenticate_modal_key, @@ -30,19 +37,48 @@ create_environment, ) from eve import deploy +from eve.tools.comfyui_tool import convert_tasks2_to_tasks3 + -# Config and logging setup logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) + db = os.getenv("DB", "STAGE").upper() if db not in ["PROD", "STAGE"]: raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE") app_name = "api-prod" if db == "PROD" else "api-stage" +logging.getLogger("ably").setLevel(logging.INFO if db != "PROD" else logging.WARNING) + # FastAPI setup -web_app = FastAPI() +@asynccontextmanager +async def lifespan(app: FastAPI): + from eve.api.handlers import handle_chat + + # Startup + watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True) + watch_thread.start() + app.state.watch_thread = watch_thread + + app.state.ably_client = AblyRealtime( + key=os.getenv("ABLY_PUBLISHER_KEY"), + ) + + # Load existing triggers + await load_existing_triggers(scheduler, app.state.ably_client, handle_chat) + + yield + # Shutdown + if hasattr(app.state, "watch_thread"): + app.state.watch_thread.join(timeout=5) + if hasattr(app.state, "scheduler"): + app.state.scheduler.shutdown(wait=True) + if hasattr(app.state, "ably_client"): + await app.state.ably_client.close() + + +web_app = FastAPI(lifespan=lifespan) web_app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -50,25 +86,8 @@ allow_methods=["*"], allow_headers=["*"], ) - - -@web_app.on_event("startup") -async def startup_event(): - # Setup - scheduler = BackgroundScheduler() - scheduler.start() - web_app.state.scheduler = scheduler - web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY")) - - -@web_app.on_event("shutdown") -async def shutdown_event(): - # Cleanup - if hasattr(web_app.state, "ably_client"): - web_app.state.ably_client.close() - if hasattr(web_app.state, "scheduler"): - web_app.state.scheduler.shutdown() - +scheduler = BackgroundScheduler() +scheduler.start() api_key_header = APIKeyHeader(name="X-Api-Key", auto_error=False) bearer_scheme = HTTPBearer(auto_error=False) @@ -103,25 +122,32 @@ async def stream_chat( return await handle_stream_chat(request, background_tasks) -@web_app.post("/deployment") -async def deployment( - request: DeployRequest, _: dict = Depends(auth.authenticate_admin) +@web_app.post("/deployments/create") +async def deployment_create( + request: CreateDeploymentRequest, _: dict = Depends(auth.authenticate_admin) ): - return await handle_deployment(request) + return await handle_deployment_create(request) -@web_app.post("/schedule") -async def schedule( - request: ScheduleRequest, - background_tasks: BackgroundTasks, - _: dict = Depends(auth.authenticate_admin), +@web_app.post("/deployments/delete") +async def deployment_delete( + request: DeleteDeploymentRequest, _: dict = Depends(auth.authenticate_admin) ): - return await handle_schedule( - request=request, - background_tasks=background_tasks, - scheduler=web_app.state.scheduler, - ably_client=web_app.state.ably_client, - ) + return await handle_deployment_delete(request) + + +@web_app.post("/triggers/create") +async def trigger_create( + request: CreateTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_create(request, scheduler, web_app.state.ably_client) + + +@web_app.post("/triggers/delete") +async def trigger_delete( + request: DeleteTriggerRequest, _: dict = Depends(auth.authenticate_admin) +): + return await handle_trigger_delete(request, scheduler) # Modal app setup diff --git a/eve/api/handlers.py b/eve/api/handlers.py index 1885280..b7dc6b2 100644 --- a/eve/api/handlers.py +++ b/eve/api/handlers.py @@ -3,13 +3,11 @@ import os import time import traceback -import asyncio from bson import ObjectId from fastapi import BackgroundTasks from fastapi.responses import StreamingResponse from ably import AblyRealtime from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.cron import CronTrigger from eve.api.requests import ( CancelRequest, @@ -26,7 +24,7 @@ serialize_for_json, setup_chat, ) -from eve.trigger import Cron, Trigger +from eve.trigger import Trigger from eve.deploy import ( create_modal_secrets, deploy_client, @@ -36,7 +34,6 @@ from eve.mongo import serialize_document from eve.task import Task from eve.tool import Tool -from eve.thread import UserMessage logger = logging.getLogger(__name__) db = os.getenv("DB", "STAGE").upper() @@ -189,60 +186,20 @@ async def handle_trigger_create( ably_client: AblyRealtime, ): try: - trigger = CronTrigger(**request.schedule.to_cron_dict()) - - def run_scheduled_task(): - logger.info(f"Running scheduled chat for user {request.user_id}") - - # Create new event loop for this thread - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - # Create new background tasks for this run - background_tasks = BackgroundTasks() - - # Create the chat request - chat_request = ChatRequest( - user_id=request.user_id, - agent_id=request.agent_id, - user_message=UserMessage( - content=request.message, - ), - update_config=request.update_config, - force_reply=True, - ) - - # Run the async chat handler and its background tasks - result = loop.run_until_complete( - handle_chat( - request=chat_request, - background_tasks=background_tasks, - ably_client=ably_client, - ) - ) - - # Execute any background tasks that were created - loop.run_until_complete(background_tasks()) - - logger.info( - f"Completed scheduled chat for user {request.user_id}: {result}" - ) - - except Exception as e: - logger.error( - f"Error in scheduled chat: {str(e)}\n{traceback.format_exc()}" - ) - finally: - loop.close() - - trigger_id = f"{request.user_id}_{request.agent_id}_{time.time()}" - job = scheduler.add_job( - run_scheduled_task, - trigger=trigger, - id=trigger_id, - misfire_grace_time=None, - coalesce=True, + from eve.trigger import create_chat_trigger + + trigger_id = f"{request.user_id}_{request.agent_id}_{int(time.time())}" + + job = await create_chat_trigger( + user_id=request.user_id, + agent_id=request.agent_id, + message=request.message, + schedule=request.schedule.to_cron_dict(), + update_config=request.update_config, + scheduler=scheduler, + ably_client=ably_client, + trigger_id=trigger_id, + handle_chat_fn=handle_chat, ) trigger = Trigger( @@ -272,9 +229,9 @@ async def handle_trigger_delete( request: DeleteTriggerRequest, scheduler: BackgroundScheduler ): try: - cron = Cron.from_mongo(request.id) - scheduler.remove_job(cron.job_id) - cron.delete() - return {"status": "success", "message": f"Deleted job {request.job_id}"} + trigger = Trigger.from_mongo(request.id) + scheduler.remove_job(trigger.trigger_id) + trigger.delete() + return {"status": "success", "message": f"Deleted job {trigger.trigger_id}"} except Exception as e: return {"status": "error", "message": str(e)} diff --git a/eve/api/helpers.py b/eve/api/helpers.py index 75cc1f1..5f50d15 100644 --- a/eve/api/helpers.py +++ b/eve/api/helpers.py @@ -1,4 +1,3 @@ -import asyncio import logging import os from typing import Optional @@ -6,6 +5,8 @@ from bson import ObjectId from fastapi import BackgroundTasks from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +import traceback from eve.tool import Tool from eve.user import User @@ -13,6 +14,8 @@ from eve.thread import Thread from eve.llm import async_title_thread from eve.api.requests import ChatRequest, UpdateConfig +from eve.trigger import Trigger +from eve.mongo import get_collection logger = logging.getLogger(__name__) @@ -84,3 +87,38 @@ async def emit_channel_update(update_channel: AblyRealtime, data: dict): await update_channel.publish("update", data) except Exception as e: logger.error(f"Failed to publish to Ably: {str(e)}") + + +async def load_existing_triggers( + scheduler: BackgroundScheduler, ably_client: AblyRealtime, handle_chat_fn +): + """Load all existing triggers from the database and add them to the scheduler""" + from eve.trigger import create_chat_trigger + + triggers_collection = get_collection(Trigger.collection_name) + + for trigger_doc in triggers_collection.find({}): + try: + # Convert mongo doc to Trigger object + trigger = Trigger.convert_from_mongo(trigger_doc) + trigger = Trigger.from_schema(trigger) + + await create_chat_trigger( + user_id=str(trigger.user), + agent_id=str(trigger.agent), + message=trigger.message, + schedule=trigger.schedule, + update_config=UpdateConfig(**trigger.update_config) + if trigger.update_config + else None, + scheduler=scheduler, + ably_client=ably_client, + trigger_id=trigger.trigger_id, + handle_chat_fn=handle_chat_fn, + ) + logger.info(f"Loaded trigger {trigger.trigger_id}") + + except Exception as e: + logger.error( + f"Error loading trigger {trigger_doc.get('trigger_id', 'unknown')}: {str(e)}\n{traceback.format_exc()}" + ) diff --git a/eve/trigger.py b/eve/trigger.py index 1c9e450..6fe7f9a 100644 --- a/eve/trigger.py +++ b/eve/trigger.py @@ -1,6 +1,18 @@ from typing import Dict, Any from bson import ObjectId from eve.mongo import Collection, Document +import asyncio +import logging +from typing import Optional +from fastapi import BackgroundTasks +from ably import AblyRealtime +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger + +from eve.api.requests import ChatRequest, UpdateConfig +from eve.thread import UserMessage + +logger = logging.getLogger(__name__) @Collection("triggers") @@ -18,3 +30,58 @@ def __init__(self, **data): if isinstance(data.get("agent"), str): data["agent"] = ObjectId(data["agent"]) super().__init__(**data) + + +async def create_chat_trigger( + user_id: str, + agent_id: str, + message: str, + schedule: dict, + update_config: Optional[UpdateConfig], + scheduler: BackgroundScheduler, + ably_client: AblyRealtime, + trigger_id: str, + handle_chat_fn, +): + """Creates and adds a scheduled chat job to the scheduler""" + + def run_scheduled_task(): + logger.info(f"Running scheduled chat for user {user_id}") + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + background_tasks = BackgroundTasks() + chat_request = ChatRequest( + user_id=user_id, + agent_id=agent_id, + user_message=UserMessage(content=message), + update_config=update_config, + force_reply=True, + ) + + result = loop.run_until_complete( + handle_chat_fn( + request=chat_request, + background_tasks=background_tasks, + ably_client=ably_client, + ) + ) + loop.run_until_complete(background_tasks()) + logger.info(f"Completed scheduled chat for trigger {trigger_id}: {result}") + + except Exception as e: + logger.error(f"Error in scheduled chat: {str(e)}") + finally: + loop.close() + + job = scheduler.add_job( + run_scheduled_task, + trigger=CronTrigger(**schedule), + id=trigger_id, + misfire_grace_time=None, + coalesce=True, + ) + + return job