Skip to content

Commit

Permalink
add trigger model
Browse files Browse the repository at this point in the history
  • Loading branch information
jmilldotdev committed Jan 7, 2025
1 parent 5d31396 commit fb8deb1
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 40 deletions.
44 changes: 34 additions & 10 deletions eve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@
handle_cancel,
handle_chat,
handle_create,
handle_deployment,
handle_schedule,
handle_deployment_create,
handle_deployment_delete,
handle_schedule_create,
handle_schedule_delete,
handle_stream_chat,
handle_trigger_create,
handle_trigger_delete,
)
from eve.api.requests import (
CancelRequest,
ChatRequest,
CreateDeploymentRequest,
CreateScheduleRequest,
CreateTriggerRequest,
DeleteDeploymentRequest,
DeleteScheduleRequest,
DeleteTriggerRequest,
ScheduleRequest,
TaskRequest,
DeployRequest,
Expand Down Expand Up @@ -115,18 +125,32 @@ async def stream_chat(
return await handle_stream_chat(request, background_tasks)


@web_app.post("/deployment")
async def deployment(
request: DeployRequest, _: dict = Depends(auth.authenticate_admin)
@web_app.post("/deployments/create")
async def deployment_create(
request: CreateDeploymentRequest, _: dict = Depends(auth.authenticate_admin)
):
return await handle_deployment(request)
return await handle_deployment_create(request)


@web_app.post("/schedule")
async def schedule(
request: ScheduleRequest, _: dict = Depends(auth.authenticate_admin)
@web_app.post("/deployments/delete")
async def deployment_delete(
request: DeleteDeploymentRequest, _: dict = Depends(auth.authenticate_admin)
):
return await handle_schedule(request)
return await handle_deployment_delete(request)


@web_app.post("/triggers/create")
async def trigger_create(
request: CreateTriggerRequest, _: dict = Depends(auth.authenticate_admin)
):
return await handle_trigger_create(request, scheduler, web_app.state.ably_client)


@web_app.post("/trigger/delete")
async def trigger_delete(
request: DeleteTriggerRequest, _: dict = Depends(auth.authenticate_admin)
):
return await handle_trigger_delete(request, scheduler)


# Modal app setup
Expand Down
66 changes: 47 additions & 19 deletions eve/api/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import traceback
import asyncio
from bson import ObjectId
from fastapi import BackgroundTasks
from fastapi.responses import StreamingResponse
from ably import AblyRealtime
Expand All @@ -13,8 +14,10 @@
from eve.api.requests import (
CancelRequest,
ChatRequest,
DeployRequest,
ScheduleRequest,
CreateDeploymentRequest,
CreateTriggerRequest,
DeleteDeploymentRequest,
DeleteTriggerRequest,
TaskRequest,
)
from eve.api.helpers import (
Expand All @@ -23,8 +26,8 @@
serialize_for_json,
setup_chat,
)
from eve.trigger import Cron, Trigger
from eve.deploy import (
DeployCommand,
create_modal_secrets,
deploy_client,
stop_client,
Expand Down Expand Up @@ -153,35 +156,35 @@ async def event_generator():
return {"status": "error", "message": str(e)}


async def handle_deployment(request: DeployRequest):
async def handle_deployment_create(request: CreateDeploymentRequest):
try:
if request.credentials:
create_modal_secrets(
request.credentials,
f"{request.agent_key}-secrets-{db}",
)

if request.command == DeployCommand.DEPLOY:
deploy_client(request.agent_key, request.platform.value)
return {
"status": "success",
"message": f"Deployed {request.platform.value} client",
}
elif request.command == DeployCommand.STOP:
stop_client(request.agent_key, request.platform.value)
return {
"status": "success",
"message": f"Stopped {request.platform.value} client",
}
else:
raise Exception("Invalid command")
except Exception as e:
return {"status": "error", "message": str(e)}


async def handle_deployment_delete(request: DeleteDeploymentRequest):
try:
stop_client(request.agent_key, request.platform.value)
return {
"status": "success",
"message": f"Stopped {request.platform.value} client",
}
except Exception as e:
return {"status": "error", "message": str(e)}


async def handle_schedule(
request: ScheduleRequest,
async def handle_trigger_create(
request: CreateTriggerRequest,
scheduler: BackgroundScheduler,
ably_client: AblyRealtime,
):
Expand All @@ -204,7 +207,7 @@ def run_scheduled_task():
user_id=request.user_id,
agent_id=request.agent_id,
user_message=UserMessage(
content=request.message_content,
content=request.message,
),
update_config=request.update_config,
force_reply=True,
Expand Down Expand Up @@ -233,20 +236,45 @@ def run_scheduled_task():
finally:
loop.close()

trigger_id = f"{request.user_id}_{request.agent_id}_{time.time()}"
job = scheduler.add_job(
run_scheduled_task,
trigger=trigger,
id=f"{request.user_id}_{request.agent_id}_{time.time()}",
id=trigger_id,
misfire_grace_time=None,
coalesce=True,
)

trigger = Trigger(
trigger_id=trigger_id,
user=ObjectId(request.user_id),
agent=ObjectId(request.agent_id),
schedule=request.schedule.to_cron_dict(),
message=request.message,
update_config=request.update_config.model_dump()
if request.update_config
else {},
)
trigger.save()

return {
"status": "success",
"id": str(trigger.id),
"job_id": job.id,
"next_run_time": str(job.next_run_time),
}

except Exception as e:
logger.error(f"Error scheduling task: {str(e)}\n{traceback.format_exc()}")
return {"status": "error", "message": str(e)}


async def handle_trigger_delete(
request: DeleteTriggerRequest, scheduler: BackgroundScheduler
):
try:
cron = Cron.from_mongo(request.id)
scheduler.remove_job(cron.job_id)
cron.delete()
return {"status": "success", "message": f"Deleted job {request.job_id}"}
except Exception as e:
return {"status": "error", "message": str(e)}
17 changes: 12 additions & 5 deletions eve/api/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pydantic import BaseModel, ConfigDict, Field
from datetime import datetime

from eve.deploy import DeployCommand
from eve.models import ClientType
from eve.thread import UserMessage

Expand Down Expand Up @@ -64,16 +63,24 @@ def to_cron_dict(self) -> dict:
return {k: v for k, v in self.model_dump().items() if v is not None}


class ScheduleRequest(BaseModel):
class CreateTriggerRequest(BaseModel):
agent_id: str
user_id: str
message_content: str
message: str
schedule: CronSchedule
update_config: Optional[UpdateConfig] = None


class DeployRequest(BaseModel):
class DeleteTriggerRequest(BaseModel):
id: str


class CreateDeploymentRequest(BaseModel):
agent_key: str
platform: ClientType
command: DeployCommand
credentials: Optional[Dict[str, str]] = None


class DeleteDeploymentRequest(BaseModel):
agent_key: str
platform: ClientType
6 changes: 0 additions & 6 deletions eve/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
import subprocess
import tempfile
from enum import Enum
from typing import Dict


Expand All @@ -13,11 +12,6 @@
env = "prod" if db == "PROD" else "stage"


class DeployCommand(str, Enum):
DEPLOY = "deploy"
STOP = "stop"


def authenticate_modal_key() -> bool:
token_id = os.getenv("MODAL_DEPLOYER_TOKEN_ID")
token_secret = os.getenv("MODAL_DEPLOYER_TOKEN_SECRET")
Expand Down
20 changes: 20 additions & 0 deletions eve/trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import Dict, Any
from bson import ObjectId
from eve.mongo import Collection, Document


@Collection("triggers")
class Trigger(Document):
trigger_id: str
user: ObjectId
agent: ObjectId
schedule: Dict[str, Any]
message: str
update_config: Dict[str, Any]

def __init__(self, **data):
if isinstance(data.get("user"), str):
data["user"] = ObjectId(data["user"])
if isinstance(data.get("agent"), str):
data["agent"] = ObjectId(data["agent"])
super().__init__(**data)

0 comments on commit fb8deb1

Please sign in to comment.