Skip to content

Commit

Permalink
UI chore: load messages asynchronously via SSE
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Hopper-Lowe <[email protected]>
  • Loading branch information
ryanhopperlowe committed Nov 5, 2024
1 parent 4b25a62 commit ea0e6a0
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 256 deletions.
4 changes: 1 addition & 3 deletions ui/admin/app/components/chat/Chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ type ChatProps = React.HTMLAttributes<HTMLDivElement> & {
};

export function Chat({ className, showStartButton = false }: ChatProps) {
const { messages, threadId, generatingMessage, mode, invoke, readOnly } =
useChat();
const { messages, threadId, mode, invoke, readOnly } = useChat();
const [runTriggered, setRunTriggered] = useState(false);

const showMessagePane =
Expand All @@ -28,7 +27,6 @@ export function Chat({ className, showStartButton = false }: ChatProps) {
<MessagePane
classNames={{ root: "h-full", messageList: "px-20" }}
messages={messages}
generatingMessage={generatingMessage}
/>
</div>
)}
Expand Down
268 changes: 105 additions & 163 deletions ui/admin/app/components/chat/ChatContext.tsx
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
import {
ReactNode,
createContext,
startTransition,
useCallback,
useContext,
useEffect,
useMemo,
useRef,
useState,
} from "react";
import useSWR, { mutate } from "swr";
import { mutate } from "swr";

import { ChatEvent, combineChatEvents } from "~/lib/model/chatEvents";
import {
Message,
chatEventsToMessages,
promptMessage,
toolCallMessage,
} from "~/lib/model/messages";
import { ChatEvent } from "~/lib/model/chatEvents";
import { Message, promptMessage, toolCallMessage } from "~/lib/model/messages";
import { InvokeService } from "~/lib/service/api/invokeService";
import { ThreadsService } from "~/lib/service/api/threadsService";
import { readStream } from "~/lib/stream";

import { useAsync } from "~/hooks/useAsync";

Expand All @@ -31,11 +24,10 @@ interface ChatContextType {
processUserMessage: (text: string, sender: "user" | "agent") => void;
id: string;
threadId: string | undefined;
generatingMessage: Message | null;
invoke: (prompt?: string) => void;
readOnly?: boolean;
isRunning: boolean;
isLoading: boolean;
isInvoking: boolean;
}

const ChatContext = createContext<ChatContextType | undefined>(undefined);
Expand All @@ -55,64 +47,6 @@ export function ChatProvider({
onCreateThreadId?: (threadId: string) => void;
readOnly?: boolean;
}) {
const [insertedMessages, setInsertedMessages] = useState<Message[]>([]);
const [generatingMessage, setGeneratingMessage] = useState<string | null>(
null
);
const [isRunning, _setIsRunning] = useState(false);
const isRunningRef = useRef(false);
const isRunningToolCall = useRef(false);

const setIsRunning = (value: boolean) => {
isRunningRef.current = value;
_setIsRunning(value);
};

// todo(tylerslaton): this is a huge hack to get the generating message and runId to be
// interactable during workflow invokes. take a look at invokeWorkflow to see why this is
// currently needed.
const generatingRunIdRef = useRef<string | null>(null);
const generatingMessageRef = useRef<string | null>(null);

const appendToGeneratingMessage = (content: string) => {
generatingMessageRef.current =
(generatingMessageRef.current || "") + content;

setGeneratingMessage(generatingMessageRef.current);
};

const clearGeneratingMessage = () => {
generatingMessageRef.current = null;
setGeneratingMessage(null);
};

const getThreadEvents = useSWR(
ThreadsService.getThreadEvents.key(threadId),
({ threadId }) => ThreadsService.getThreadEvents(threadId),
{
onSuccess: () => setInsertedMessages([]),
revalidateOnFocus: false,
revalidateOnReconnect: false,
}
);

const messages = useMemo(
() =>
chatEventsToMessages(combineChatEvents(getThreadEvents.data || [])),
[getThreadEvents.data]
);

// clear out inserted messages when the threadId changes
useEffect(() => {
if (isRunningRef.current) return;
setInsertedMessages([]);
}, [threadId]);

/** inserts message optimistically */
const insertMessage = (message: Message) => {
setInsertedMessages((prev) => [...prev, message]);
};

/**
* processUserMessage is responsible for adding the user's message to the chat and
* triggering the agent to respond to it.
Expand All @@ -121,7 +55,7 @@ export function ChatProvider({
if (mode === "workflow" || readOnly) return;
const newMessage: Message = { text, sender };

insertMessage(newMessage);
// insertMessage(newMessage);
handlePrompt(newMessage.text);
};

Expand All @@ -142,118 +76,31 @@ export function ChatProvider({
// do nothing if the mode is workflow
};

const insertGeneratingMessage = (runId?: string) => {
// skip if there is no message or it is only whitespace
if (generatingMessageRef.current) {
insertMessage({
sender: "agent",
runId,
text: generatingMessageRef.current,
});
clearGeneratingMessage();
}
};

const invokeAgent = useAsync(InvokeService.invokeAgentWithStream, {
onSuccess: ({ reader, threadId: responseThreadId }) => {
clearGeneratingMessage();

setIsRunning(true);

onSuccess: ({ threadId: responseThreadId }) => {
if (responseThreadId && !threadId) {
// persist the threadId
onCreateThreadId?.(responseThreadId);

// revalidate threads
mutate(ThreadsService.getThreads.key());
}

readStream<ChatEvent>({
reader,
onChunk: (chunk) =>
// use a transition for performance
startTransition(() => {
const { content, toolCall, runID, input, prompt } =
chunk;

generatingRunIdRef.current = runID;

if (toolCall) {
isRunningToolCall.current = true;
// cut off generating message
insertGeneratingMessage(runID);

// insert tool call message
insertMessage(toolCallMessage(toolCall));

clearGeneratingMessage();

return;
}
isRunningToolCall.current = false;

if (prompt) {
insertGeneratingMessage(runID);
insertMessage(promptMessage(prompt, runID));
return;
}

if (content && !input) {
appendToGeneratingMessage(content);
}
}),
onComplete: async (chunks) => {
insertGeneratingMessage(chunks[0]?.runID);
clearGeneratingMessage();

invokeAgent.clear();
generatingRunIdRef.current = null;
setIsRunning(false);
},
});
},
});

const outGeneratingMessage = useMemo<Message | null>(() => {
if (invokeAgent.isLoading)
return { sender: "agent", text: "", isLoading: true };

if (!generatingMessage) {
if (invokeAgent.data?.reader && !isRunningToolCall.current) {
return {
sender: "agent",
text: "",
isLoading: true,
};
}

return null;
}

return {
sender: "agent",
text: generatingMessage,
runId: generatingRunIdRef.current ?? undefined,
};
}, [generatingMessage, invokeAgent.isLoading, invokeAgent.data]);

// combine messages and inserted messages
const outMessages = useMemo(() => {
return [...(messages ?? []), ...insertedMessages];
}, [messages, insertedMessages]);
const { messages, isRunning } = useMessageSource(threadId);

return (
<ChatContext.Provider
value={{
messages: outMessages,
messages,
processUserMessage,
mode,
id,
threadId,
generatingMessage: outGeneratingMessage,
invoke,
isRunning,
isLoading: getThreadEvents.isLoading,
isInvoking: invokeAgent.isLoading,
readOnly,
}}
>
Expand All @@ -269,3 +116,98 @@ export function useChat() {
}
return context;
}

function useMessageSource(threadId?: string) {
const [messageMap, setMessageMap] = useState<Map<string, Message>>(
new Map()
);
const [isRunning, setIsRunning] = useState(false);

const addContent = useCallback((event: ChatEvent) => {
console.log(event);

const { content, prompt, toolCall, runComplete, input, error, runID } =
event;

setIsRunning(!runComplete);

setMessageMap((prev) => {
const copy = new Map(prev);

const contentID = event.contentID ?? crypto.randomUUID();

const existing = copy.get(contentID);
if (existing) {
copy.set(contentID, {
...existing,
text: existing.text + content,
});

return copy;
}

if (error) {
copy.set(contentID, {
sender: "agent",
text: error,
runId: runID,
error: true,
});
return copy;
}

if (input) {
copy.set(contentID, {
sender: "user",
text: input,
runId: runID,
});
return copy;
}

if (toolCall) {
copy.set(contentID, toolCallMessage(toolCall));
return copy;
}

if (prompt) {
copy.set(contentID, promptMessage(prompt, runID));
return copy;
}

if (content) {
copy.set(contentID, {
sender: "agent",
text: content,
runId: runID,
});
return copy;
}

return copy;
});
}, []);

useEffect(() => {
setMessageMap(new Map());

if (!threadId) return;

const source = ThreadsService.getThreadEventSource(threadId);

source.onmessage = (event) => {
const chunk = JSON.parse(event.data) as ChatEvent;
addContent(chunk);
};

return () => {
source.close();
};
}, [threadId, addContent]);

const messages = useMemo(() => {
return Array.from(messageMap.values());
}, [messageMap]);

return { messages, messageMap, isRunning };
}
7 changes: 4 additions & 3 deletions ui/admin/app/components/chat/Chatbar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { useState } from "react";
import { cn } from "~/lib/utils";

import { useChat } from "~/components/chat/ChatContext";
import { LoadingSpinner } from "~/components/ui/LoadingSpinner";
import { Button } from "~/components/ui/button";
import { AutosizeTextarea } from "~/components/ui/textarea";

Expand All @@ -13,7 +14,7 @@ type ChatbarProps = {

export function Chatbar({ className }: ChatbarProps) {
const [input, setInput] = useState("");
const { processUserMessage, isRunning } = useChat();
const { processUserMessage, isRunning, isInvoking } = useChat();

const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
Expand Down Expand Up @@ -53,9 +54,9 @@ export function Chatbar({ className }: ChatbarProps) {
variant="secondary"
className="rounded-full"
type="submit"
disabled={!input || isRunning}
disabled={!input || isRunning || isInvoking}
>
<CircleArrowUpIcon />
{isInvoking ? <LoadingSpinner /> : <CircleArrowUpIcon />}
</Button>
</form>
);
Expand Down
Loading

0 comments on commit ea0e6a0

Please sign in to comment.