Skip to content

Commit

Permalink
[protocolv2] swap ReadStream's waitForClose promise with callback sty…
Browse files Browse the repository at this point in the history
…le onClose (#160)
  • Loading branch information
masad-frost committed May 31, 2024
1 parent f20170d commit 6a8a15a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 40 deletions.
31 changes: 10 additions & 21 deletions __tests__/streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,16 @@ describe('ReadStream unit', () => {
stream.triggerClose();
});

it('should resolve waitForClose until after close', async () => {
it('should throw when requesting to close after closing', () => {
const stream = new ReadStreamImpl<number>(noopCb);
stream.triggerClose();
expect(() => stream.requestClose()).toThrowError(Error);
});

const waitP = stream.waitForClose();
it('should call onClose callback until after close', async () => {
const stream = new ReadStreamImpl<number>(noopCb);

const waitP = new Promise<void>((resolve) => stream.onClose(resolve));

expect(
await Promise.race([
Expand All @@ -199,29 +205,12 @@ describe('ReadStream unit', () => {
waitP,
]),
).toEqual(undefined);
expect(
await Promise.race([
new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)),
stream.waitForClose(),
]),
).toEqual(undefined);
});

it('should resolve waitForClose when called after closing', async () => {
it('should error when onClose called after closing', async () => {
const stream = new ReadStreamImpl<number>(noopCb);
stream.triggerClose();
expect(
await Promise.race([
new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)),
stream.waitForClose(),
]),
).toEqual(undefined);
expect(
await Promise.race([
new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)),
stream.waitForClose(),
]),
).toEqual(undefined);
expect(() => stream.onClose(noopCb)).toThrowError(Error);
});

it('should throw when pushing to a closed stream', async () => {
Expand Down
50 changes: 31 additions & 19 deletions router/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ export interface ReadStream<T> {
*/
isLocked(): boolean;
/**
* `waitForClose` returns a promise that resolves when the stream is closed,
* does not send a close request.
* `onClose` registers a callback that will be called when the stream
* is closed. Returns a function that can be used to unregister the
* listener.
*/
waitForClose(): Promise<undefined>;
onClose(cb: () => void): () => void;
/**
* `isClosed` returns true if the stream was closed by the writer.
*
Expand Down Expand Up @@ -136,13 +137,9 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
*/
private closed = false;
/**
* Used to for `waitForClose` and `requestClose`.
* A list of listeners that will be called when the stream is closed.
*/
private closePromise: Promise<undefined>;
/**
* Resolves closePromise
*/
private resolveClosePromise: () => void = () => undefined;
private onCloseListeners: Set<() => void>;
/**
* Whether the user has requested to close the stream.
*/
Expand Down Expand Up @@ -182,13 +179,11 @@ export class ReadStreamImpl<T> implements ReadStream<T> {

constructor(closeRequestCallback: () => void) {
this.closeRequestCallback = closeRequestCallback;
this.closePromise = new Promise((resolve) => {
this.resolveClosePromise = () => resolve(undefined);
});
this.onCloseListeners = new Set();
}

public [Symbol.asyncIterator]() {
if (this.locked) {
if (this.isLocked()) {
throw new TypeError('ReadStream is already locked');
}

Expand All @@ -202,7 +197,7 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
throw new InterruptedStreamError();
}

if (this.closed) {
if (this.isClosed()) {
return {
done: true,
value: undefined,
Expand Down Expand Up @@ -275,17 +270,33 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
return this.locked;
}

public waitForClose(): Promise<undefined> {
return this.closePromise;
public onClose(cb: () => void): () => void {
if (this.isClosed()) {
throw new Error('Stream is already closed');
}

this.onCloseListeners.add(cb);

return () => {
this.onCloseListeners.delete(cb);
};
}

public requestClose(): Promise<undefined> {
if (this.isClosed()) {
throw new Error('Cannot request close after stream already closed');
}

if (!this.closeRequested) {
this.closeRequested = true;
this.closeRequestCallback();
}

return this.closePromise;
return new Promise<undefined>((resolve) => {
this.onClose(() => {
resolve(undefined);
});
});
}

public isCloseRequested(): boolean {
Expand Down Expand Up @@ -318,13 +329,14 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
* values before calling this method.
*/
public triggerClose(): undefined {
if (this.closed) {
if (this.isClosed()) {
throw new Error('Unexpected closing multiple times');
}

this.closed = true;
this.resolveNextPromise?.();
this.resolveClosePromise();
this.onCloseListeners.forEach((cb) => cb());
this.onCloseListeners.clear();
}

/**
Expand Down

0 comments on commit 6a8a15a

Please sign in to comment.