Skip to content

Commit

Permalink
WriteStream implementation (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
masad-frost authored May 21, 2024
1 parent cad81fe commit cc48498
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 13 deletions.
53 changes: 52 additions & 1 deletion __tests__/streams.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { describe, it, expect, vi } from 'vitest';
import { InterruptedStreamError, ReadStreamImpl } from '../router/streams';
import {
InterruptedStreamError,
ReadStreamImpl,
WriteStreamImpl,
} from '../router/streams';

const noopCb = () => undefined;

Expand Down Expand Up @@ -334,3 +338,50 @@ describe('ReadStream unit', () => {
});
});
});

describe('WriteStream unit', () => {
it('should write', async () => {
const writeCb = vi.fn();
const stream = new WriteStreamImpl<number>(writeCb, noopCb);
stream.write(1);
stream.write(2);

expect(writeCb).toHaveBeenNthCalledWith(1, 1);
expect(writeCb).toHaveBeenNthCalledWith(2, 2);
});

it('should close the stream', async () => {
const closeCb = vi.fn();
const stream = new WriteStreamImpl<number>(noopCb, closeCb);

expect(stream.isClosed()).toBeFalsy();

stream.close();
expect(closeCb).toHaveBeenCalled();
expect(stream.isClosed()).toBeTruthy();
});

it('should throw when writing after close', async () => {
const stream = new WriteStreamImpl<number>(noopCb, noopCb);
stream.close();
expect(() => stream.write(1)).toThrowError(Error);
});

it('should handle close requests', async () => {
const stream = new WriteStreamImpl<number>(noopCb, noopCb);

expect(stream.isCloseRequested()).toBeFalsy();

const closeRequestP = stream.waitForCloseRequest();
expect(
await Promise.race([
new Promise((resolve) => setTimeout(() => resolve('timeout'), 10)),
closeRequestP,
]),
).toEqual('timeout');

stream.triggerCloseRequest();
expect(stream.isCloseRequested()).toBeTruthy();
await expect(closeRequestP).resolves.toEqual(undefined);
});
});
112 changes: 100 additions & 12 deletions router/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,7 @@ export interface WriteStream<T> {
* `waitForCloseRequest` returns a promise that resolves when the reader requests
* to close the stream.
*/
waitForCloseRequest(): Promise<void>;
/**
* `waitForClose` returns a promise that resolves when the stream is closed
*/
waitForClose(): Promise<void>;
waitForCloseRequest(): Promise<undefined>;
/**
* `isClosed` returns true if the stream was closed by the writer.
*/
Expand All @@ -130,7 +126,9 @@ export interface ReadWriteStream<TRead, TWrite> {

/**
* Internal implementation of a `ReadStream`.
* Has internal methods to pushed data to the stream and close it.
* This won't be exposed as an interface to river
* consumers directly, it has internal river methods
* to pushed data to the stream and close it.
*/
export class ReadStreamImpl<T> implements ReadStream<T> {
/**
Expand Down Expand Up @@ -180,7 +178,7 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
/**
* Resolves nextPromise
*/
private resolveNext: null | (() => void) = null;
private resolveNextPromise: null | (() => void) = null;

constructor(closeRequestCallback: () => void) {
this.closeRequestCallback = closeRequestCallback;
Expand Down Expand Up @@ -223,13 +221,13 @@ export class ReadStreamImpl<T> implements ReadStream<T> {

if (!this.nextPromise) {
this.nextPromise = new Promise<void>((resolve) => {
this.resolveNext = resolve;
this.resolveNextPromise = resolve;
});
}

await this.nextPromise;
this.nextPromise = null;
this.resolveNext = null;
this.resolveNextPromise = null;
}

// Unfortunately we have to use non-null assertion here, because T can be undefined
Expand Down Expand Up @@ -266,7 +264,7 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
this.didDrainDisposeValues = this.queue.length > 0;
this.queue.length = 0;

this.resolveNext?.();
this.resolveNextPromise?.();
}

public isClosed(): boolean {
Expand Down Expand Up @@ -310,7 +308,7 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
}

this.queue.push(value);
this.resolveNext?.();
this.resolveNextPromise?.();
}

/**
Expand All @@ -325,7 +323,7 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
}

this.closed = true;
this.resolveNext?.();
this.resolveNextPromise?.();
this.resolveClosePromise();
}

Expand All @@ -336,3 +334,93 @@ export class ReadStreamImpl<T> implements ReadStream<T> {
return this.queue.length > 0;
}
}

/**
* Internal implementation of a `WriteStream`.
* This won't be exposed as an interface to river
* consumers directly, it has internal river methods
* to trigger a close request, a way to pass on close
* signals, and a way to push data to the stream.
*/
export class WriteStreamImpl<T> implements WriteStream<T> {
/**
* Passed via constructor to pass on write requests
*/
private writeCb: (value: T) => void;
/**
* Passed via constructor to pass on close requests
*/
private closeCb: () => void;
/**
* Whether the stream is closed.
*/
private closed = false;
/**
* Whether the reader has requested to close the stream.
*/
private closeRequested = false;
/**
* A promise that resolves when we recive a close request.
*/
private closeRequestPromise: Promise<undefined>;
/**
* Resolves closePromise
*/
private resolveRequestClosePromise: () => void = () => undefined;

constructor(writeCb: (value: T) => void, closeCb: () => void) {
this.writeCb = writeCb;
this.closeCb = closeCb;

this.closeRequestPromise = new Promise((resolve) => {
this.resolveRequestClosePromise = () => resolve(undefined);
});
}

public write(value: T): undefined {
if (this.closed) {
throw new Error('Cannot write to closed stream');
}

this.writeCb(value);
}

public close(): undefined {
if (this.closed) {
return;
}

this.closed = true;
this.closeCb();
}

public isCloseRequested(): boolean {
return this.closeRequested;
}

public waitForCloseRequest(): Promise<undefined> {
return this.closeRequestPromise;
}

public isClosed(): boolean {
return this.closed;
}

/**
* @internal meant for use within river, not exposed as a public API
*
* Triggers a close request.
*/
public triggerCloseRequest(): undefined {
if (this.closeRequested) {
throw new Error('Cannot trigger close request multiple times');
}

if (this.closed) {
throw new Error('Cannot trigger close request on closed stream');
}

this.closeRequested = true;
this.resolveRequestClosePromise();
}
}

0 comments on commit cc48498

Please sign in to comment.