Skip to content

Commit

Permalink
Some UX improvements (be able to produce n-tee iters, node message ty…
Browse files Browse the repository at this point in the history
…pes)
  • Loading branch information
liamgriffiths committed Jun 5, 2024
1 parent d63e401 commit 37bf9be
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 82 deletions.
51 changes: 0 additions & 51 deletions examples/streaming.js

This file was deleted.

37 changes: 37 additions & 0 deletions examples/streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env -S npx ts-node --transpileOnly

import { Substrate, Llama3Instruct70B } from "substrate";

async function main() {
const SUBSTRATE_API_KEY = process.env["SUBSTRATE_API_KEY"];

const substrate = new Substrate({
apiKey: SUBSTRATE_API_KEY,
baseUrl: "https://api-staging.substrate.run",
});

const a = new Llama3Instruct70B({
prompt: "what are server side events useful for?",
max_tokens: 50,
});
const b = new Llama3Instruct70B({
prompt: "what is an async generator?",
max_tokens: 50,
});

const stream = await substrate.stream(a, b);

const [s1, s2] = stream.tee(2);

for await (let message of s1!.get(a)) {
if (message.object === "node.delta") {
console.log(message);
}
}
for await (let message of s2!.get(b)) {
if (message.object === "node.delta") {
console.log(message);
}
}
}
main();
40 changes: 32 additions & 8 deletions src/Streaming.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
export type NodeResult<T = Object> = {
/** Represents an array item within a `Node` output chunk, specifies the field is an array containing this `item` at the `index`. **/
type ChunkArrayItem<T = Object> = {
object: "array.item";
index: number;
item: T;
};

/** Helper types for producing the "Chunk" types used in the `NodeDelta` messages */
type ChunkizeObject<T> = T extends object
? { [P in keyof T]: ChunkizeAny<T[P]> }
: T;

type ChunkizeArray<T> = T extends (infer U)[]
? ChunkArrayItem<ChunkizeAny<U>>
: ChunkArrayItem<T>;

type ChunkizeAny<T> = T extends (infer U)[]
? ChunkizeArray<U>
: T extends object
? ChunkizeObject<T>
: T;

/** Stream message that contains the completed `Node` output */
type NodeResult<T = Object> = {
object: "node.result";
nodeId: string;
data: T;
};

export type NodeDelta<T = Object> = {
/** Stream message that contains a chunk of the `Node` output */
type NodeDelta<T = Object> = {
object: "node.delta";
nodeId: string;
data: T;
data: ChunkizeAny<T>;
};

/** Stream message when an error happened during a `Node` run. */
export type NodeError = {
object: "node.error";
nodeId: string;
Expand All @@ -19,13 +44,12 @@ export type NodeError = {
};
};

/** Stream message that contains the completed "Graph" output */
export type GraphResult<T = Object> = {
object: "graph.result";
data: T;
};

export type SSEMessage<T = Object> =
| NodeResult<T>
| NodeDelta<T>
| NodeError
| GraphResult<T>;
export type NodeMessage<T = Object> = NodeResult<T> | NodeDelta<T> | NodeError;

export type SSEMessage<T = Object> = NodeMessage | GraphResult<T>;
2 changes: 1 addition & 1 deletion src/Substrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class Substrate {
/**
* Stream the given nodes.
*/
async stream(...nodes: Node[]): Promise<any> {
async stream(...nodes: Node[]): Promise<SubstrateStreamingResponse> {
const serialized = Substrate.serialize(...nodes);
return this.streamSerialized(serialized);
}
Expand Down
43 changes: 21 additions & 22 deletions src/SubstrateStreamingResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createParser } from "substrate/EventSource";
import { SSEMessage } from "substrate/Streaming";
import { Node } from "substrate/Node";
import { NodeMessage, SSEMessage } from "substrate/Streaming";
import { SubstrateError } from "substrate/Error";
import { AnyNode, NodeOutput } from "substrate/Nodes";

export class SubstrateStreamingResponse {
public apiRequest: Request;
Expand All @@ -18,42 +18,41 @@ export class SubstrateStreamingResponse {
return this.iterator;
}

tee() {
const left: any[] = [];
const right: any[] = [];
tee(n: number = 2) {
const queues: any[] = [];
for (let i = 0; i < n; i++) {
queues.push([]);
}

const iterator = this.iterator;

const teeIterator = (queue: any) => {
const teeIterator = (queue: SSEMessage[]) => {
return {
next: () => {
if (queue.length === 0) {
const result = iterator.next();
left.push(result);
right.push(result);
for (let q of queues) q.push(result);
}
return queue.shift();
},
};
};

return [
new SubstrateStreamingResponse(
this.apiRequest,
this.apiResponse,
teeIterator(left),
),
new SubstrateStreamingResponse(
return queues.map((q) => {
return new SubstrateStreamingResponse(
this.apiRequest,
this.apiResponse,
teeIterator(right),
),
];
teeIterator(q),
);
});
}

async *filter(node: Node) {
for await (const message of this) {
if (message?.nodeId === node.id) {
yield message;
async *get<T extends AnyNode>(
node: T,
): AsyncGenerator<NodeMessage<NodeOutput<T>>> {
for await (let message of this) {
if (message?.node_id === node.id) {
yield message as NodeMessage<NodeOutput<T>>;
}
}
}
Expand Down

0 comments on commit 37bf9be

Please sign in to comment.