Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces Streaming #80

Merged
merged 12 commits into from
Jun 13, 2024
Merged

Introduces Streaming #80

merged 12 commits into from
Jun 13, 2024

Conversation

liamgriffiths
Copy link
Contributor

@liamgriffiths liamgriffiths commented May 21, 2024

Implements streaming in the SDK!

The streaming interface largely mirrors the non-streaming one. Instead of substrate.run we have substrate.stream which returns a SubstrateStreamingResponse (vs a SubstrateResponse).

This new response object is also an async generator and can also be iterated over to read the stream messages as they are received.

  // simple example of what this looks like
  const stream = await substrate.stream(a);

  for await (let message of stream) {
      console.log(message);
  }

In addition to using the iterator, there are two additional utilities I added in for now, tee and get.

  • tee is useful because once a stream is consumed, it cannot be read again - however if the user would like to have multiple consumer/iterators they can use tee to split the stream into however many they like.
  • get is the streaming analog to res.get in the non-streaming interface. It filters the stream for messages that belong to a specific node and re-uses some the type inference we have so far.
  // using `tee`
  const stream = await substrate.stream(a);
  const streams = stream.tee(); // defaults to 2, but can split into any number

  const log = async (stream) => {
    for await (let message of stream) {
      console.log(message);
    }
  }

  await Promise.all(streams.map(s => log(s)))
  // using `get`
  const stream = await substrate.stream(a, b, c);

  for await (let message of stream.get(a)) {
      // only messages for node `a`
      console.log(message);
  }

The server will respond with server-sent events when the request is:

  • using the special x-substrate-streaming: 1 header
  • the client uses the accept: text/event-stream header
  • the nodes being requested support streaming

The server-sent event messages come back in a few flavors:

  • node.delta - chunks of a nodes result (when possible)
  • node.result - the final result for a node
  • node.error - any errors encountered trying to run the node
  • graph.result - the accumulated result of the entire graph

We might also include other message types, but tbd:

  • node.usage - various usage metrics for the node
  • graph.error - any graph-level errors we may encounter

@liamgriffiths liamgriffiths changed the title Experimental example of streaming nodes Introduces Streaming Jun 7, 2024

if (response.ok) {
setOutput("");
const stream = await sb.streaming.fromSSEResponse(response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat

@liamgriffiths liamgriffiths marked this pull request as ready for review June 10, 2024 15:27
@liamgriffiths liamgriffiths merged commit f6dc5e9 into main Jun 13, 2024
2 checks passed
@liamgriffiths liamgriffiths deleted the liam/streaming branch June 13, 2024 14:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants