Skip to content

Commit

Permalink
Support iterable inputs for streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Wundero committed Dec 12, 2024
1 parent 08756ec commit 41f8206
Showing 1 changed file with 88 additions and 26 deletions.
114 changes: 88 additions & 26 deletions packages/core/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,89 @@ import type { UserInfo } from "./types";

type SendDataParam = z.infer<typeof ServerEndpointSchema>;

function prepareStream(shape: object, stream: ReadableStream<unknown>) {
const reader = stream.getReader();
const morphedUnencodedStream = new ReadableStream<unknown>({
function unknownAsyncIterableToReadableStream<T>(
input: AsyncIterable<T>,
): ReadableStream<T> {
return new ReadableStream({
async start(controller) {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
controller.close();
return;
}
controller.enqueue({
...shape,
message: value,
});
for await (const message of input) {
controller.enqueue(message);
}
controller.close();
} catch (e) {
controller.error(e);
}
},
});
return morphedUnencodedStream;
}

function unknownIterableToReadableStream<T>(
input: Iterable<T>,
): ReadableStream<T> {
return new ReadableStream({
start(controller) {
try {
for (const message of input) {
controller.enqueue(message);
}
controller.close();
} catch (e) {
controller.error(e);
}
},
});
}

type ReadableInput<T> = Iterable<T> | AsyncIterable<T>;

function isIterable<T>(input: unknown): input is Iterable<T> {
if (typeof input !== "object" || input === null) {
return false;
}
return (
Symbol.iterator in input &&
typeof (input as Iterable<T>)[Symbol.iterator] === "function"
);
}

function isAsyncIterable<T>(input: unknown): input is AsyncIterable<T> {
if (typeof input !== "object" || input === null) {
return false;
}
return (
Symbol.asyncIterator in input &&
typeof (input as AsyncIterable<T>)[Symbol.asyncIterator] === "function"
);
}

function toReadableStream<T>(input: ReadableInput<T>): ReadableStream<T> {
if (input instanceof ReadableStream) {
return input;
}
if ("from" in ReadableStream) {
// @ts-expect-error Types for ReadableStream are incomplete
return ReadableStream.from(input) as ReadableStream<T>;
}
if (isAsyncIterable(input)) {
return unknownAsyncIterableToReadableStream(input);
}
if (isIterable(input)) {
return unknownIterableToReadableStream(input);
}
return input;
}

function prepareStream(shape: object, stream: ReadableStream<unknown>) {
const transformer = new TransformStream<unknown, object>({
transform(chunk, controller) {
controller.enqueue({
...shape,
message: chunk,
});
},
});
return stream.pipeThrough(transformer);
}

class Sourcerer {
Expand Down Expand Up @@ -75,13 +136,14 @@ class Sourcerer {
): Promise<number>;
private async sendData<TData extends SendDataParam>(
data: TData,
stream: ReadableStream<unknown>,
iterable: ReadableInput<unknown>,
): Promise<number[]>;
private async sendData<TData extends SendDataParam>(
data: TData,
stream?: ReadableStream<unknown>,
iterable?: ReadableInput<unknown>,
) {
if (stream) {
if (iterable) {
const stream = toReadableStream(iterable);
const encodedStream = prepareStream(data, stream);
const ws = await this.connectWS();
const reader = encodedStream.getReader();
Expand Down Expand Up @@ -225,7 +287,7 @@ class Sourcerer {
* Stream messages to a channel.
* @param channel The channel to send the messages to.
* @param event The event to send.
* @param stream A stream of data to send. Each chunk should be one message.
* @param data A stream of data to send. Each chunk should be one message. Note that order of delivery is not guaranteed.
* @returns The HTTP status code Sinkr returned. The function may return before the stream is finished.
*/
async streamToChannel<
Expand All @@ -234,15 +296,15 @@ class Sourcerer {
>(
channel: string,
event: TEvent,
stream: ReadableStream<TData>,
data: ReadableInput<TData>,
): Promise<number[]> {
return await this.sendData(
{
route: "channel",
channel,
event: `${event}`,
},
stream,
data,
);
}

Expand All @@ -269,7 +331,7 @@ class Sourcerer {
* Stream messages directly to a user.
* @param userId The ID of the user to send the message to. This can be a peer ID or, for authenticated users, the user ID.
* @param event The event to send.
* @param stream The stream of data to send. Each chunk should be one message.
* @param data The stream of data to send. Each chunk should be one message. Note that order of delivery is not guaranteed.
* @returns The HTTP status code Sinkr returned. The function may return before the stream is finished.
*/
async streamDirectMessage<
Expand All @@ -278,15 +340,15 @@ class Sourcerer {
>(
userId: string,
event: TEvent,
stream: ReadableStream<TData>,
data: ReadableInput<TData>,
): Promise<number[]> {
return await this.sendData(
{
route: "direct",
recipientId: userId,
event: `${event}`,
},
stream,
data,
);
}

Expand All @@ -310,19 +372,19 @@ class Sourcerer {
/**
* Broadcast a stream of messages to all connected clients.
* @param event The event to send.
* @param stream The stream of data to send. Each chunk should be one message.
* @param data The stream of data to send. Each chunk should be one message. Note that order of delivery is not guaranteed.
* @returns The HTTP status code Sinkr returned. The function may return before the stream is finished.
*/
async streamBroadcastMessage<
TEvent extends keyof RealEventMap,
TData extends RealEventMap[TEvent],
>(event: TEvent, stream: ReadableStream<TData>): Promise<number[]> {
>(event: TEvent, data: ReadableInput<TData>): Promise<number[]> {
return await this.sendData(
{
route: "broadcast",
event: `${event}`,
},
stream,
data,
);
}
}
Expand Down

0 comments on commit 41f8206

Please sign in to comment.