Skip to content

Commit

Permalink
Stream updates (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
roodboi authored Jan 5, 2024
1 parent 77f8d25 commit bf0d3b1
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 106 deletions.
Binary file modified bun.lockb
Binary file not shown.
149 changes: 116 additions & 33 deletions docs/concepts/streaming.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,65 @@
A common use case of structured extraction is defining a single schema class and then making another schema to create a list to do multiple extraction

By enabling streaming, you can do multiple extractions in a single request, and then iterate over the results as they come in.

!!! warning "Streaming changes the nature of the response"
!!! warning "Important: Changes in Response Behavior with Streaming Enabled"

Enabling streaming alters the nature of the response you receive:

**Response Type**: When streaming is enabled, the response becomes an Async Generator. This generator produces incremental updates until the final result is achieved.

When streaming is enabled, the response is no longer a single object, but an iterable of objects. This means that you can no longer use the response as a single object, but must iterate over it.
**Handling the Data**: As the Async Generator yields results, you can iterate over these incremental updates. It's important to note that the data from each yield is a complete snapshot of the current extraction state and is immediately usable.

This is a tradeoff for usability. If you want to use the response as a single object, you can disable streaming.
**Final Value**: The last value yielded by the generator represents the completed extraction. This value should be used as the final result.

**Example**: Extracting Conference Information
The following TypeScript example demonstrates how to use an Async Generator for streaming responses. It includes a schema definition for extraction and iterates over a stream of data to incrementally update and display the extracted information.

```ts
const user = await client.chat.completions.create({
messages: [{ role: "user", content: "Jason Liu is 30 years old" }],
model: "gpt-3.5-turbo",
response_model: UserSchema,
})
```

```ts
import Instructor from "@/instructor"
import OpenAI from "openai"
import { z } from "zod"

const UserSchema = z.object({
age: z.number(),
name: z.string()

const textBlock = `
In our recent online meeting, participants from various backgrounds joined to discuss the upcoming tech conference. The names and contact details of the participants were as follows:
- Name: John Doe, Email: [email protected], Twitter: @TechGuru44
- Name: Jane Smith, Email: [email protected], Twitter: @DigitalDiva88
- Name: Alex Johnson, Email: [email protected], Twitter: @CodeMaster2023
- Name: Emily Clark, Email: [email protected], Twitter: @InnovateQueen
- Name: Ron Stewart, Email: [email protected], Twitter: @RoboticsRon5
- Name: Sarah Lee, Email: [email protected], Twitter: @AI_Aficionado
- Name: Mike Brown, Email: [email protected], Twitter: @FutureTechLeader
- Name: Lisa Green, Email: [email protected], Twitter: @CyberSavvy101
- Name: David Wilson, Email: [email protected], Twitter: @GadgetGeek77
- Name: Daniel Kim, Email: [email protected], Twitter: @DataDrivenDude
During the meeting, we agreed on several key points. The conference will be held on March 15th, 2024, at the Grand Tech Arena located at 4521 Innovation Drive. Dr. Emily Johnson, a renowned AI researcher, will be our keynote speaker.
The budget for the event is set at $50,000, covering venue costs, speaker fees, and promotional activities. Each participant is expected to contribute an article to the conference blog by February 20th.
A follow-up meeting is scheduled for January 25th at 3 PM GMT to finalize the agenda and confirm the list of speakers.
`


const ExtractionValuesSchema = z.object({
users: z
.array(
z.object({
name: z.string(),
handle: z.string(),
twitter: z.string()
})
)
.min(5),
date: z.string(),
location: z.string(),
budget: z.number(),
deadline: z.string().min(1)
})

type User = Partial<z.infer<typeof UserSchema>>
type Extraction = Partial<z.infer<typeof ExtractionValuesSchema>>

const oai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY ?? undefined,
Expand All @@ -35,34 +68,84 @@ const oai = new OpenAI({

const client = Instructor({
client: oai,
mode: "FUNCTIONS"
mode: "TOOLS"
})
```

## Extracting Tasks using `stream=true`

By using Iterable you get a very convenient class with prompts and names automatically defined:

```ts
const userStream = await client.chat.completions.create({
messages: [{ role: "user", content: "Jason Liu is 30 years old" }],
model: "gpt-3.5-turbo",
response_model: UserSchema,
const extractionStream = await client.chat.completions.create({
messages: [{ role: "user", content: textBlock }],
model: "gpt-4-1106-preview",
response_model: ExtractionValuesSchema,
max_retries: 3,
stream: true
})

let user: User = {}
let extraction: Extraction = {}

for await (const result of userStream) {
for await (const result of extractionStream) {
try {
user = result
expect(result).toHaveProperty("_isValid")
expect(result).toHaveProperty("name")
expect(result).toHaveProperty("age")
extraction = result
console.clear()
console.table(extraction)
} catch (e) {
console.log(e)
break
}
}
```

console.clear()
console.log("completed extraction:")
console.table(extraction)

```


## Understanding OpenAI Completion Requests and Streaming Responses
**Server-Sent Events (SSE) and Async Generators**

OpenAI's completion requests return responses using Server-Sent Events (SSE), a protocol used to push real-time updates from a server to a client. In this context, the Async Generator in our TypeScript example closely mirrors the behavior of SSE. Each yield from the Async Generator corresponds to an update from the server, providing a continuous stream of data until the completion of the request.

**Transforming Async Generators to Readable Streams**

While the Async Generator is suitable for server-side processing of streaming data, there may be scenarios where you need to stream data to a client, such as a web browser. In such cases, you can transform the Async Generator into a ReadableStream, which is more suitable for client-side consumption.

Here's how you can transform an Async Generator to a ReadableStream:

```typescript
import { ReadableStream } from 'stream';

function asyncGeneratorToReadableStream(generator) {
const encoder = new TextEncoder();

return new ReadableStream({
async start(controller) {
for await (const parsedData of generator) {
controller.enqueue(encoder.encode(JSON.stringify(parsedData)));
}
controller.close();
},
cancel() {
if (cancelGenerator) {
cancelGenerator();
}
}
});
}

// Usage Example
const readableStream = asyncGeneratorToReadableStream(extractionStream);

// This ReadableStream can now be returned in an API endpoint or used in a similar context
```

***In this example:***

The asyncGeneratorToReadableStream function takes an Async Generator and an optional cancellation function.

It creates a new ReadableStream that, upon starting, iterates over the Async Generator using a for await...of loop.

Each piece of parsed data from the generator is encoded and enqueued into the stream.
Once the generator completes, the stream is closed using controller.close().

If the stream is canceled (e.g., client disconnects), an optional cancelGenerator function can be invoked to stop the generator.

This approach allows for seamless integration of OpenAI's streaming completion responses into web applications and other scenarios where streaming data directly to a client is required.
14 changes: 8 additions & 6 deletions examples/extract_user/anyscale.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import Instructor from "@/instructor"
import OpenAI from "openai"
import { z } from "zod"

const property = z.object({
name: z.string(),
value: z.string()
}).describe("A property defined by a name and value")
const property = z
.object({
name: z.string(),
value: z.string()
})
.describe("A property defined by a name and value")

const UserSchema = z.object({
age: z.number(),
Expand All @@ -15,7 +17,7 @@ const UserSchema = z.object({

const oai = new OpenAI({
baseURL: "https://api.endpoints.anyscale.com/v1",
apiKey: process.env.ANYSCALE_API_KEY ?? undefined,
apiKey: process.env.ANYSCALE_API_KEY ?? undefined
})

const client = Instructor({
Expand Down Expand Up @@ -45,4 +47,4 @@ console.log(user)
}
],
}
*/
*/
82 changes: 49 additions & 33 deletions examples/extract_user_stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,44 @@ import Instructor from "@/instructor"
import OpenAI from "openai"
import { z } from "zod"

const UserSchema = z.object({
age: z.number(),
name: z.string().refine(name => name.includes(" "), {
message: "Name must contain a space"
}),
thingsThatAreTheSameAgeAsTheUser: z
.array(z.string(), {
description: "a list of random things that are the same age as the user"
})
.min(6)
const textBlock = `
In our recent online meeting, participants from various backgrounds joined to discuss the upcoming tech conference. The names and contact details of the participants were as follows:
- Name: John Doe, Email: [email protected], Twitter: @TechGuru44
- Name: Jane Smith, Email: [email protected], Twitter: @DigitalDiva88
- Name: Alex Johnson, Email: [email protected], Twitter: @CodeMaster2023
- Name: Emily Clark, Email: [email protected], Twitter: @InnovateQueen
- Name: Ron Stewart, Email: [email protected], Twitter: @RoboticsRon5
- Name: Sarah Lee, Email: [email protected], Twitter: @AI_Aficionado
- Name: Mike Brown, Email: [email protected], Twitter: @FutureTechLeader
- Name: Lisa Green, Email: [email protected], Twitter: @CyberSavvy101
- Name: David Wilson, Email: [email protected], Twitter: @GadgetGeek77
- Name: Daniel Kim, Email: [email protected], Twitter: @DataDrivenDude
During the meeting, we agreed on several key points. The conference will be held on March 15th, 2024, at the Grand Tech Arena located at 4521 Innovation Drive. Dr. Emily Johnson, a renowned AI researcher, will be our keynote speaker.
The budget for the event is set at $50,000, covering venue costs, speaker fees, and promotional activities. Each participant is expected to contribute an article to the conference blog by February 20th.
A follow-up meeting is scheduled for January 25th at 3 PM GMT to finalize the agenda and confirm the list of speakers.
`

const ExtractionValuesSchema = z.object({
users: z
.array(
z.object({
name: z.string(),
handle: z.string(),
twitter: z.string()
})
)
.min(5),
date: z.string(),
location: z.string(),
budget: z.number(),
deadline: z.string().min(1)
})

type User = Partial<z.infer<typeof UserSchema>>
type Extraction = Partial<z.infer<typeof ExtractionValuesSchema>>

const oai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY ?? undefined,
Expand All @@ -26,36 +51,27 @@ const client = Instructor({
mode: "TOOLS"
})

const userStream = await client.chat.completions.create({
messages: [{ role: "user", content: "Jason Liu is 30 years old" }],
model: "gpt-3.5-turbo",
response_model: UserSchema,
const extractionStream = await client.chat.completions.create({
messages: [{ role: "user", content: textBlock }],
model: "gpt-4-1106-preview",
response_model: ExtractionValuesSchema,
max_retries: 3,
stream: true
})

const reader = userStream.readable.getReader()
const decoder = new TextDecoder()
let extraction: Extraction = {}

let result: User = {}
let done = false

while (!done) {
for await (const result of extractionStream) {
try {
const { value, done: doneReading } = await reader.read()
done = doneReading

if (done) {
process.stdout.write(`\r final: ${JSON.stringify(result)}\n`)
break
}

const chunkValue = decoder.decode(value)
result = JSON.parse(chunkValue)
process.stdout.write(`\r streaming: ${JSON.stringify(result)}`)
extraction = result
console.clear()
console.table(extraction)
} catch (e) {
done = true
console.log(e)
break
}
}

console.clear()
console.log("completed extraction:")
console.table(extraction)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
},
"homepage": "https://github.com/jxnl/instructor-js#readme",
"dependencies": {
"schema-stream": "1.2.1",
"schema-stream": "1.3.0",
"zod-to-json-schema": "^3.22.3",
"zod-validation-error": "^2.1.0"
},
Expand Down
29 changes: 24 additions & 5 deletions src/instructor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ class Instructor {
this.client = client
this.mode = mode
this.debug = debug

//TODO: probably some more sophisticated validation we can do here re: modes and otherwise.
// but just throwing quick here for now.
if (mode === MODE.JSON_SCHEMA) {
if (!this.client.baseURL.includes("anyscale")) {
throw new Error("JSON_SCHEMA mode is only support on Anyscale.")
}
}
}

private log = (...args) => {
Expand Down Expand Up @@ -163,15 +171,21 @@ class Instructor {

private async partialStreamResponse({ stream, schema }) {
let _activeKey = null
let _completedKeys = []

const streamParser = new SchemaStream(schema, {
onKeyComplete: ({ activeKey }) => {
typeDefaults: {
string: null,
number: null,
boolean: null
},
onKeyComplete: ({ activeKey, completedKeys }) => {
_activeKey = activeKey
_completedKeys = completedKeys
}
})

const parser = streamParser.parse({
stringStreaming: true
})
const parser = streamParser.parse({})

const textEncoder = new TextEncoder()
const textDecoder = new TextDecoder()
Expand All @@ -184,7 +198,12 @@ class Instructor {

controller.enqueue(
textEncoder.encode(
JSON.stringify({ ...parsedChunk, _isValid: validation.success, _activeKey })
JSON.stringify({
...parsedChunk,
_isValid: validation.success,
_activeKey,
_completedKeys
})
)
)
} catch (e) {
Expand Down
9 changes: 9 additions & 0 deletions src/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export function omit<T extends object, K extends keyof T>(keys: K[], obj: T): Omit<T, K> {
const result = {} as Omit<T, K>
for (const key in obj) {
if (obj.hasOwnProperty(key) && !keys.includes(key as unknown as K)) {
result[key as unknown as Exclude<keyof T, K>] = obj[key] as unknown as T[Exclude<keyof T, K>]
}
}
return result
}
Loading

0 comments on commit bf0d3b1

Please sign in to comment.