
Commit

fix: optimize sse chunk reading off-by-one error (#142)
stainless-app[bot] committed Feb 18, 2025
1 parent f3d988c commit 7f1a4f3
Showing 4 changed files with 161 additions and 127 deletions.
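The off-by-one fix is visible in the relocated findDoubleNewlineIndex: the old copy in src/streaming.ts iterated with i < buffer.length - 2, so a two-byte terminator (\n\n or \r\r) sitting at the very end of a chunk was never examined, while the new copy in src/internal/decoders/line.ts iterates with i < buffer.length - 1. A minimal sketch of the behavioural difference, in TypeScript, using a hypothetical buffer that is not part of the commit:

const buf = new TextEncoder().encode('data: {}\n\n'); // chunk ending in \n\n

// old bound (i < buf.length - 2): the pair at indices 8 and 9 was never
// checked, so the search reported -1 and the message was held back until
// more bytes arrived
// new bound (i < buf.length - 1): the trailing \n\n is found and the
// function returns 10, the index just past the terminator
findDoubleNewlineIndex(buf); // -1 before the fix, 10 after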
31 changes: 31 additions & 0 deletions src/internal/decoders/line.ts
@@ -143,3 +143,34 @@ function findNewlineIndex(

return null;
}

export function findDoubleNewlineIndex(buffer: Uint8Array): number {
// This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
// and returns the index right after the first occurrence of any pattern,
// or -1 if none of the patterns are found.
const newline = 0x0a; // \n
const carriage = 0x0d; // \r

for (let i = 0; i < buffer.length - 1; i++) {
if (buffer[i] === newline && buffer[i + 1] === newline) {
// \n\n
return i + 2;
}
if (buffer[i] === carriage && buffer[i + 1] === carriage) {
// \r\r
return i + 2;
}
if (
buffer[i] === carriage &&
buffer[i + 1] === newline &&
i + 3 < buffer.length &&
buffer[i + 2] === carriage &&
buffer[i + 3] === newline
) {
// \r\n\r\n
return i + 4;
}
}

return -1;
}
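For illustration only (this snippet is not part of the diff), a caller can use the returned index to split a buffer into one complete SSE message and the remainder to carry over, assuming the package path used by the new tests:

import { findDoubleNewlineIndex } from 'together-ai/internal/decoders/line';

const chunk = new TextEncoder().encode('data: hi\r\n\r\ndata: more');
const end = findDoubleNewlineIndex(chunk); // 12: index just past the \r\n\r\n terminator
const message = chunk.slice(0, end); // bytes of the first complete message
const rest = chunk.slice(end); // leftover bytes to prepend to the next read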
48 changes: 1 addition & 47 deletions src/streaming.ts
@@ -1,6 +1,6 @@
import { ReadableStream, type Response } from './_shims/index';
import { TogetherError } from './error';
import { LineDecoder } from './internal/decoders/line';
import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line';
import { ReadableStreamToAsyncIterable } from './internal/stream-utils';

import { APIError } from './error';
@@ -245,37 +245,6 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
}
}

function findDoubleNewlineIndex(buffer: Uint8Array): number {
// This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
// and returns the index right after the first occurrence of any pattern,
// or -1 if none of the patterns are found.
const newline = 0x0a; // \n
const carriage = 0x0d; // \r

for (let i = 0; i < buffer.length - 2; i++) {
if (buffer[i] === newline && buffer[i + 1] === newline) {
// \n\n
return i + 2;
}
if (buffer[i] === carriage && buffer[i + 1] === carriage) {
// \r\r
return i + 2;
}
if (
buffer[i] === carriage &&
buffer[i + 1] === newline &&
i + 3 < buffer.length &&
buffer[i + 2] === carriage &&
buffer[i + 3] === newline
) {
// \r\n\r\n
return i + 4;
}
}

return -1;
}

class SSEDecoder {
private data: string[];
private event: string | null;
@@ -331,21 +300,6 @@ class SSEDecoder {
}
}

/** This is an internal helper function that's just used for testing */
export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
const decoder = new LineDecoder();
const lines: string[] = [];
for (const chunk of chunks) {
lines.push(...decoder.decode(chunk));
}

if (flush) {
lines.push(...decoder.flush());
}

return lines;
}

function partition(str: string, delimiter: string): [string, string, string] {
const index = str.indexOf(delimiter);
if (index !== -1) {
128 changes: 128 additions & 0 deletions tests/internal/decoders/line.test.ts
@@ -0,0 +1,128 @@
import { findDoubleNewlineIndex, LineDecoder } from 'together-ai/internal/decoders/line';

function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
const decoder = new LineDecoder();
const lines: string[] = [];
for (const chunk of chunks) {
lines.push(...decoder.decode(chunk));
}

if (flush) {
lines.push(...decoder.flush());
}

return lines;
}

describe('line decoder', () => {
test('basic', () => {
// baz is not included because the line hasn't ended yet
expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
});

test('basic with \\r', () => {
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
});

test('trailing new lines', () => {
expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
});

test('trailing new lines with \\r', () => {
expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
});

test('escaped new lines', () => {
expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
});

test('escaped new lines with \\r', () => {
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
});

test('\\r & \\n split across multiple chunks', () => {
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('single \\r', () => {
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('double \\r', () => {
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
});

test('double \\r then \\r\\n', () => {
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
});

test('double newline', () => {
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('multi-byte characters across chunks', () => {
const decoder = new LineDecoder();

// bytes taken from the string 'известни' and arbitrarily split
// so that some multi-byte characters span multiple chunks
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
expect(
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
).toHaveLength(0);

const decoded = decoder.decode(new Uint8Array([0xa]));
expect(decoded).toEqual(['известни']);
});

test('flushing trailing newlines', () => {
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('flushing empty buffer', () => {
expect(decodeChunks([], { flush: true })).toEqual([]);
});
});

describe('findDoubleNewlineIndex', () => {
test('finds \\n\\n', () => {
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\nbar'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\nbar'))).toBe(2);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\n'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\n'))).toBe(2);
});

test('finds \\r\\r', () => {
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\rbar'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\rbar'))).toBe(2);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\r'))).toBe(5);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\r'))).toBe(2);
});

test('finds \\r\\n\\r\\n', () => {
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\nbar'))).toBe(7);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\nbar'))).toBe(4);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\n'))).toBe(7);
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\n'))).toBe(4);
});

test('returns -1 when no double newline found', () => {
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\nbar'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\rbar'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\nbar'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode(''))).toBe(-1);
});

test('handles incomplete patterns', () => {
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r'))).toBe(-1);
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))).toBe(-1);
});
});
81 changes: 1 addition & 80 deletions tests/streaming.test.ts
@@ -1,86 +1,7 @@
import { Response } from 'node-fetch';
import { PassThrough } from 'stream';
import assert from 'assert';
import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'together-ai/streaming';
import { LineDecoder } from 'together-ai/internal/decoders/line';

describe('line decoder', () => {
test('basic', () => {
// baz is not included because the line hasn't ended yet
expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
});

test('basic with \\r', () => {
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
});

test('trailing new lines', () => {
expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
});

test('trailing new lines with \\r', () => {
expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
});

test('escaped new lines', () => {
expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
});

test('escaped new lines with \\r', () => {
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
});

test('\\r & \\n split across multiple chunks', () => {
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('single \\r', () => {
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
});

test('double \\r', () => {
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
});

test('double \\r then \\r\\n', () => {
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
});

test('double newline', () => {
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('multi-byte characters across chunks', () => {
const decoder = new LineDecoder();

// bytes taken from the string 'известни' and arbitrarily split
// so that some multi-byte characters span multiple chunks
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
expect(
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
).toHaveLength(0);

const decoded = decoder.decode(new Uint8Array([0xa]));
expect(decoded).toEqual(['известни']);
});

test('flushing trailing newlines', () => {
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
});

test('flushing empty buffer', () => {
expect(decodeChunks([], { flush: true })).toEqual([]);
});
});
import { _iterSSEMessages } from 'together-ai/streaming';

describe('streaming decoding', () => {
test('basic', async () => {
