diff --git a/src/internal/decoders/line.ts b/src/internal/decoders/line.ts
index 271dc43..107d8ef 100644
--- a/src/internal/decoders/line.ts
+++ b/src/internal/decoders/line.ts
@@ -143,3 +143,34 @@ function findNewlineIndex(
   return null;
 }
 
+
+export function findDoubleNewlineIndex(buffer: Uint8Array): number {
+  // This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
+  // and returns the index right after the first occurrence of any pattern,
+  // or -1 if none of the patterns are found.
+  const newline = 0x0a; // \n
+  const carriage = 0x0d; // \r
+
+  for (let i = 0; i < buffer.length - 1; i++) {
+    if (buffer[i] === newline && buffer[i + 1] === newline) {
+      // \n\n
+      return i + 2;
+    }
+    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
+      // \r\r
+      return i + 2;
+    }
+    if (
+      buffer[i] === carriage &&
+      buffer[i + 1] === newline &&
+      i + 3 < buffer.length &&
+      buffer[i + 2] === carriage &&
+      buffer[i + 3] === newline
+    ) {
+      // \r\n\r\n
+      return i + 4;
+    }
+  }
+
+  return -1;
+}
diff --git a/src/streaming.ts b/src/streaming.ts
index ef0c3d5..3c955af 100644
--- a/src/streaming.ts
+++ b/src/streaming.ts
@@ -1,6 +1,6 @@
 import { ReadableStream, type Response } from './_shims/index';
 import { TogetherError } from './error';
-import { LineDecoder } from './internal/decoders/line';
+import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line';
 import { ReadableStreamToAsyncIterable } from './internal/stream-utils';
 import { APIError } from './error';
 
@@ -245,37 +245,6 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
   }
 }
 
-function findDoubleNewlineIndex(buffer: Uint8Array): number {
-  // This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
-  // and returns the index right after the first occurrence of any pattern,
-  // or -1 if none of the patterns are found.
-  const newline = 0x0a; // \n
-  const carriage = 0x0d; // \r
-
-  for (let i = 0; i < buffer.length - 2; i++) {
-    if (buffer[i] === newline && buffer[i + 1] === newline) {
-      // \n\n
-      return i + 2;
-    }
-    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
-      // \r\r
-      return i + 2;
-    }
-    if (
-      buffer[i] === carriage &&
-      buffer[i + 1] === newline &&
-      i + 3 < buffer.length &&
-      buffer[i + 2] === carriage &&
-      buffer[i + 3] === newline
-    ) {
-      // \r\n\r\n
-      return i + 4;
-    }
-  }
-
-  return -1;
-}
-
 class SSEDecoder {
   private data: string[];
   private event: string | null;
@@ -331,21 +300,6 @@ class SSEDecoder {
   }
 }
 
-/** This is an internal helper function that's just used for testing */
-export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
-  const decoder = new LineDecoder();
-  const lines: string[] = [];
-  for (const chunk of chunks) {
-    lines.push(...decoder.decode(chunk));
-  }
-
-  if (flush) {
-    lines.push(...decoder.flush());
-  }
-
-  return lines;
-}
-
 function partition(str: string, delimiter: string): [string, string, string] {
   const index = str.indexOf(delimiter);
   if (index !== -1) {
diff --git a/tests/internal/decoders/line.test.ts b/tests/internal/decoders/line.test.ts
new file mode 100644
index 0000000..f20d988
--- /dev/null
+++ b/tests/internal/decoders/line.test.ts
@@ -0,0 +1,128 @@
+import { findDoubleNewlineIndex, LineDecoder } from 'together-ai/internal/decoders/line';
+
+function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
+  const decoder = new LineDecoder();
+  const lines: string[] = [];
+  for (const chunk of chunks) {
+    lines.push(...decoder.decode(chunk));
+  }
+
+  if (flush) {
+    lines.push(...decoder.flush());
+  }
+
+  return lines;
+}
+
+describe('line decoder', () => {
+  test('basic', () => {
+    // baz is not included because the line hasn't ended yet
+    expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
+  });
+
+  test('basic with \\r', () => {
+    expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
+    expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
+  });
+
+  test('trailing new lines', () => {
+    expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
+  });
+
+  test('trailing new lines with \\r', () => {
+    expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
+  });
+
+  test('escaped new lines', () => {
+    expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
+  });
+
+  test('escaped new lines with \\r', () => {
+    expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
+  });
+
+  test('\\r & \\n split across multiple chunks', () => {
+    expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
+  });
+
+  test('single \\r', () => {
+    expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
+  });
+
+  test('double \\r', () => {
+    expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
+    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    // implementation detail that we don't yield the single \r line until a new \r or \n is encountered
+    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
+  });
+
+  test('double \\r then \\r\\n', () => {
+    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
+    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
+  });
+
+  test('double newline', () => {
+    expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+  });
+
+  test('multi-byte characters across chunks', () => {
+    const decoder = new LineDecoder();
+
+    // bytes taken from the string 'известни' and arbitrarily split
+    // so that some multi-byte characters span multiple chunks
+    expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
+    expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
+    expect(
+      decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
+    ).toHaveLength(0);
+
+    const decoded = decoder.decode(new Uint8Array([0xa]));
+    expect(decoded).toEqual(['известни']);
+  });
+
+  test('flushing trailing newlines', () => {
+    expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+  });
+
+  test('flushing empty buffer', () => {
+    expect(decodeChunks([], { flush: true })).toEqual([]);
+  });
+});
+
+describe('findDoubleNewlineIndex', () => {
+  test('finds \\n\\n', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\nbar'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\nbar'))).toBe(2);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\n'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\n'))).toBe(2);
+  });
+
+  test('finds \\r\\r', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\rbar'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\rbar'))).toBe(2);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\r'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\r'))).toBe(2);
+  });
+
+  test('finds \\r\\n\\r\\n', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\nbar'))).toBe(7);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\nbar'))).toBe(4);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\n'))).toBe(7);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\n'))).toBe(4);
+  });
+
+  test('returns -1 when no double newline found', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\nbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\rbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\nbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode(''))).toBe(-1);
+  });
+
+  test('handles incomplete patterns', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))).toBe(-1);
+  });
+});
diff --git a/tests/streaming.test.ts b/tests/streaming.test.ts
index 9fefee0..bfa8720 100644
--- a/tests/streaming.test.ts
+++ b/tests/streaming.test.ts
@@ -1,86 +1,7 @@
 import { Response } from 'node-fetch';
 import { PassThrough } from 'stream';
 import assert from 'assert';
-import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'together-ai/streaming';
-import { LineDecoder } from 'together-ai/internal/decoders/line';
-
-describe('line decoder', () => {
-  test('basic', () => {
-    // baz is not included because the line hasn't ended yet
-    expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
-  });
-
-  test('basic with \\r', () => {
-    expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
-    expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
-  });
-
-  test('trailing new lines', () => {
-    expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
-  });
-
-  test('trailing new lines with \\r', () => {
-    expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
-  });
-
-  test('escaped new lines', () => {
-    expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
-  });
-
-  test('escaped new lines with \\r', () => {
-    expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
-  });
-
-  test('\\r & \\n split across multiple chunks', () => {
-    expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
-  });
-
-  test('single \\r', () => {
-    expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
-  });
-
-  test('double \\r', () => {
-    expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
-    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    // implementation detail that we don't yield the single \r line until a new \r or \n is encountered
-    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
-  });
-
-  test('double \\r then \\r\\n', () => {
-    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
-    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
-  });
-
-  test('double newline', () => {
-    expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-  });
-
-  test('multi-byte characters across chunks', () => {
-    const decoder = new LineDecoder();
-
-    // bytes taken from the string 'известни' and arbitrarily split
-    // so that some multi-byte characters span multiple chunks
-    expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
-    expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
-    expect(
-      decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
-    ).toHaveLength(0);
-
-    const decoded = decoder.decode(new Uint8Array([0xa]));
-    expect(decoded).toEqual(['известни']);
-  });
-
-  test('flushing trailing newlines', () => {
-    expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-  });
-
-  test('flushing empty buffer', () => {
-    expect(decodeChunks([], { flush: true })).toEqual([]);
-  });
-});
+import { _iterSSEMessages } from 'together-ai/streaming';
 
 describe('streaming decoding', () => {
   test('basic', async () => {