Skip to content

Commit

Permalink
feat: handle reconnects in WS Transport, benchmarks (#8)
Browse files Browse the repository at this point in the history
* async ws constructor, bump to 0.2.0

* benchmarks

* add bench to ci

* make reporter verbose

* jk lol it doesnt report correctly

* address id ! check

* await promise

* refactor ws transport check

* use addEventListener variants of onxxx fns

* remove extra todo

* address review comments

* refactor out event loop

* update bench duration

* run all

* more comments, address nits

* throw error instead

* reuse reconnection promise

* auto port allocation

* make createWsTransports not async, basic test for reconnect logic

* add even more tests
  • Loading branch information
jackyzha0 authored Oct 18, 2023
1 parent 9aa64fa commit 10a3771
Show file tree
Hide file tree
Showing 14 changed files with 526 additions and 141 deletions.
132 changes: 132 additions & 0 deletions __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import http from 'http';
import { bench, describe } from 'vitest';
import {
createWebSocketServer,
createWsTransports,
onServerReady,
waitForMessage,
} from '../transport/util';
import largePayload from './largePayload.json';
import { TestServiceConstructor } from './integration.test';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { StupidlyLargeService } from './typescript-stress.test';

let smallId = 0;
let largeId = 0;
const dummyPayloadSmall = () => ({
id: `${smallId++}`,
from: 'client',
to: 'SERVER',
serviceName: 'test',
procedureName: 'test',
payload: {
msg: 'cool',
},
});

const dummyPayloadLarge = () => ({
id: `${largeId++}`,
from: 'client',
to: 'SERVER',
serviceName: 'test',
procedureName: 'test',
payload: largePayload,
});

const BENCH_DURATION = 1000;
describe('transport level bandwidth', async () => {
const server = http.createServer();
const port = await onServerReady(server);
const webSocketServer = await createWebSocketServer(server);
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);

bench(
'send and recv (small payload)',
async () => {
const id = clientTransport.send(dummyPayloadSmall());
await waitForMessage(serverTransport, (msg) => msg.id === id);
return;
},
{ time: BENCH_DURATION },
);

bench(
'send and recv (large payload)',
async () => {
const id = clientTransport.send(dummyPayloadLarge());
await waitForMessage(serverTransport, (msg) => msg.id === id);
return;
},
{ time: BENCH_DURATION },
);
});

describe('simple router level bandwidth', async () => {
const httpServer = http.createServer();
const port = await onServerReady(httpServer);
const webSocketServer = await createWebSocketServer(httpServer);
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

bench(
'rpc (wait for response)',
async () => {
await client.test.add({ n: 1 });
},
{ time: BENCH_DURATION },
);

const [input, output] = await client.test.echo();
bench(
'stream (wait for response)',
async () => {
input.push({ msg: 'abc', ignore: false });
await output.next();
},
{ time: BENCH_DURATION },
);

bench(
'stream',
async () => {
input.push({ msg: 'abc', ignore: false });
},
{ time: BENCH_DURATION },
);
});

describe('complex (50 procedures) router level bandwidth', async () => {
const httpServer = http.createServer();
const port = await onServerReady(httpServer);
const webSocketServer = await createWebSocketServer(httpServer);
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);
const serviceDefs = {
a: StupidlyLargeService(),
b: StupidlyLargeService(),
c: StupidlyLargeService(),
d: StupidlyLargeService(),
};

const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

bench(
'rpc (wait for response)',
async () => {
await client.b.f35({ a: 1 });
},
{ time: BENCH_DURATION },
);
});
156 changes: 112 additions & 44 deletions __tests__/integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import http from 'http';
import { WebSocketServer } from 'ws';
import { Type } from '@sinclair/typebox';
import { ServiceBuilder, serializeService } from '../router/builder';
import { reply } from '../transport/message';
import { afterAll, beforeAll, describe, expect, test } from 'vitest';
import { TransportMessage, reply } from '../transport/message';
import { afterAll, describe, expect, test } from 'vitest';
import {
createWebSocketServer,
createWsTransports,
Expand All @@ -12,6 +11,7 @@ import {
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { asClientRpc, asClientStream } from '../router/server.util';
import { nanoid } from 'nanoid';

export const EchoRequest = Type.Object({
msg: Type.String(),
Expand Down Expand Up @@ -49,6 +49,31 @@ export const TestServiceConstructor = () =>
})
.finalize();

const OrderingServiceConstructor = () =>
ServiceBuilder.create('test')
.initialState({
msgs: [] as number[],
})
.defineProcedure('add', {
type: 'rpc',
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ ok: Type.Boolean() }),
async handler(ctx, msg) {
const { n } = msg.payload;
ctx.state.msgs.push(n);
return reply(msg, { ok: true });
},
})
.defineProcedure('getAll', {
type: 'rpc',
input: Type.Object({}),
output: Type.Object({ msgs: Type.Array(Type.Number()) }),
async handler(ctx, msg) {
return reply(msg, { msgs: ctx.state.msgs });
},
})
.finalize();

test('serialize service to jsonschema', () => {
const service = TestServiceConstructor();
expect(serializeService(service)).toStrictEqual({
Expand Down Expand Up @@ -109,68 +134,111 @@ describe('server-side test', () => {
});

test('stream basic', async () => {
const [i, o] = asClientStream(initialState, service.procedures.echo);

i.push({ msg: 'abc', ignore: false });
i.push({ msg: 'def', ignore: true });
i.push({ msg: 'ghi', ignore: false });
i.end();

await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'abc',
});
await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'ghi',
});
expect(o.readableLength).toBe(0);
const [input, output] = asClientStream(
initialState,
service.procedures.echo,
);

input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();

await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'abc',
},
);
await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'ghi',
},
);
expect(output.readableLength).toBe(0);
});
});

const port = 4445;
describe('client <-> server integration test', () => {
describe('client <-> server integration test', async () => {
const server = http.createServer();
let wss: WebSocketServer;

beforeAll(async () => {
await onServerReady(server, port);
wss = await createWebSocketServer(server);
});
const port = await onServerReady(server);
const webSocketServer = await createWebSocketServer(server);

afterAll(() => {
wss.clients.forEach((socket) => {
webSocketServer.clients.forEach((socket) => {
socket.close();
});
server.close();
});

test('rpc', async () => {
const [ct, st] = await createWsTransports(port, wss);
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(st, serviceDefs);
const client = createClient<typeof server>(ct);
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
await expect(client.test.add({ n: 3 })).resolves.toStrictEqual({
result: 3,
});
});

test('stream', async () => {
const [ct, st] = await createWsTransports(port, wss);
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(st, serviceDefs);
const client = createClient<typeof server>(ct);

const [i, o, close] = await client.test.echo();
i.push({ msg: 'abc', ignore: false });
i.push({ msg: 'def', ignore: true });
i.push({ msg: 'ghi', ignore: false });
i.end();
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [input, output, close] = await client.test.echo();
input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();

await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'abc',
},
);
await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'ghi',
},
);

await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'abc',
});
await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'ghi',
});
close();
});

test('message order is preserved in the face of disconnects', async () => {
const [clientTransport, serverTransport] = createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: OrderingServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const expected: number[] = [];
for (let i = 0; i < 50; i++) {
expected.push(i);

if (i == 10) {
clientTransport.ws?.close();
}

if (i == 42) {
clientTransport.ws?.terminate();
}

await client.test.add({
n: i,
});
}

const res = await client.test.getAll({});
return expect(res.msgs).toStrictEqual(expected);
});
});
33 changes: 33 additions & 0 deletions __tests__/largePayload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"data": [
{
"type": "articles",
"id": "1",
"attributes": {
"title": "Example article",
"body": "The shortest article. Ever.",
"created": "2015-05-22T14:56:29.000Z",
"updated": "2015-05-22T14:56:28.000Z"
},
"relationships": {
"author": {
"data": {
"id": "42",
"type": "people"
}
}
}
}
],
"included": [
{
"type": "people",
"id": "42",
"attributes": {
"name": "John",
"age": 21,
"gender": "male"
}
}
]
}
Loading

0 comments on commit 10a3771

Please sign in to comment.