Skip to content

Commit

Permalink
fixup! sse streaming client
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Pilar <[email protected]>
  • Loading branch information
Tomas Pilar committed Dec 15, 2023
1 parent ac84a23 commit c14067f
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 126 deletions.
121 changes: 121 additions & 0 deletions src/api/event-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import {
EventStreamContentType,
fetchEventSource,
} from '@ai-zen/node-fetch-event-source';

import { TypedReadable } from '../utils/stream.js';
import { BaseError, HttpError, InternalError } from '../errors.js';
import { safeParseJson } from '../helpers/common.js';
import { RawHeaders } from '../client.js';

export interface ApiEventClient {
stream: <T>(opts: {
url: string;
headers?: RawHeaders;
body?: any;
signal?: AbortSignal;
}) => TypedReadable<T>;
}

export function createApiEventClient(clientOptions: {
baseUrl?: string;
headers?: RawHeaders;
}): ApiEventClient {
return {
stream: function fetchSSE<T>({
url,
headers,
body,
signal,
}: Parameters<ApiEventClient['stream']>[0]) {
const outputStream = new TypedReadable<T>({
autoDestroy: true,
objectMode: true,
signal: signal,
});

const onClose = () => {
if (outputStream.readable) {
outputStream.push(null);
}
};

const delegatedController = new AbortController();
if (signal) {
signal.addEventListener(
'abort',
() => {
delegatedController.abort();
},
{
once: true,
},
);
}

const onError = (e: unknown) => {
const err =
e instanceof BaseError
? e
: new InternalError('Unexpected error', { cause: e });

delegatedController.abort();
if (outputStream.readable) {
outputStream.emit('error', err);
throw err;
}
onClose();
};
fetchEventSource(new URL(url, clientOptions.baseUrl).toString(), {
method: 'POST',
body: JSON.stringify(body),
headers: {
...clientOptions.headers,
...headers,
'Content-Type': 'application/json',
},
signal: delegatedController.signal,
onclose: onClose,
async onopen(response) {
const contentType = response.headers.get('content-type') || '';

if (response.ok && contentType === EventStreamContentType) {
return;
}

const responseData = contentType.startsWith('application/json')
? await response.json().catch(() => null)
: null;

onError(new HttpError(responseData));
},
onmessage(message) {
if (message.event === 'close') {
onClose();
return;
}
if (message.data === '') {
return;
}

const result = safeParseJson(message.data);
if (result === null) {
onError(
new InternalError(
`Failed to parse message "${JSON.stringify(message)}"`,
),
);
return;
}

outputStream.push(result);
},
onerror: onError,
}).catch(() => {
/* Prevent uncaught exception (errors are handled inside the stream) */
});

return outputStream;
},
};
}
31 changes: 15 additions & 16 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import {
ApiClientResponse,
createApiClient,
} from './api/client.js';
import { fetchSSE } from './utils/stream.js';
import { clientErrorWrapper } from './utils/errors.js';
import { OmitVersion } from './utils/types.js';
import { ApiEventClient, createApiEventClient } from './api/event-client.js';

export type RawHeaders = Record<string, string>;

Expand All @@ -28,8 +28,7 @@ export type Options = { signal?: AbortSignal };

export class Client {
readonly #client: ApiClient;
readonly #endpoint: string;
readonly #headers: RawHeaders;
readonly #eventClient: ApiEventClient;

constructor(config: Configuration = {}) {
const endpoint = config.endpoint ?? lookupEndpoint();
Expand All @@ -44,19 +43,22 @@ export class Client {

const agent = version ? `node-sdk/${version}` : 'node-sdk';

this.#endpoint = endpoint;
this.#headers = {
const headers = {
'User-Agent': agent,
'X-Request-Origin': agent,
...config.headers,
Accept: 'application/json',
Authorization: `Bearer ${apiKey}`,
};
this.#client = createApiClient({
baseUrl: this.#endpoint,
headers: this.#headers,
baseUrl: endpoint,
headers,
fetch: fetchRetry(fetch) as any, // https://github.com/jonbern/fetch-retry/issues/89
});
this.#eventClient = createApiEventClient({
baseUrl: endpoint,
headers,
});
}

async models(
Expand Down Expand Up @@ -137,15 +139,12 @@ export class Client {
},
});

fetchSSE<EventMessage>({
url: new URL(
`/v2/text/generation_stream?version=2023-11-22`,
this.#endpoint,
),
headers: this.#headers,
body: input,
signal: opts?.signal,
})
this.#eventClient
.stream<EventMessage>({
url: '/v2/text/generation_stream?version=2023-11-22',
body: input,
signal: opts?.signal,
})
.on('error', (err) => stream.emit('error', err))
.pipe(stream);

Expand Down
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ export * from './client.js';
export * from './errors.js';

export * from './buildInfo.js';
export * from './constants.js';
109 changes: 0 additions & 109 deletions src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
import { Readable } from 'stream';

import {
EventStreamContentType,
fetchEventSource,
} from '@ai-zen/node-fetch-event-source';

import { BaseError, HttpError, InternalError } from '../errors.js';
import { safeParseJson } from '../helpers/common.js';
import { RawHeaders } from '../client.js';

export class TypedReadable<T> extends Readable {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_read(size: number) {
Expand Down Expand Up @@ -44,103 +35,3 @@ export class TypedReadable<T> extends Readable {
return super[Symbol.asyncIterator]();
}
}

export function fetchSSE<Output>({
url,
headers,
body,
signal,
}: {
url: URL;
headers: RawHeaders;
body: any;
signal?: AbortSignal;
}) {
const outputStream = new TypedReadable<Output>({
autoDestroy: true,
objectMode: true,
signal: signal,
});

const onClose = () => {
if (outputStream.readable) {
outputStream.push(null);
}
};

const delegatedController = new AbortController();
if (signal) {
signal.addEventListener(
'abort',
() => {
delegatedController.abort();
},
{
once: true,
},
);
}

const onError = (e: unknown) => {
const err =
e instanceof BaseError
? e
: new InternalError('Unexpected error', { cause: e });

delegatedController.abort();
if (outputStream.readable) {
outputStream.emit('error', err);
throw err;
}
onClose();
};
fetchEventSource(url.toString(), {
method: 'POST',
body: JSON.stringify(body),
headers: {
...headers,
'Content-Type': 'application/json',
},
signal: delegatedController.signal,
onclose: onClose,
async onopen(response) {
const contentType = response.headers.get('content-type') || '';

if (response.ok && contentType === EventStreamContentType) {
return;
}

const responseData = contentType.startsWith('application/json')
? await response.json().catch(() => null)
: null;

onError(new HttpError(responseData));
},
onmessage(message) {
if (message.event === 'close') {
onClose();
return;
}
if (message.data === '') {
return;
}

const result = safeParseJson(message.data);
if (result === null) {
onError(
new InternalError(
`Failed to parse message "${JSON.stringify(message)}"`,
),
);
return;
}

outputStream.push(result);
},
onerror: onError,
}).catch(() => {
/* Prevent uncaught exception (errors are handled inside the stream) */
});

return outputStream;
}

0 comments on commit c14067f

Please sign in to comment.