Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

fix: remove try/catch on controller enqueue #49

Merged
merged 7 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export type TsonAsyncStringifier = <TValue>(
) => TsonAsyncStringifierIterable<TValue>;
export type TsonAsyncIndex = TsonBranded<number, "AsyncRegistered">;

type ReaderValue<T> = T | TsonStreamInterruptedError;
export interface TsonTransformerSerializeDeserializeAsync<
TValue,
TSerializedValue,
Expand All @@ -28,13 +27,15 @@ export interface TsonTransformerSerializeDeserializeAsync<
*/
deserialize: (opts: {
/**
* The controller for the ReadableStream, to close when we're done
* Close the controller for the ReadableStream of values
*/
controller: ReadableStreamDefaultController<ReaderValue<TSerializedValue>>;
close: () => void;
/**
* Reader for the ReadableStream of values
*/
reader: ReadableStreamDefaultReader<ReaderValue<TSerializedValue>>;
reader: ReadableStreamDefaultReader<
TSerializedValue | TsonStreamInterruptedError
>;
}) => TValue;

/**
Expand Down
25 changes: 13 additions & 12 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,24 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

const idx = serializedValue as TsonAsyncIndex;

let controller: ReadableStreamDefaultController<unknown> =
null as unknown as ReadableStreamDefaultController<unknown>;
const readable = new ReadableStream<unknown>({
start(c) {
cache.set(idx, c);
controller = c;
},
});
// the `start` method is called "immediately when the object is constructed"
// [MDN](http://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream)
// so we're guaranteed that the controller is set in the cache
assert(controller, "Controller not set - this is a bug");

cache.set(idx, controller);

return transformer.deserialize({
get controller() {
// the `start` method is called "immediately when the object is constructed"
// [MDN](http://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream)
// so we're guaranteed that the controller is set in the cache
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return cache.get(idx)!;
close() {
controller.close();
cache.delete(idx);
},
reader: readable.getReader(),
});
Expand Down Expand Up @@ -197,11 +202,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

// enqueue the error to all the streams
for (const controller of cache.values()) {
try {
controller.enqueue(err);
} catch {
// ignore if the controller is closed
}
controller.enqueue(err);
}

opts.onStreamError?.(err);
Expand Down
4 changes: 2 additions & 2 deletions src/async/handlers/tsonAsyncIterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@

switch (value[0]) {
case ITERATOR_DONE: {
opts.controller.close();
return;
}

case ITERATOR_ERROR: {
opts.controller.close();
throw TsonPromiseRejectionError.from(value[1]);
}

Expand All @@ -51,6 +49,8 @@
}
}
}

opts.close();

Check warning on line 53 in src/async/handlers/tsonAsyncIterable.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncIterable.ts#L52-L53

Added lines #L52 - L53 were not covered by tests
})();
},
key: "AsyncIterable",
Expand Down
2 changes: 1 addition & 1 deletion src/async/handlers/tsonPromise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const tsonPromise: TsonAsyncType<MyPromise, SerializedPromiseValue> = {
const promise = new Promise((resolve, reject) => {
async function _handle() {
const next = await opts.reader.read();
opts.controller.close();
opts.close();

if (next.done) {
throw new TsonPromiseRejectionError(
Expand Down
Loading