From 07f76c8fe67fa31b2d02bd4943d4b6196c294db7 Mon Sep 17 00:00:00 2001 From: Simon Farshid Date: Sat, 3 Aug 2024 21:20:10 -0700 Subject: [PATCH] refactor(runtime/external-store): rewrite sync mechanism (#630) --- .../external-store/ExternalStoreRuntime.tsx | 4 - .../ExternalStoreThreadRuntime.tsx | 186 ++++++++++-------- .../runtimes/external-store/auto-status.tsx | 5 + .../useExternalStoreRuntime.tsx | 7 +- .../external-store/useExternalStoreSync.tsx | 66 ------- 5 files changed, 114 insertions(+), 154 deletions(-) delete mode 100644 packages/react/src/runtimes/external-store/useExternalStoreSync.tsx diff --git a/packages/react/src/runtimes/external-store/ExternalStoreRuntime.tsx b/packages/react/src/runtimes/external-store/ExternalStoreRuntime.tsx index ca33163cf..931758dfa 100644 --- a/packages/react/src/runtimes/external-store/ExternalStoreRuntime.tsx +++ b/packages/react/src/runtimes/external-store/ExternalStoreRuntime.tsx @@ -14,10 +14,6 @@ export class ExternalStoreRuntime extends BaseAssistantRuntime) { + this._store = store; + } + + public set store(store: ExternalStoreAdapter) { + const oldStore = this._store; + this._store = store; + + // flush the converter cache when the convertMessage prop changes + if (oldStore.convertMessage !== store.convertMessage) { + this.converter = new ThreadMessageConverter(); + } else if ( + oldStore.isDisabled === store.isDisabled && + oldStore.isRunning === store.isRunning && + oldStore.messages === store.messages + ) { + // no update needed + return; + } + + const isRunning = store.isRunning ?? false; + const isDisabled = store.isDisabled ?? false; + + const convertCallback: ConverterCallback = (cache, m, idx) => { + if (!store.convertMessage) return m; + + const isLast = idx === store.messages.length - 1; + const autoStatus = getAutoStatus(isLast, isRunning); - constructor(public store: ExternalStoreAdapter) { - this.updateData( - store.isDisabled ?? false, - store.isRunning ?? false, + if ( + cache && + (cache.role !== "assistant" || + !isAutoStatus(cache.status) || + cache.status === autoStatus) + ) + return cache; + + const newMessage = fromThreadMessageLike( + store.convertMessage(m, idx), + idx.toString(), + autoStatus, + ); + (newMessage as any)[symbolInnerMessage] = m; + return newMessage; + }; + + const messages = this.converter.convertMessages( store.messages, + convertCallback, ); - this.useStore = create(() => ({ - store, - })); + for (let i = 0; i < messages.length; i++) { + const message = messages[i]!; + const parent = messages[i - 1]; + this.repository.addOrUpdateMessage(parent?.id ?? null, message); + } + + if (this.assistantOptimisticId) { + this.repository.deleteMessage(this.assistantOptimisticId); + this.assistantOptimisticId = null; + } + + if (hasUpcomingMessage(isRunning, messages)) { + this.assistantOptimisticId = this.repository.appendOptimisticMessage( + messages.at(-1)?.id ?? null, + { + role: "assistant", + content: [], + }, + ); + } + + this.repository.resetHead( + this.assistantOptimisticId ?? messages.at(-1)?.id ?? null, + ); + + this.messages = this.repository.getMessages(); + this.isDisabled = isDisabled; + this.isRunning = isRunning; + + for (const callback of this._subscriptions) callback(); } public getBranches(messageId: string): string[] { @@ -52,7 +131,7 @@ export class ExternalStoreThreadRuntime implements ReactThreadRuntime { } public switchToBranch(branchId: string): void { - if (!this.store.setMessages) + if (!this._store.setMessages) throw new Error("Runtime does not support switching branches."); this.repository.switchToBranch(branchId); @@ -61,26 +140,26 @@ export class ExternalStoreThreadRuntime implements ReactThreadRuntime { public async append(message: AppendMessage): Promise { if (message.parentId !== (this.messages.at(-1)?.id ?? null)) { - if (!this.store.onEdit) + if (!this._store.onEdit) throw new Error("Runtime does not support editing messages."); - await this.store.onEdit(message); + await this._store.onEdit(message); } else { - await this.store.onNew(message); + await this._store.onNew(message); } } public async startRun(parentId: string | null): Promise { - if (!this.store.onReload) + if (!this._store.onReload) throw new Error("Runtime does not support reloading messages."); - await this.store.onReload(parentId); + await this._store.onReload(parentId); } public cancelRun(): void { - if (!this.store.onCancel) + if (!this._store.onCancel) throw new Error("Runtime does not support cancelling runs."); - this.store.onCancel(); + this._store.onCancel(); if (this.assistantOptimisticId) { this.repository.deleteMessage(this.assistantOptimisticId); @@ -101,65 +180,14 @@ export class ExternalStoreThreadRuntime implements ReactThreadRuntime { } private updateMessages = (messages: ThreadMessage[]) => { - this.store.setMessages?.( + this._store.setMessages?.( messages.flatMap(getExternalStoreMessage).filter((m) => m != null), ); }; - public onStoreUpdated() { - if (this.useStore.getState().store !== this.store) { - this.useStore.setState({ store: this.store }); - } - } - - private updateData = ( - isDisabled: boolean, - isRunning: boolean, - vm: ThreadMessage[], - ) => { - for (let i = 0; i < vm.length; i++) { - const message = vm[i]!; - const parent = vm[i - 1]; - this.repository.addOrUpdateMessage(parent?.id ?? null, message); - } - - if (this.assistantOptimisticId) { - this.repository.deleteMessage(this.assistantOptimisticId); - this.assistantOptimisticId = null; - } - - if (hasUpcomingMessage(isRunning, vm)) { - this.assistantOptimisticId = this.repository.appendOptimisticMessage( - vm.at(-1)?.id ?? null, - { - role: "assistant", - content: [], - }, - ); - } - - this.repository.resetHead( - this.assistantOptimisticId ?? vm.at(-1)?.id ?? null, - ); - - this.messages = this.repository.getMessages(); - this.isDisabled = isDisabled; - this.isRunning = isRunning; - - for (const callback of this._subscriptions) callback(); - }; - - unstable_synchronizer = () => { - const { store } = this.useStore(); - - useExternalStoreSync(store, this.updateData); - - return null; - }; - addToolResult(options: AddToolResultOptions) { - if (!this.store.onAddToolResult) + if (!this._store.onAddToolResult) throw new Error("Runtime does not support tool results."); - this.store.onAddToolResult(options); + this._store.onAddToolResult(options); } } diff --git a/packages/react/src/runtimes/external-store/auto-status.tsx b/packages/react/src/runtimes/external-store/auto-status.tsx index cdc247d29..58023e498 100644 --- a/packages/react/src/runtimes/external-store/auto-status.tsx +++ b/packages/react/src/runtimes/external-store/auto-status.tsx @@ -1,8 +1,13 @@ +import { MessageStatus } from "../../types"; + const AUTO_STATUS_RUNNING = Object.freeze({ type: "running" }); const AUTO_STATUS_COMPLETE = Object.freeze({ type: "complete", reason: "unknown", }); +export const isAutoStatus = (status: MessageStatus) => + status === AUTO_STATUS_RUNNING || status === AUTO_STATUS_COMPLETE; + export const getAutoStatus = (isLast: boolean, isRunning: boolean) => isLast && isRunning ? AUTO_STATUS_RUNNING : AUTO_STATUS_COMPLETE; diff --git a/packages/react/src/runtimes/external-store/useExternalStoreRuntime.tsx b/packages/react/src/runtimes/external-store/useExternalStoreRuntime.tsx index 72a7cbe5e..73390ea9d 100644 --- a/packages/react/src/runtimes/external-store/useExternalStoreRuntime.tsx +++ b/packages/react/src/runtimes/external-store/useExternalStoreRuntime.tsx @@ -1,16 +1,13 @@ -import { useEffect, useInsertionEffect, useState } from "react"; +import { useInsertionEffect, useState } from "react"; import { ExternalStoreRuntime } from "./ExternalStoreRuntime"; import { ExternalStoreAdapter } from "./ExternalStoreAdapter"; -export const useExternalStoreRuntime = (store: ExternalStoreAdapter) => { +export const useExternalStoreRuntime = (store: ExternalStoreAdapter) => { const [runtime] = useState(() => new ExternalStoreRuntime(store)); useInsertionEffect(() => { runtime.store = store; }); - useEffect(() => { - runtime.onStoreUpdated(); - }); return runtime; }; diff --git a/packages/react/src/runtimes/external-store/useExternalStoreSync.tsx b/packages/react/src/runtimes/external-store/useExternalStoreSync.tsx deleted file mode 100644 index f571f0b1f..000000000 --- a/packages/react/src/runtimes/external-store/useExternalStoreSync.tsx +++ /dev/null @@ -1,66 +0,0 @@ -import { useEffect, useInsertionEffect, useMemo, useRef } from "react"; -import { ExternalStoreAdapter } from "./ExternalStoreAdapter"; -import { - ConverterCallback, - ThreadMessageConverter, -} from "./ThreadMessageConverter"; -import { ThreadMessage } from "../../types"; -import { symbolInnerMessage } from "./getExternalStoreMessage"; -import { getAutoStatus } from "./auto-status"; -import { fromThreadMessageLike } from "./ThreadMessageLike"; - -type UpdateDataCallback = ( - isDisabled: boolean, - isRunning: boolean, - messages: ThreadMessage[], -) => void; - -export const useExternalStoreSync = ( - adapter: ExternalStoreAdapter, - updateData: UpdateDataCallback, -) => { - const adapterRef = useRef(adapter); - useInsertionEffect(() => { - adapterRef.current = adapter; - }); - - // flush the converter cache when the convertMessage prop changes - const [converter, convertCallback] = useMemo(() => { - const converter = adapter.convertMessage ?? ((m: T) => m as ThreadMessage); - const convertCallback: ConverterCallback = (cache, m, idx) => { - const autoStatus = getAutoStatus( - adapterRef.current.messages.at(-1) === m, - adapterRef.current.isRunning ?? false, - ); - - if (cache && (cache.role !== "assistant" || cache.status === autoStatus)) - return cache; - - const newMessage = fromThreadMessageLike( - converter(m, idx), - idx.toString(), - autoStatus, - ); - (newMessage as any)[symbolInnerMessage] = m; - return newMessage; - }; - return [new ThreadMessageConverter(), convertCallback]; - - // specify convertMessage bceause we want the ThreadMessageConverter to be recreated when the adapter changes - }, [adapter.convertMessage]); - - useEffect(() => { - updateData( - adapter.isDisabled ?? false, - adapter.isRunning ?? false, - converter.convertMessages(adapter.messages, convertCallback), - ); - }, [ - updateData, - converter, - convertCallback, - adapter.isDisabled, - adapter.isRunning, - adapter.messages, - ]); -};