diff --git a/.gitignore b/.gitignore
index 3ffdc5c..0129f94 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ __pycache__/
 *.pyc
 *.pyo
 .python-version
+api/api.egg-info/*
 
 # Visual Studio Code
 .vscode/
diff --git a/agent/agent/chain.py b/agent/agent/chain.py
index 6c328af..8e3542c 100644
--- a/agent/agent/chain.py
+++ b/agent/agent/chain.py
@@ -40,7 +40,7 @@ def template(self) -> dict[str, str]:
         system = (
             {
                 "role": "system",
-                "content": f"""You are Bloom, a subversive-minded learning companion. Your job is to employ your theory of mind skills to predict the user’s mental state.
+                "content": f"""You are Bloom, a subversive-minded learning companion. Your job is to employ your theory of mind skills to predict the user's mental state.
                 Generate a thought that makes a prediction about the user's needs given current dialogue and also lists other pieces of data that would help improve your prediction
                 previous commentary: {self.history}""",
             },
diff --git a/api/pyproject.toml b/api/pyproject.toml
index 44874d8..1f268ec 100644
--- a/api/pyproject.toml
+++ b/api/pyproject.toml
@@ -2,9 +2,7 @@
 name = "api"
 version = "0.6.0"
 description = "The REST API Implementation of Tutor-GPT"
-authors = [
-    {name = "Plastic Labs", email = "hello@plasticlabs.ai"},
-]
+authors = [{ name = "Plastic Labs", email = "hello@plasticlabs.ai" }]
 requires-python = ">=3.11"
 dependencies = [
     "fastapi[standard]>=0.112.2",
@@ -16,4 +14,4 @@ dependencies = [
 
 [tool.uv.sources]
 # agent = { path = "../agent", editable = true }
-agent = {workspace=true}
+agent = { workspace = true }
diff --git a/api/routers/chat.py b/api/routers/chat.py
index 1e00415..55a385e 100644
--- a/api/routers/chat.py
+++ b/api/routers/chat.py
@@ -1,128 +1,130 @@
-from typing import Optional
-from fastapi import APIRouter, HTTPException, Body
-from fastapi.responses import StreamingResponse
-from pydantic import BaseModel
+from fastapi import APIRouter, HTTPException
+from fastapi.responses import StreamingResponse, JSONResponse
 from api import schemas
 from api.dependencies import app, honcho
 from agent.chain import ThinkCall, RespondCall
+import logging
+
 
 router = APIRouter(prefix="/api", tags=["chat"])
 
 
 @router.post("/stream")
-async def stream(
-    inp: schemas.ConversationInput,
+async def stream(inp: schemas.ConversationInput):
+    try:
+        user = honcho.apps.users.get_or_create(app_id=app.id, name=inp.user_id)
+
+        async def convo_turn():
+            thought = ""
+            response = ""
+            try:
+                thought_stream = ThinkCall(
+                    user_input=inp.message,
+                    app_id=app.id,
+                    user_id=user.id,
+                    session_id=str(inp.conversation_id),
+                    honcho=honcho,
+                ).stream()
+                for chunk in thought_stream:
+                    thought += chunk
+                    yield chunk
+
+                yield "❀"
+                response_stream = RespondCall(
+                    user_input=inp.message,
+                    thought=thought,
+                    app_id=app.id,
+                    user_id=user.id,
+                    session_id=str(inp.conversation_id),
+                    honcho=honcho,
+                ).stream()
+                for chunk in response_stream:
+                    response += chunk
+                    yield chunk
+                yield "❀"
+            except Exception as e:
+                logging.error(f"Error during streaming: {str(e)}")
+                yield f"Error: {str(e)}"
+                return
+
+            await create_messages_and_metamessages(
+                app.id, user.id, inp.conversation_id, inp.message, thought, response
+            )
+
+        return StreamingResponse(convo_turn())
+    except Exception as e:
+        logging.error(f"An error occurred: {str(e)}")
+        if "rate limit" in str(e).lower():
+            return JSONResponse(
+                status_code=429,
+                content={
+                    "error": "rate_limit_exceeded",
+                    "message": "Rate limit exceeded. Please try again later.",
+                },
+            )
+        else:
+            return JSONResponse(
+                status_code=500,
+                content={
+                    "error": "internal_server_error",
+                    "message": "An internal server error has occurred.",
+                },
+            )
+
+
+async def create_messages_and_metamessages(
+    app_id, user_id, conversation_id, user_message, thought, ai_response
 ):
-    """Stream the response too the user, currently only used by the Web UI and has integration to be able to use Honcho is not anonymous"""
-    user = honcho.apps.users.get_or_create(app_id=app.id, name=inp.user_id)
-
-    def convo_turn():
-        thought_stream = ThinkCall(
-            user_input=inp.message,
-            app_id=app.id,
-            user_id=user.id,
-            session_id=str(inp.conversation_id),
-            honcho=honcho,
-        ).stream()
-        thought = ""
-        for chunk in thought_stream:
-            thought += chunk
-            yield chunk
-
-        yield "❀"
-        response_stream = RespondCall(
-            user_input=inp.message,
-            thought=thought,
-            app_id=app.id,
-            user_id=user.id,
-            session_id=str(inp.conversation_id),
-            honcho=honcho,
-        ).stream()
-        response = ""
-        for chunk in response_stream:
-            response += chunk
-            yield chunk
-        yield "❀"
-
-        honcho.apps.users.sessions.messages.create(
+    try:
+        # These operations will use the DB layer's built-in retry logic
+        await honcho.apps.users.sessions.messages.create(
             is_user=True,
-            session_id=str(inp.conversation_id),
-            app_id=app.id,
-            user_id=user.id,
-            content=inp.message,
+            session_id=str(conversation_id),
+            app_id=app_id,
+            user_id=user_id,
+            content=user_message,
         )
-        new_ai_message = honcho.apps.users.sessions.messages.create(
+        new_ai_message = await honcho.apps.users.sessions.messages.create(
             is_user=False,
-            session_id=str(inp.conversation_id),
-            app_id=app.id,
-            user_id=user.id,
-            content=response,
+            session_id=str(conversation_id),
+            app_id=app_id,
+            user_id=user_id,
+            content=ai_response,
         )
-        honcho.apps.users.sessions.metamessages.create(
-            app_id=app.id,
-            session_id=str(inp.conversation_id),
-            user_id=user.id,
+        await honcho.apps.users.sessions.metamessages.create(
+            app_id=app_id,
+            session_id=str(conversation_id),
+            user_id=user_id,
             message_id=new_ai_message.id,
             metamessage_type="thought",
             content=thought,
         )
-    return StreamingResponse(convo_turn())
+    except Exception as e:
+        logging.error(f"Error in create_messages_and_metamessages: {str(e)}")
+        raise  # Re-raise the exception to be handled by the caller
+
 
 @router.get("/thought/{message_id}")
 async def get_thought(conversation_id: str, message_id: str, user_id: str):
-    user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id)
-    thought = honcho.apps.users.sessions.metamessages.list(
-        session_id=conversation_id,
-        app_id=app.id,
-        user_id=user.id,
-        message_id=message_id,
-        metamessage_type="thought"
-    )
-    # In practice, there should only be one thought per message
-    return {"thought": thought.items[0].content if thought.items else None}
-
-class ReactionBody(BaseModel):
-    reaction: Optional[str] = None
-
-@router.post("/reaction/{message_id}")
-async def add_or_remove_reaction(
-    conversation_id: str,
-    message_id: str,
-    user_id: str,
-    body: ReactionBody
-):
-    reaction = body.reaction
-
-    if reaction is not None and reaction not in ["thumbs_up", "thumbs_down"]:
-        raise HTTPException(status_code=400, detail="Invalid reaction type")
-
-    user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id)
-
-    message = honcho.apps.users.sessions.messages.get(
-        app_id=app.id,
-        session_id=conversation_id,
-        user_id=user.id,
-        message_id=message_id
-    )
-
-    if not message:
-        raise HTTPException(status_code=404, detail="Message not found")
-
-    metadata = message.metadata or {}
-
-    if reaction is None:
-        metadata.pop('reaction', None)
-    else:
-        metadata['reaction'] = reaction
-
-    honcho.apps.users.sessions.messages.update(
-        app_id=app.id,
-        session_id=conversation_id,
-        user_id=user.id,
-        message_id=message_id,
-        metadata=metadata
-    )
-
-    return {"status": "Reaction updated successfully"}
+    try:
+        user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id)
+        thought = honcho.apps.users.sessions.metamessages.list(
+            session_id=conversation_id,
+            app_id=app.id,
+            user_id=user.id,
+            message_id=message_id,
+            metamessage_type="thought",
+        )
+        # In practice, there should only be one thought per message
+        return {"thought": thought.items[0].content if thought.items else None}
+    except Exception as e:
+        logging.error(f"An error occurred: {str(e)}")
+        return JSONResponse(
+            status_code=500,
+            content={
+                "error": "internal_server_error",
+                "message": "An internal server error has occurred.",
+            },
+        )
diff --git a/www/components/messagebox.tsx b/www/components/messagebox.tsx
index fd02c82..bbe80f2 100644
--- a/www/components/messagebox.tsx
+++ b/www/components/messagebox.tsx
@@ -22,6 +22,8 @@ interface MessageBoxProps {
   onReactionAdded: (messageId: string, reaction: Reaction) => Promise<void>;
 }
 
+export type Reaction = "thumbs_up" | "thumbs_down" | null;
+
 export default function MessageBox({
   isUser,
   userId,
diff --git a/www/package.json b/www/package.json
index cc23247..52b26d4 100644
--- a/www/package.json
+++ b/www/package.json
@@ -26,6 +26,7 @@
     "react-toggle-dark-mode": "^1.1.1",
     "rehype-katex": "^7.0.1",
     "remark-math": "^6.0.0",
+    "retry": "^0.13.1",
     "sharp": "^0.32.6",
     "stripe": "^16.12.0",
     "sweetalert2": "^11.14.2",
@@ -38,6 +39,7 @@
     "@types/react": "18.2.21",
     "@types/react-dom": "18.2.7",
     "@types/react-syntax-highlighter": "^15.5.13",
+    "@types/retry": "^0.12.5",
     "@types/uuid": "^9.0.8",
     "autoprefixer": "10.4.15",
     "encoding": "^0.1.13",
diff --git a/www/pnpm-lock.yaml b/www/pnpm-lock.yaml
index 10fad57..5148a55 100644
--- a/www/pnpm-lock.yaml
+++ b/www/pnpm-lock.yaml
@@ -55,6 +55,9 @@ importers:
       remark-math:
        specifier: ^6.0.0
        version: 6.0.0
+      retry:
+        specifier: ^0.13.1
+        version: 0.13.1
       sharp:
        specifier: ^0.32.6
        version: 0.32.6
@@ -86,7 +89,10 @@ importers:
      "@types/react-syntax-highlighter":
        specifier: ^15.5.13
        version: 15.5.13
-      "@types/uuid":
+      '@types/retry':
+        specifier: ^0.12.5
+        version: 0.12.5
+      '@types/uuid':
        specifier: ^9.0.8
        version: 9.0.8
       autoprefixer:
@@ -2631,11 +2637,11 @@
        integrity: sha512-neFKG/sBAwGxHgXiIxnbm3/AAVQ/cMRS93hvBpg8xYRbeQSPVABp9U2bRnPf0iI4+Ucdv3plSxKK+3CW2ENJxA==,
      }
 
-  "@types/scheduler@0.23.0":
-    resolution:
-      {
-        integrity: sha512-YIoDCTH3Af6XM5VuwGG/QL/CJqga1Zm3NkU3HZ4ZHK2fRMPYP1VczsTUqtsf43PH/iJNVlPHAo2oWX7BSdB2Hw==,
-      }
+  '@types/retry@0.12.5':
+    resolution: {integrity: sha512-3xSjTp3v03X/lSQLkczaN9UIEwJMoMCA1+Nb5HfbJEQWogdeQIyVtTvxPXDQjZ5zws8rFQfVfRdz03ARihPJgw==}
+
+  '@types/scheduler@0.23.0':
+    resolution: {integrity: sha512-YIoDCTH3Af6XM5VuwGG/QL/CJqga1Zm3NkU3HZ4ZHK2fRMPYP1VczsTUqtsf43PH/iJNVlPHAo2oWX7BSdB2Hw==}
 
   "@types/shimmer@1.2.0":
     resolution:
@@ -7708,11 +7714,12 @@
     hasBin: true
 
   restore-cursor@3.1.0:
-    resolution:
-      {
-        integrity: sha512-l+sSefzHpj5qimhFSE5a8nufZYAM3sBSVMAPtYkmC+4EH2anSGaEMXSD0izRQbu9nfyQ9y5JrVmp7E8oZrUjvA==,
-      }
-    engines: { node: ">=8" }
+    resolution: {integrity: sha512-l+sSefzHpj5qimhFSE5a8nufZYAM3sBSVMAPtYkmC+4EH2anSGaEMXSD0izRQbu9nfyQ9y5JrVmp7E8oZrUjvA==}
+    engines: {node: '>=8'}
+
+  retry@0.13.1:
+    resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==}
+    engines: {node: '>= 4'}
 
   reusify@1.0.4:
     resolution:
@@ -11276,7 +11283,9 @@ snapshots:
       "@types/scheduler": 0.23.0
       csstype: 3.1.3
 
-  "@types/scheduler@0.23.0": {}
+  '@types/retry@0.12.5': {}
+
+  '@types/scheduler@0.23.0': {}
 
   "@types/shimmer@1.2.0": {}
 
@@ -14820,6 +14829,8 @@ snapshots:
       onetime: 5.1.2
       signal-exit: 3.0.7
 
+  retry@0.13.1: {}
+
   reusify@1.0.4: {}
 
   rimraf@2.6.3:
diff --git a/www/utils/api.ts b/www/utils/api.ts
index 50b9a78..90e3be5 100644
--- a/www/utils/api.ts
+++ b/www/utils/api.ts
@@ -1,4 +1,5 @@
 import { type Reaction } from "@/components/messagebox";
+import { retryDBOperation, retryOpenAIOperation } from "./retryUtils";
 
 const defaultMessage: Message = {
   text: `I'm your Aristotelian learning companion — here to help you follow your curiosity in whatever direction you like. My engineering makes me extremely receptive to your needs and interests. You can reply normally, and I’ll always respond!\n\nIf I'm off track, just say so!\n\nNeed to leave or just done chatting? Let me know! I’m conversational by design so I’ll say goodbye 😊.`,
@@ -33,65 +34,71 @@ export class Conversation {
   }
 
   async getMessages() {
-    const req = await fetch(
-      `${this.api.url}/api/messages?` +
+    return retryDBOperation(async () => {
+      const req = await fetch(
+        `${this.api.url}/api/messages?` +
         new URLSearchParams({
           conversation_id: this.conversationId,
           user_id: this.api.userId,
-        })
-    );
-    const { messages: rawMessages } = await req.json();
-    // console.log(rawMessages);
-    if (!rawMessages) return [];
-    const messages = rawMessages.map((rawMessage: any) => {
-      return {
-        text: rawMessage.data.content,
-        isUser: rawMessage.type === 'human',
-        id: rawMessage.id,
-      };
-    });
+        }),
+      );
+      const { messages: rawMessages } = await req.json();
+      if (!rawMessages) return [];
+      const messages = rawMessages.map((rawMessage: any) => {
+        return {
+          text: rawMessage.data.content,
+          isUser: rawMessage.type === "human",
+          id: rawMessage.id,
+        };
+      });
 
-    return messages;
+      return messages;
+    });
   }
 
   async setName(name: string) {
     if (!name || name === this.name) return;
-    await fetch(`${this.api.url}/api/conversations/update`, {
-      method: 'POST',
-      body: JSON.stringify({
-        conversation_id: this.conversationId,
-        user_id: this.api.userId,
-        name,
-      }),
-      headers: {
-        'Content-Type': 'application/json',
-      },
+    await retryDBOperation(async () => {
+      await fetch(`${this.api.url}/api/conversations/update`, {
+        method: "POST",
+        body: JSON.stringify({
+          conversation_id: this.conversationId,
+          user_id: this.api.userId,
+          name,
+        }),
+        headers: {
+          "Content-Type": "application/json",
+        },
+      });
+      this.name = name;
     });
-    this.name = name;
   }
 
   async delete() {
-    await fetch(
-      `${this.api.url}/api/conversations/delete?user_id=${this.api.userId}&conversation_id=${this.conversationId}`
-    ).then((res) => res.json());
+    await retryDBOperation(async () => {
+      await fetch(
+        `${this.api.url}/api/conversations/delete?user_id=${this.api.userId}&conversation_id=${this.conversationId}`,
+      ).then((res) => res.json());
+    });
   }
 
   async chat(message: string) {
-    const req = await fetch(`${this.api.url}/api/stream`, {
-      method: 'POST',
-      body: JSON.stringify({
-        conversation_id: this.conversationId,
-        user_id: this.api.userId,
-        message,
-      }),
-      headers: {
-        'Content-Type': 'application/json',
-      },
-    });
+    return retryOpenAIOperation(async () => {
+      const req = await fetch(`${this.api.url}/api/stream`, {
+        method: "POST",
+        body: JSON.stringify({
+          conversation_id: this.conversationId,
+          user_id: this.api.userId,
+          message,
+        }),
+        headers: {
+          "Content-Type": "application/json",
+        },
+      });
 
-    const reader = req.body?.pipeThrough(new TextDecoderStream()).getReader()!;
-    return reader;
+      return req.body?.pipeThrough(new TextDecoderStream()).getReader()!;
+    });
   }
 }
@@ -110,85 +117,158 @@
   }
 
   async new() {
-    const req = await fetch(
-      `${this.url}/api/conversations/insert?user_id=${this.userId}`
-    );
-    const { conversation_id } = await req.json();
-    return new Conversation({
-      api: this,
-      name: '',
-      conversationId: conversation_id,
+    return retryDBOperation(async () => {
+      const req = await fetch(
+        `${this.url}/api/conversations/insert?user_id=${this.userId}`,
+      );
+      const { conversation_id } = await req.json();
+      return new Conversation({
+        api: this,
+        name: "",
+        conversationId: conversation_id,
+      });
     });
   }
 
   async getConversations() {
-    const req = await fetch(
-      `${this.url}/api/conversations/get?user_id=${this.userId}`
-    );
-    const { conversations }: { conversations: RawConversation[] } =
-      await req.json();
-
-    if (conversations.length === 0) {
-      return [await this.new()];
-    }
-    return conversations.map(
-      (conversation) =>
-        new Conversation({
-          api: this,
-          name: conversation.name,
-          conversationId: conversation.conversation_id,
-        })
-    );
+    return retryDBOperation(async () => {
+      const req = await fetch(
+        `${this.url}/api/conversations/get?user_id=${this.userId}`,
+      );
+      const { conversations }: { conversations: RawConversation[] } =
+        await req.json();
+
+      if (conversations.length === 0) {
+        return [await this.new()];
+      }
+      return conversations.map(
+        (conversation) =>
+          new Conversation({
+            api: this,
+            name: conversation.name,
+            conversationId: conversation.conversation_id,
+          }),
+      );
+    });
   }
 
   async getMessagesByConversation(conversationId: string) {
-    const req = await fetch(
-      `${this.url}/api/messages?` +
+    return retryDBOperation(async () => {
+      const req = await fetch(
+        `${this.url}/api/messages?` +
         new URLSearchParams({
           conversation_id: conversationId,
           user_id: this.userId,
-        })
-    );
-    const { messages: rawMessages } = await req.json();
-    // console.log(rawMessages);
-    if (!rawMessages) return [];
-    const messages: Message[] = rawMessages.map((rawMessage: any) => {
-      return {
-        text: rawMessage.content,
-        isUser: rawMessage.isUser,
-        id: rawMessage.id,
-        metadata: rawMessage.metadata,
-      };
-    });
+        }),
+      );
+      const { messages: rawMessages } = await req.json();
+      if (!rawMessages) return [];
+      const messages: Message[] = rawMessages.map((rawMessage: any) => {
+        return {
+          text: rawMessage.content,
+          isUser: rawMessage.isUser,
+          id: rawMessage.id,
+        };
+      });
 
-    return [defaultMessage, ...messages];
+      return [defaultMessage, ...messages];
+    });
   }
 
   async getThoughtById(
     conversationId: string,
     messageId: string
   ): Promise<string | null> {
-    try {
-      const response = await fetch(
-        `${this.url}/api/thought/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}`,
-        {
-          method: 'GET',
-          headers: {
-            'Content-Type': 'application/json',
+    return retryDBOperation(async () => {
+      try {
+        const response = await fetch(
+          `${this.url}/api/thought/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}`,
+          {
+            method: "GET",
+            headers: {
+              "Content-Type": "application/json",
+            },
+          },
+        );
+
+        if (!response.ok) {
+          throw new Error("Failed to fetch thought");
+        }
+
+        const data = await response.json();
+        return data.thought;
+      } catch (error) {
+        console.error("Error fetching thought:", error);
+        return null;
+      }
+    });
+  }
+
+  async addReaction(
+    conversationId: string,
+    messageId: string,
+    reaction: Exclude<Reaction, null>,
+  ): Promise<{ status: string }> {
+    return retryDBOperation(async () => {
+      try {
+        const response = await fetch(
+          `${this.url}/api/reaction/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}&reaction=${reaction}`,
+          {
+            method: "POST",
+            headers: {
+              "Content-Type": "application/json",
+            },
          },
+        );
+
+        if (!response.ok) {
+          throw new Error("Failed to add reaction");
        }
-      );
-      if (!response.ok) {
-        throw new Error('Failed to fetch thought');
+        return await response.json();
+      } catch (error) {
+        console.error("Error adding reaction:", error);
+        throw error;
      }
+    });
+  }
 
-      const data = await response.json();
-      return data.thought;
-    } catch (error) {
-      console.error('Error fetching thought:', error);
-      return null;
-    }
+  async getReaction(
+    conversationId: string,
+    messageId: string,
+  ): Promise<{ reaction: Reaction }> {
+    return retryDBOperation(async () => {
+      try {
+        const response = await fetch(
+          `${this.url}/api/reaction/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}`,
+          {
+            method: "GET",
+            headers: {
+              "Content-Type": "application/json",
+            },
+          },
+        );
+
+        if (!response.ok) {
+          throw new Error("Failed to get reaction");
        }
+
+        const data = await response.json();
+
+        // Validate the reaction
+        if (
+          data.reaction !== null &&
+          !["thumbs_up", "thumbs_down"].includes(data.reaction)
+        ) {
+          throw new Error("Invalid reaction received from server");
+        }
+
+        return data as { reaction: Reaction };
+      } catch (error) {
+        console.error("Error getting reaction:", error);
+        throw error;
+      }
+    });
   }
 
   async addOrRemoveReaction(
diff --git a/www/utils/retryUtils.ts b/www/utils/retryUtils.ts
new file mode 100644
index 0000000..492d369
--- /dev/null
+++ b/www/utils/retryUtils.ts
@@ -0,0 +1,72 @@
+import retry from "retry";
+import { captureException, captureMessage } from "@sentry/nextjs";
+
+interface RetryOptions {
+  retries: number;
+  factor: number;
+  minTimeout: number;
+  maxTimeout: number;
+}
+
+const dbOptions: RetryOptions = {
+  retries: 3,
+  factor: 1.5,
+  minTimeout: 1000,
+  maxTimeout: 10000,
+};
+
+const openAIOptions: RetryOptions = {
+  retries: 5,
+  factor: 2,
+  minTimeout: 4000,
+  maxTimeout: 60000,
+};
+
+function isRateLimitError(error: any): boolean {
+  return error?.response?.data?.error === "rate_limit_exceeded";
+}
+
+function retryOperation<T>(
+  operation: () => Promise<T>,
+  options: RetryOptions,
+  isOpenAI: boolean,
+): Promise<T> {
+  return new Promise((resolve, reject) => {
+    const retryOperation = retry.operation(options);
+
+    retryOperation.attempt(async (currentAttempt) => {
+      try {
+        const result = await operation();
+        resolve(result);
+      } catch (error: any) {
+        if (isOpenAI && isRateLimitError(error)) {
+          captureMessage("OpenAI Rate Limit Hit", {
+            level: "warning",
+            extra: {
+              attempt: currentAttempt,
+              error: error.message,
+            },
+          });
+        } else {
+          captureException(error);
+        }
+
+        if (retryOperation.retry(error)) {
+          return;
+        }
+
+        reject(retryOperation.mainError());
+      }
+    });
+  });
+}
+
+export function retryDBOperation<T>(operation: () => Promise<T>): Promise<T> {
+  return retryOperation(operation, dbOptions, false);
+}
+
+export function retryOpenAIOperation<T>(
+  operation: () => Promise<T>,
+): Promise<T> {
+  return retryOperation(operation, openAIOptions, true);
+}