Skip to content

Commit

Permalink
Add ably heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
jmilldotdev committed Jan 7, 2025
1 parent 1aa3777 commit 5d31396
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
43 changes: 32 additions & 11 deletions eve/api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import logging
import os
import threading
import modal
from fastapi import FastAPI, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import APIKeyHeader, HTTPBearer
import logging
from ably import AblyRealtime
from apscheduler.schedulers.background import BackgroundScheduler
from pathlib import Path
from contextlib import asynccontextmanager

from eve import auth
from eve.api.handlers import (
Expand All @@ -33,17 +34,44 @@
from eve import deploy
from eve.tools.comfyui_tool import convert_tasks2_to_tasks3

# Config and logging setup

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("ably").setLevel(logging.WARNING)


db = os.getenv("DB", "STAGE").upper()
if db not in ["PROD", "STAGE"]:
raise Exception(f"Invalid environment: {db}. Must be PROD or STAGE")
app_name = "api-prod" if db == "PROD" else "api-stage"


# FastAPI setup
web_app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True)
watch_thread.start()
app.state.watch_thread = watch_thread

app.state.ably_client = AblyRealtime(
os.getenv("ABLY_PUBLISHER_KEY"),
options={
"heartbeat_interval": 15000,
"connection_state_ttl": 60000,
"disconnected_retry_timeout": 15000,
},
)
yield
# Shutdown
if hasattr(app.state, "watch_thread"):
app.state.watch_thread.join(timeout=5)
if hasattr(app.state, "scheduler"):
app.state.scheduler.shutdown(wait=True)
if hasattr(app.state, "ably_client"):
await app.state.ably_client.close()


web_app = FastAPI(lifespan=lifespan)
web_app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
Expand All @@ -59,13 +87,6 @@
background_tasks: BackgroundTasks = BackgroundTasks()


@web_app.on_event("startup")
async def startup_event():
watch_thread = threading.Thread(target=convert_tasks2_to_tasks3, daemon=True)
watch_thread.start()
web_app.state.ably_client = AblyRealtime(os.getenv("ABLY_PUBLISHER_KEY"))


@web_app.post("/create")
async def create(request: TaskRequest, _: dict = Depends(auth.authenticate_admin)):
return await handle_create(request)
Expand Down
2 changes: 2 additions & 0 deletions eve/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ async def deployment(
@web_app.post("/schedule")
async def schedule(
request: ScheduleRequest,
background_tasks: BackgroundTasks,
_: dict = Depends(auth.authenticate_admin),
):
return await handle_schedule(
request=request,
background_tasks=background_tasks,
scheduler=web_app.state.scheduler,
ably_client=web_app.state.ably_client,
)
Expand Down
11 changes: 4 additions & 7 deletions eve/clients/discord/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(
if local:
self.api_url = "http://localhost:8000"
else:
self.api_url = os.getenv(f"EDEN_API_URL")
self.api_url = os.getenv("EDEN_API_URL")
self.channel_name = common.get_ably_channel_name(
agent.username, ClientType.DISCORD
)
Expand Down Expand Up @@ -169,16 +169,13 @@ async def on_message(self, message: discord.Message) -> None:

# Lookup thread
if thread_key not in self.known_threads:
self.known_threads[thread_key] = self.agent.request_thread(
key=thread_key
)
self.known_threads[thread_key] = self.agent.request_thread(key=thread_key)
thread = self.known_threads[thread_key]

# Lookup user
if message.author.id not in self.known_users:
self.known_users[message.author.id] = User.from_discord(
message.author.id,
message.author.name
message.author.id, message.author.name
)
user = self.known_users[message.author.id]

Expand Down Expand Up @@ -335,6 +332,6 @@ def start(
)
parser.add_argument("--local", help="Run locally", action="store_true")
args = parser.parse_args()

load_env(args.db)
start(args.env, args.agent, args.local)

0 comments on commit 5d31396

Please sign in to comment.