From 6a8a15aa5eb2c057684f98b56e17eb96804407e0 Mon Sep 17 00:00:00 2001 From: Faris Masad Date: Fri, 31 May 2024 13:00:57 -0700 Subject: [PATCH] [protocolv2] swap ReadStream's waitForClose promise with callback style onClose (#160) --- __tests__/streams.test.ts | 31 ++++++++---------------- router/streams.ts | 50 ++++++++++++++++++++++++--------------- 2 files changed, 41 insertions(+), 40 deletions(-) diff --git a/__tests__/streams.test.ts b/__tests__/streams.test.ts index e86b49f6..466a5335 100644 --- a/__tests__/streams.test.ts +++ b/__tests__/streams.test.ts @@ -180,10 +180,16 @@ describe('ReadStream unit', () => { stream.triggerClose(); }); - it('should resolve waitForClose until after close', async () => { + it('should throw when requesting to close after closing', () => { const stream = new ReadStreamImpl(noopCb); + stream.triggerClose(); + expect(() => stream.requestClose()).toThrowError(Error); + }); - const waitP = stream.waitForClose(); + it('should call onClose callback until after close', async () => { + const stream = new ReadStreamImpl(noopCb); + + const waitP = new Promise((resolve) => stream.onClose(resolve)); expect( await Promise.race([ @@ -199,29 +205,12 @@ describe('ReadStream unit', () => { waitP, ]), ).toEqual(undefined); - expect( - await Promise.race([ - new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)), - stream.waitForClose(), - ]), - ).toEqual(undefined); }); - it('should resolve waitForClose when called after closing', async () => { + it('should error when onClose called after closing', async () => { const stream = new ReadStreamImpl(noopCb); stream.triggerClose(); - expect( - await Promise.race([ - new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)), - stream.waitForClose(), - ]), - ).toEqual(undefined); - expect( - await Promise.race([ - new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)), - stream.waitForClose(), - ]), - ).toEqual(undefined); + expect(() => stream.onClose(noopCb)).toThrowError(Error); }); it('should throw when pushing to a closed stream', async () => { diff --git a/router/streams.ts b/router/streams.ts index 601ecf94..76bcd905 100644 --- a/router/streams.ts +++ b/router/streams.ts @@ -43,10 +43,11 @@ export interface ReadStream { */ isLocked(): boolean; /** - * `waitForClose` returns a promise that resolves when the stream is closed, - * does not send a close request. + * `onClose` registers a callback that will be called when the stream + * is closed. Returns a function that can be used to unregister the + * listener. */ - waitForClose(): Promise; + onClose(cb: () => void): () => void; /** * `isClosed` returns true if the stream was closed by the writer. * @@ -136,13 +137,9 @@ export class ReadStreamImpl implements ReadStream { */ private closed = false; /** - * Used to for `waitForClose` and `requestClose`. + * A list of listeners that will be called when the stream is closed. */ - private closePromise: Promise; - /** - * Resolves closePromise - */ - private resolveClosePromise: () => void = () => undefined; + private onCloseListeners: Set<() => void>; /** * Whether the user has requested to close the stream. */ @@ -182,13 +179,11 @@ export class ReadStreamImpl implements ReadStream { constructor(closeRequestCallback: () => void) { this.closeRequestCallback = closeRequestCallback; - this.closePromise = new Promise((resolve) => { - this.resolveClosePromise = () => resolve(undefined); - }); + this.onCloseListeners = new Set(); } public [Symbol.asyncIterator]() { - if (this.locked) { + if (this.isLocked()) { throw new TypeError('ReadStream is already locked'); } @@ -202,7 +197,7 @@ export class ReadStreamImpl implements ReadStream { throw new InterruptedStreamError(); } - if (this.closed) { + if (this.isClosed()) { return { done: true, value: undefined, @@ -275,17 +270,33 @@ export class ReadStreamImpl implements ReadStream { return this.locked; } - public waitForClose(): Promise { - return this.closePromise; + public onClose(cb: () => void): () => void { + if (this.isClosed()) { + throw new Error('Stream is already closed'); + } + + this.onCloseListeners.add(cb); + + return () => { + this.onCloseListeners.delete(cb); + }; } public requestClose(): Promise { + if (this.isClosed()) { + throw new Error('Cannot request close after stream already closed'); + } + if (!this.closeRequested) { this.closeRequested = true; this.closeRequestCallback(); } - return this.closePromise; + return new Promise((resolve) => { + this.onClose(() => { + resolve(undefined); + }); + }); } public isCloseRequested(): boolean { @@ -318,13 +329,14 @@ export class ReadStreamImpl implements ReadStream { * values before calling this method. */ public triggerClose(): undefined { - if (this.closed) { + if (this.isClosed()) { throw new Error('Unexpected closing multiple times'); } this.closed = true; this.resolveNextPromise?.(); - this.resolveClosePromise(); + this.onCloseListeners.forEach((cb) => cb()); + this.onCloseListeners.clear(); } /**