diff --git a/__tests__/streams.test.ts b/__tests__/streams.test.ts index e16bd332..e86b49f6 100644 --- a/__tests__/streams.test.ts +++ b/__tests__/streams.test.ts @@ -1,5 +1,9 @@ import { describe, it, expect, vi } from 'vitest'; -import { InterruptedStreamError, ReadStreamImpl } from '../router/streams'; +import { + InterruptedStreamError, + ReadStreamImpl, + WriteStreamImpl, +} from '../router/streams'; const noopCb = () => undefined; @@ -334,3 +338,50 @@ describe('ReadStream unit', () => { }); }); }); + +describe('WriteStream unit', () => { + it('should write', async () => { + const writeCb = vi.fn(); + const stream = new WriteStreamImpl(writeCb, noopCb); + stream.write(1); + stream.write(2); + + expect(writeCb).toHaveBeenNthCalledWith(1, 1); + expect(writeCb).toHaveBeenNthCalledWith(2, 2); + }); + + it('should close the stream', async () => { + const closeCb = vi.fn(); + const stream = new WriteStreamImpl(noopCb, closeCb); + + expect(stream.isClosed()).toBeFalsy(); + + stream.close(); + expect(closeCb).toHaveBeenCalled(); + expect(stream.isClosed()).toBeTruthy(); + }); + + it('should throw when writing after close', async () => { + const stream = new WriteStreamImpl(noopCb, noopCb); + stream.close(); + expect(() => stream.write(1)).toThrowError(Error); + }); + + it('should handle close requests', async () => { + const stream = new WriteStreamImpl(noopCb, noopCb); + + expect(stream.isCloseRequested()).toBeFalsy(); + + const closeRequestP = stream.waitForCloseRequest(); + expect( + await Promise.race([ + new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)), + closeRequestP, + ]), + ).toEqual('timeout'); + + stream.triggerCloseRequest(); + expect(stream.isCloseRequested()).toBeTruthy(); + await expect(closeRequestP).resolves.toEqual(undefined); + }); +}); diff --git a/router/streams.ts b/router/streams.ts index 5178e6fd..601ecf94 100644 --- a/router/streams.ts +++ b/router/streams.ts @@ -103,11 +103,7 @@ export interface WriteStream { * `waitForCloseRequest` returns a promise that resolves when the reader requests * to close the stream. */ - waitForCloseRequest(): Promise; - /** - * `waitForClose` returns a promise that resolves when the stream is closed - */ - waitForClose(): Promise; + waitForCloseRequest(): Promise; /** * `isClosed` returns true if the stream was closed by the writer. */ @@ -130,7 +126,9 @@ export interface ReadWriteStream { /** * Internal implementation of a `ReadStream`. - * Has internal methods to pushed data to the stream and close it. + * This won't be exposed as an interface to river + * consumers directly, it has internal river methods + * to pushed data to the stream and close it. */ export class ReadStreamImpl implements ReadStream { /** @@ -180,7 +178,7 @@ export class ReadStreamImpl implements ReadStream { /** * Resolves nextPromise */ - private resolveNext: null | (() => void) = null; + private resolveNextPromise: null | (() => void) = null; constructor(closeRequestCallback: () => void) { this.closeRequestCallback = closeRequestCallback; @@ -223,13 +221,13 @@ export class ReadStreamImpl implements ReadStream { if (!this.nextPromise) { this.nextPromise = new Promise((resolve) => { - this.resolveNext = resolve; + this.resolveNextPromise = resolve; }); } await this.nextPromise; this.nextPromise = null; - this.resolveNext = null; + this.resolveNextPromise = null; } // Unfortunately we have to use non-null assertion here, because T can be undefined @@ -266,7 +264,7 @@ export class ReadStreamImpl implements ReadStream { this.didDrainDisposeValues = this.queue.length > 0; this.queue.length = 0; - this.resolveNext?.(); + this.resolveNextPromise?.(); } public isClosed(): boolean { @@ -310,7 +308,7 @@ export class ReadStreamImpl implements ReadStream { } this.queue.push(value); - this.resolveNext?.(); + this.resolveNextPromise?.(); } /** @@ -325,7 +323,7 @@ export class ReadStreamImpl implements ReadStream { } this.closed = true; - this.resolveNext?.(); + this.resolveNextPromise?.(); this.resolveClosePromise(); } @@ -336,3 +334,93 @@ export class ReadStreamImpl implements ReadStream { return this.queue.length > 0; } } + +/** + * Internal implementation of a `WriteStream`. + * This won't be exposed as an interface to river + * consumers directly, it has internal river methods + * to trigger a close request, a way to pass on close + * signals, and a way to push data to the stream. + */ +export class WriteStreamImpl implements WriteStream { + /** + * Passed via constructor to pass on write requests + */ + private writeCb: (value: T) => void; + /** + * Passed via constructor to pass on close requests + */ + private closeCb: () => void; + /** + * Whether the stream is closed. + */ + private closed = false; + /** + * Whether the reader has requested to close the stream. + */ + private closeRequested = false; + /** + * A promise that resolves when we recive a close request. + */ + private closeRequestPromise: Promise; + /** + * Resolves closePromise + */ + private resolveRequestClosePromise: () => void = () => undefined; + + constructor(writeCb: (value: T) => void, closeCb: () => void) { + this.writeCb = writeCb; + this.closeCb = closeCb; + + this.closeRequestPromise = new Promise((resolve) => { + this.resolveRequestClosePromise = () => resolve(undefined); + }); + } + + public write(value: T): undefined { + if (this.closed) { + throw new Error('Cannot write to closed stream'); + } + + this.writeCb(value); + } + + public close(): undefined { + if (this.closed) { + return; + } + + this.closed = true; + this.closeCb(); + } + + public isCloseRequested(): boolean { + return this.closeRequested; + } + + public waitForCloseRequest(): Promise { + return this.closeRequestPromise; + } + + public isClosed(): boolean { + return this.closed; + } + + /** + * @internal meant for use within river, not exposed as a public API + * + * Triggers a close request. + */ + public triggerCloseRequest(): undefined { + if (this.closeRequested) { + throw new Error('Cannot trigger close request multiple times'); + } + + if (this.closed) { + throw new Error('Cannot trigger close request on closed stream'); + } + + this.closeRequested = true; + this.resolveRequestClosePromise(); + } +}