Skip to content

Commit

Permalink
feat: subscriptions (server-streams) + observable (#22)
Browse files Browse the repository at this point in the history
* subscriptions v0.8.0

* fix lint errors

* add datatypes export to package json

* make observable a fixture instead of a top-level thing

* fix package.json, return on stream done rather than explicitly checking value
  • Loading branch information
jackyzha0 authored Dec 7, 2023
1 parent b0a99c8 commit b28dd9d
Show file tree
Hide file tree
Showing 15 changed files with 463 additions and 94 deletions.
10 changes: 5 additions & 5 deletions __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import {
createWebSocketServer,
createWsTransports,
onServerReady,
} from '../testUtils';
} from '../util/testHelpers';
import largePayload from './largePayload.json';
import { TestServiceConstructor } from './fixtures';
import { TestServiceConstructor } from './fixtures/services';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { StupidlyLargeService } from './typescript-stress.test';
Expand Down Expand Up @@ -84,13 +84,13 @@ describe('simple router level bandwidth', async () => {
bench(
'rpc (wait for response)',
async () => {
const result = await client.test.add({ n: 1 });
const result = await client.test.add.rpc({ n: 1 });
assert(result.ok);
},
{ time: BENCH_DURATION },
);

const [input, output] = await client.test.echo();
const [input, output] = await client.test.echo.stream();
bench(
'stream (wait for response)',
async () => {
Expand Down Expand Up @@ -131,7 +131,7 @@ describe('complex (50 procedures) router level bandwidth', async () => {
bench(
'rpc (wait for response)',
async () => {
const result = await client.b.f35({ a: 1 });
const result = await client.b.f35.rpc({ a: 1 });
assert(result.ok);
},
{ time: BENCH_DURATION },
Expand Down
101 changes: 82 additions & 19 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { afterAll, assert, describe, expect, test } from 'vitest';
import {
createLocalWebSocketClient,
createWebSocketServer,
createWsTransports,
iterNext,
onServerReady,
} from '../testUtils';
} from '../util/testHelpers';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import http from 'http';
Expand All @@ -13,10 +15,13 @@ import {
FallibleServiceConstructor,
OrderingServiceConstructor,
STREAM_ERROR,
SubscribableServiceConstructor,
TestServiceConstructor,
} from './fixtures';
} from './fixtures/services';
import { UNCAUGHT_ERROR } from '../router/result';
import { codecs } from '../codec/codec.test';
import { WebSocketClientTransport } from '../transport/impls/ws/client';
import { WebSocketServerTransport } from '../transport/impls/ws/server';

describe.each(codecs)(
'client <-> server integration test ($name codec)',
Expand All @@ -39,7 +44,7 @@ describe.each(codecs)(
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
const result = await client.test.add({ n: 3 });
const result = await client.test.add.rpc({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
});
Expand All @@ -49,10 +54,10 @@ describe.each(codecs)(
const serviceDefs = { test: FallibleServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
const result = await client.test.divide({ a: 10, b: 2 });
const result = await client.test.divide.rpc({ a: 10, b: 2 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 5 });
const result2 = await client.test.divide({ a: 10, b: 0 });
const result2 = await client.test.divide.rpc({ a: 10, b: 0 });
assert(!result2.ok);
expect(result2.payload).toStrictEqual({
code: DIV_BY_ZERO,
Expand All @@ -68,7 +73,7 @@ describe.each(codecs)(
const serviceDefs = { test: BinaryFileServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
const result = await client.test.getFile({ file: 'test.py' });
const result = await client.test.getFile.rpc({ file: 'test.py' });
assert(result.ok);
assert(result.payload.contents instanceof Uint8Array);
expect(new TextDecoder().decode(result.payload.contents)).toStrictEqual(
Expand All @@ -82,17 +87,17 @@ describe.each(codecs)(
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [input, output, close] = await client.test.echo();
const [input, output, close] = await client.test.echo.stream();
input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();

const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });

const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: 'ghi' });

Expand All @@ -105,19 +110,19 @@ describe.each(codecs)(
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [input, output, close] = await client.test.echo();
const [input, output, close] = await client.test.echo.stream();
input.push({ msg: 'abc', throwResult: false, throwError: false });
const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1 && result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });

input.push({ msg: 'def', throwResult: true, throwError: false });
const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2 && !result2.ok);
expect(result2.payload.code).toStrictEqual(STREAM_ERROR);

input.push({ msg: 'ghi', throwResult: false, throwError: true });
const result3 = await output.next().then((res) => res.value);
const result3 = await iterNext(output);
assert(result3 && !result3.ok);
expect(result3.payload).toStrictEqual({
code: UNCAUGHT_ERROR,
Expand All @@ -126,6 +131,64 @@ describe.each(codecs)(
close();
});

test('subscription', async () => {
const options = { codec };
const serverTransport = new WebSocketServerTransport(
webSocketServer,
'SERVER',
options,
);
const client1Transport = new WebSocketClientTransport(
() => createLocalWebSocketClient(port),
'client1',
'SERVER',
options,
);
const client2Transport = new WebSocketClientTransport(
() => createLocalWebSocketClient(port),
'client2',
'SERVER',
options,
);

const serviceDefs = { test: SubscribableServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client1 = createClient<typeof server>(client1Transport);
const client2 = createClient<typeof server>(client2Transport);
const [subscription1, close1] = await client1.test.value.subscribe({});
let result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

const [subscription2, close2] = await client2.test.value.subscribe({});
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

const add1 = await client1.test.add.rpc({ n: 1 });
assert(add1.ok);

result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });

const add2 = await client2.test.add.rpc({ n: 3 });
assert(add2.ok);

result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 4 });
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 4 });

close1();
close2();
});

test('message order is preserved in the face of disconnects', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
Expand All @@ -144,12 +207,12 @@ describe.each(codecs)(
clientTransport.connections.forEach((conn) => conn.ws.terminate());
}

await client.test.add({
await client.test.add.rpc({
n: i,
});
}

const res = await client.test.getAll({});
const res = await client.test.getAll.rpc({});
assert(res.ok);
return expect(res.payload.msgs).toStrictEqual(expected);
});
Expand All @@ -163,7 +226,7 @@ describe.each(codecs)(

const promises = [];
for (let i = 0; i < CONCURRENCY; i++) {
promises.push(client.test.add({ n: i }));
promises.push(client.test.add.rpc({ n: i }));
}

for (let i = 0; i < CONCURRENCY; i++) {
Expand All @@ -181,7 +244,7 @@ describe.each(codecs)(

const openStreams = [];
for (let i = 0; i < CONCURRENCY; i++) {
const streamHandle = await client.test.echo();
const streamHandle = await client.test.echo.stream();
const input = streamHandle[0];
input.push({ msg: `${i}-1`, ignore: false });
input.push({ msg: `${i}-2`, ignore: false });
Expand All @@ -190,11 +253,11 @@ describe.each(codecs)(

for (let i = 0; i < CONCURRENCY; i++) {
const output = openStreams[i][1];
const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: `${i}-1` });

const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: `${i}-2` });
}
Expand Down
50 changes: 50 additions & 0 deletions __tests__/fixtures/observable.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Observable } from './observable';
import { describe, expect, test, vitest } from 'vitest';

describe('Observable', () => {
test('should set initial value correctly', () => {
const initialValue = 10;
const observable = new Observable(initialValue);
expect(observable.value).toBe(initialValue);
});

test('should update value correctly', () => {
const observable = new Observable(10);
const newValue = 20;
observable.set(() => newValue);
expect(observable.value).toBe(newValue);
});

test('should notify listeners when value changes', () => {
const observable = new Observable(10);
const listener = vitest.fn();
observable.observe(listener);
expect(listener).toHaveBeenCalledTimes(1);

const newValue = 20;
observable.set(() => newValue);

expect(listener).toHaveBeenCalledTimes(2);
expect(listener).toHaveBeenCalledWith(newValue);
});

test('should unsubscribe from notifications', () => {
const observable = new Observable(10);
const listener = vitest.fn();
const unsubscribe = observable.observe(listener);
expect(listener).toHaveBeenCalledTimes(1);

const newValue = 20;
observable.set(() => newValue);

expect(listener).toHaveBeenCalledTimes(2);
expect(listener).toHaveBeenCalledWith(newValue);

unsubscribe();

const anotherValue = 30;
observable.set(() => anotherValue);

expect(listener).toHaveBeenCalledTimes(2); // should not be called again after unsubscribing
});
});
42 changes: 42 additions & 0 deletions __tests__/fixtures/observable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Represents an observable value that can be subscribed to for changes.
* This should only be used in tests
* @template T - The type of the value being observed.
*/
export class Observable<T> {
value: T;
private listeners: Set<(val: T) => void>;

constructor(initialValue: T) {
this.value = initialValue;
this.listeners = new Set();
}

/**
* Gets the current value of the observable.
*/
get() {
return this.value;
}

/**
* Sets the current value of the observable. All listeners will get an update with this value.
* @param newValue - The new value to set.
*/
set(tx: (preValue: T) => T) {
const newValue = tx(this.value);
this.value = newValue;
this.listeners.forEach((listener) => listener(newValue));
}

/**
* Subscribes to changes in the observable value.
* @param listener - A callback function that will be called when the value changes.
* @returns A function that can be called to unsubscribe from further notifications.
*/
observe(listener: (val: T) => void) {
this.listeners.add(listener);
listener(this.get());
return () => this.listeners.delete(listener);
}
}
36 changes: 33 additions & 3 deletions __tests__/fixtures.ts → __tests__/fixtures/services.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Type } from '@sinclair/typebox';
import { ServiceBuilder } from '../router/builder';
import { reply } from '../transport/message';
import { Err, Ok } from '../router/result';
import { ServiceBuilder } from '../../router/builder';
import { reply } from '../../transport/message';
import { Err, Ok } from '../../router/result';
import { Observable } from './observable';

export const EchoRequest = Type.Object({
msg: Type.String(),
Expand Down Expand Up @@ -152,3 +153,32 @@ export const FallibleServiceConstructor = () =>
},
})
.finalize();

export const SubscribableServiceConstructor = () =>
ServiceBuilder.create('subscribable')
.initialState({
count: new Observable<number>(0),
})
.defineProcedure('add', {
type: 'rpc',
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg) {
const { n } = msg.payload;
ctx.state.count.set((prev) => prev + n);
return reply(msg, Ok({ result: ctx.state.count.get() }));
},
})
.defineProcedure('value', {
type: 'subscription',
input: Type.Object({}),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg, returnStream) {
ctx.state.count.observe((count) => {
returnStream.push(reply(msg, Ok({ result: count })));
});
},
})
.finalize();
Loading

0 comments on commit b28dd9d

Please sign in to comment.