diff --git a/src/async/asyncTypes.ts b/src/async/asyncTypes.ts index 63d6b618..4f28b5f2 100644 --- a/src/async/asyncTypes.ts +++ b/src/async/asyncTypes.ts @@ -17,7 +17,6 @@ export type TsonAsyncStringifier = ( ) => TsonAsyncStringifierIterable; export type TsonAsyncIndex = TsonBranded; -type ReaderValue = T | TsonStreamInterruptedError; export interface TsonTransformerSerializeDeserializeAsync< TValue, TSerializedValue, @@ -28,13 +27,15 @@ export interface TsonTransformerSerializeDeserializeAsync< */ deserialize: (opts: { /** - * The controller for the ReadableStream, to close when we're done + * Close the controller for the ReadableStream of values */ - controller: ReadableStreamDefaultController>; + close: () => void; /** * Reader for the ReadableStream of values */ - reader: ReadableStreamDefaultReader>; + reader: ReadableStreamDefaultReader< + TSerializedValue | TsonStreamInterruptedError + >; }) => TValue; /** diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index ec14bfa2..0f298b4e 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -66,19 +66,24 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const idx = serializedValue as TsonAsyncIndex; + let controller: ReadableStreamDefaultController = + null as unknown as ReadableStreamDefaultController; const readable = new ReadableStream({ start(c) { - cache.set(idx, c); + controller = c; }, }); + // the `start` method is called "immediately when the object is constructed" + // [MDN](http://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream) + // so we're guaranteed that the controller is set in the cache + assert(controller, "Controller not set - this is a bug"); + + cache.set(idx, controller); return transformer.deserialize({ - get controller() { - // the `start` method is called "immediately when the object is constructed" - // [MDN](http://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream) - // so we're guaranteed that the controller is set in the cache - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return cache.get(idx)!; + close() { + controller.close(); + cache.delete(idx); }, reader: readable.getReader(), }); @@ -197,11 +202,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { // enqueue the error to all the streams for (const controller of cache.values()) { - try { - controller.enqueue(err); - } catch { - // ignore if the controller is closed - } + controller.enqueue(err); } opts.onStreamError?.(err); diff --git a/src/async/handlers/tsonAsyncIterable.ts b/src/async/handlers/tsonAsyncIterable.ts index ce58840e..46a74d2e 100644 --- a/src/async/handlers/tsonAsyncIterable.ts +++ b/src/async/handlers/tsonAsyncIterable.ts @@ -36,12 +36,10 @@ export const tsonAsyncIterator: TsonAsyncType< switch (value[0]) { case ITERATOR_DONE: { - opts.controller.close(); return; } case ITERATOR_ERROR: { - opts.controller.close(); throw TsonPromiseRejectionError.from(value[1]); } @@ -51,6 +49,8 @@ export const tsonAsyncIterator: TsonAsyncType< } } } + + opts.close(); })(); }, key: "AsyncIterable", diff --git a/src/async/handlers/tsonPromise.ts b/src/async/handlers/tsonPromise.ts index ad650d0b..e6f60e6a 100644 --- a/src/async/handlers/tsonPromise.ts +++ b/src/async/handlers/tsonPromise.ts @@ -28,7 +28,7 @@ export const tsonPromise: TsonAsyncType = { const promise = new Promise((resolve, reject) => { async function _handle() { const next = await opts.reader.read(); - opts.controller.close(); + opts.close(); if (next.done) { throw new TsonPromiseRejectionError(