
postprocessing task
genekogan committed Jan 8, 2025 · 2 parents 93afed9 + 6fd81f2 · commit 4974c30
Showing 6 changed files with 449 additions and 325 deletions.
6 changes: 3 additions & 3 deletions comfyui.py
@@ -367,9 +367,9 @@ def test_workflows(self):
tests = [f"/root/workspace/workflows/{workflow}/{specific_test}"]
else:
tests = [f"/root/workspace/workflows/{workflow}/test.json"]
print("\n\n-----------------------------------------------------------")
print(f"====> Running tests for {workflow}: ", tests)
for test in tests:
for i, test in enumerate(tests):
print(f"\n\n\n------------------ Running test {i+1} of {len(tests)} ------------------")
tool = Tool.from_yaml(f"/root/workspace/workflows/{workflow}/api.yaml")
if tool.status == "inactive":
print(f"{workflow} is inactive, skipping test")
@@ -698,7 +698,7 @@ def validate_url(url):
#print("found lora:\n", lora)

if not lora:
raise Exception(f"Lora {key} with id: {lora_id} not found!")
raise Exception(f"Lora {key} with id: {lora_id} not found in DB {db}!")

base_model = lora.get("base_model")
lora_url = lora.get("checkpoint")
11 changes: 9 additions & 2 deletions eve/__init__.py
@@ -1,3 +1,4 @@
import logging
import sentry_sdk
from dotenv import load_dotenv
from pathlib import Path
@@ -6,9 +7,12 @@

home_dir = str(Path.home())


EDEN_API_KEY = None

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)


def load_env(db):
global EDEN_API_KEY
@@ -37,7 +41,10 @@ def load_env(db):

# start sentry
sentry_dsn = os.getenv("SENTRY_DSN")
sentry_sdk.init(dsn=sentry_dsn, traces_sample_rate=1.0, profiles_sample_rate=1.0)
if sentry_dsn:
sentry_sdk.init(
dsn=sentry_dsn, traces_sample_rate=1.0, profiles_sample_rate=1.0
)

# load api keys
EDEN_API_KEY = SecretStr(os.getenv("EDEN_API_KEY", ""))
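
Note: with the hunk above, logging is configured once at package import time and Sentry is only initialized inside load_env when a SENTRY_DSN is set, so downstream modules can rely on module-level loggers without extra setup. A minimal sketch of the intended effect, assuming the eve package and its dependencies are installed; the logger name and message are illustrative, not part of this commit:

import logging
import eve  # package import runs the logging.basicConfig(...) added above

logger = logging.getLogger("eve.example")  # hypothetical module name, for illustration
logger.info("hello")  # emitted as "<timestamp> - eve.example - INFO - hello"
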
38 changes: 38 additions & 0 deletions eve/api/errors.py
@@ -0,0 +1,38 @@
import functools
import logging
import sentry_sdk
from fastapi import HTTPException
from typing import Callable, Any

logger = logging.getLogger(__name__)


class APIError(HTTPException):
def __init__(self, message: str, status_code: int = 400):
super().__init__(status_code=status_code, detail=message)


def handle_errors(func: Callable) -> Callable:
"""
Decorator that handles errors consistently across API endpoints.
Logs errors, reports to Sentry, and returns appropriate responses.
"""

@functools.wraps(func)
async def wrapper(*args, **kwargs) -> Any:
try:
result = await func(*args, **kwargs)
return result

except APIError as e:
# Log API errors but don't send to Sentry as these are expected
logger.error(f"API Error in {func.__name__}: {str(e)}", exc_info=True)
raise

except Exception as e:
# Log unexpected errors and send to Sentry
logger.error(f"Unexpected error in {func.__name__}", exc_info=True)
sentry_sdk.capture_exception(e)
raise APIError(f"Operation failed: {str(e)}")

return wrapper
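
Note: the decorator above is applied to async endpoint handlers (as seen in handlers.py below). An APIError passes through with its status code and is logged but not sent to Sentry, while any other exception is logged, captured by Sentry, and re-raised as a generic 400 APIError. A minimal usage sketch under those assumptions; the route and payload shape are hypothetical, not part of this commit:

from fastapi import FastAPI
from eve.api.errors import APIError, handle_errors

app = FastAPI()

@app.post("/example")  # hypothetical route, for illustration only
@handle_errors
async def example_handler(payload: dict):
    if "tool" not in payload:
        # expected failure: logged, not reported to Sentry, returned as HTTP 400
        raise APIError("Missing required field: tool", status_code=400)
    # any unexpected exception raised below is logged, sent to Sentry,
    # and re-raised as APIError("Operation failed: ..."), i.e. HTTP 400
    return {"tool": payload["tool"]}
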
190 changes: 92 additions & 98 deletions eve/api/handlers.py
@@ -2,14 +2,13 @@
import logging
import os
import time
import traceback
import replicate
from bson import ObjectId
from fastapi import BackgroundTasks, Request
from fastapi.responses import StreamingResponse
from ably import AblyRealtime
from apscheduler.schedulers.background import BackgroundScheduler

from eve.api.errors import handle_errors, APIError
from eve.api.api_requests import (
CancelRequest,
ChatRequest,
@@ -42,6 +41,7 @@
env = "prod" if db == "PROD" else "stage"


@handle_errors
async def handle_create(request: TaskRequest):
tool = Tool.load(key=request.tool)
result = await tool.async_start_task(
@@ -50,11 +50,17 @@ async def handle_create(request: TaskRequest):
return serialize_document(result.model_dump(by_alias=True))


@handle_errors
async def handle_cancel(request: CancelRequest):
task = Task.from_mongo(request.task_id)
assert str(task.user) == request.user_id, "Task user does not match user_id"
if str(task.user) != request.user_id:
raise APIError(
"Unauthorized: Task user does not match user_id", status_code=403
)

if task.status in ["completed", "failed", "cancelled"]:
return {"status": task.status}

tool = Tool.load(key=task.tool)
await tool.async_cancel(task)
return {"status": task.status}
@@ -77,16 +83,15 @@ async def handle_chat(
background_tasks: BackgroundTasks,
ably_client: AblyRealtime
):
try:
user, agent, thread, tools = await setup_chat(request, background_tasks)

update_channel = (
await get_update_channel(request.update_config, ably_client)
if request.update_config and request.update_config.sub_channel_name
else None
)
user, agent, thread, tools = await setup_chat(request, background_tasks)
update_channel = (
await get_update_channel(request.update_config, ably_client)
if request.update_config and request.update_config.sub_channel_name
else None
)

async def run_prompt():
async def run_prompt():
try:
async for update in async_prompt_thread(
user=user,
agent=agent,
@@ -113,20 +118,24 @@ async def run_prompt():
data["error"] = update.error if hasattr(update, "error") else None

await emit_update(request.update_config, update_channel, data)
except Exception as e:
logger.error("Error in run_prompt", exc_info=True)
await emit_update(
request.update_config,
update_channel,
{"type": "error", "error": str(e)},
)

background_tasks.add_task(run_prompt)
return {"status": "success", "thread_id": str(thread.id)}

except Exception as e:
logger.error(f"Error in handle_chat: {str(e)}\n{traceback.format_exc()}")
return {"status": "error", "message": str(e)}
background_tasks.add_task(run_prompt)
return {"thread_id": str(thread.id)}


@handle_errors
async def handle_stream_chat(request: ChatRequest, background_tasks: BackgroundTasks):
try:
user, agent, thread, tools = await setup_chat(request, background_tasks)
user, agent, thread, tools = await setup_chat(request, background_tasks)

async def event_generator():
async def event_generator():
try:
async for update in async_prompt_thread(
user=user,
agent=agent,
@@ -156,100 +165,85 @@ async def event_generator():
yield f"data: {json.dumps({'event': 'update', 'data': data})}\n\n"

yield f"data: {json.dumps({'event': 'done', 'data': ''})}\n\n"

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)

except Exception as e:
logger.error(f"Error in stream_chat: {str(e)}\n{traceback.format_exc()}")
return {"status": "error", "message": str(e)}
except Exception as e:
logger.error("Error in event_generator", exc_info=True)
yield f"data: {json.dumps({'event': 'error', 'data': {'error': str(e)}})}\n\n"

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)


@handle_errors
async def handle_deployment_create(request: CreateDeploymentRequest):
try:
if request.credentials:
create_modal_secrets(
request.credentials,
f"{request.agent_key}-secrets-{env}",
)
deploy_client(request.agent_key, request.platform.value, env)
return {
"status": "success",
"message": f"Deployed {request.platform.value} client",
}
except Exception as e:
return {"status": "error", "message": str(e)}
if not request.credentials:
raise APIError("Credentials are required", status_code=400)

create_modal_secrets(
request.credentials,
f"{request.agent_key}-secrets-{env}",
)
deploy_client(request.agent_key, request.platform.value, env)
return {"message": f"Deployed {request.platform.value} client"}


@handle_errors
async def handle_deployment_delete(request: DeleteDeploymentRequest):
try:
stop_client(request.agent_key, request.platform.value)
return {
"status": "success",
"message": f"Stopped {request.platform.value} client",
}
except Exception as e:
return {"status": "error", "message": str(e)}
stop_client(request.agent_key, request.platform.value)
return {"message": f"Stopped {request.platform.value} client"}


@handle_errors
async def handle_trigger_create(
request: CreateTriggerRequest,
scheduler: BackgroundScheduler,
ably_client: AblyRealtime,
):
try:
from eve.trigger import create_chat_trigger

trigger_id = f"{request.user_id}_{request.agent_id}_{int(time.time())}"

job = await create_chat_trigger(
user_id=request.user_id,
agent_id=request.agent_id,
message=request.message,
schedule=request.schedule.to_cron_dict(),
update_config=request.update_config,
scheduler=scheduler,
ably_client=ably_client,
trigger_id=trigger_id,
handle_chat_fn=handle_chat,
)

trigger = Trigger(
trigger_id=trigger_id,
user=ObjectId(request.user_id),
agent=ObjectId(request.agent_id),
schedule=request.schedule.to_cron_dict(),
message=request.message,
update_config=request.update_config.model_dump()
if request.update_config
else {},
)
trigger.save()
from eve.trigger import create_chat_trigger

trigger_id = f"{request.user_id}_{request.agent_id}_{int(time.time())}"

job = await create_chat_trigger(
user_id=request.user_id,
agent_id=request.agent_id,
message=request.message,
schedule=request.schedule.to_cron_dict(),
update_config=request.update_config,
scheduler=scheduler,
ably_client=ably_client,
trigger_id=trigger_id,
handle_chat_fn=handle_chat,
)

return {
"id": str(trigger.id),
"job_id": job.id,
"next_run_time": str(job.next_run_time),
}
trigger = Trigger(
trigger_id=trigger_id,
user=ObjectId(request.user_id),
agent=ObjectId(request.agent_id),
schedule=request.schedule.to_cron_dict(),
message=request.message,
update_config=request.update_config.model_dump()
if request.update_config
else {},
)
trigger.save()

except Exception as e:
logger.error(f"Error scheduling task: {str(e)}\n{traceback.format_exc()}")
return {"status": "error", "message": str(e)}
return {
"id": str(trigger.id),
"job_id": job.id,
"next_run_time": str(job.next_run_time),
}


@handle_errors
async def handle_trigger_delete(
request: DeleteTriggerRequest, scheduler: BackgroundScheduler
):
try:
trigger = Trigger.from_mongo(request.id)
scheduler.remove_job(trigger.trigger_id)
trigger.delete()
return {"status": "success", "message": f"Deleted job {trigger.trigger_id}"}
except Exception as e:
return {"status": "error", "message": str(e)}
trigger = Trigger.from_mongo(request.id)
scheduler.remove_job(trigger.trigger_id)
trigger.delete()
return {"message": f"Deleted job {trigger.trigger_id}"}