Skip to content

Commit

Permalink
benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyzha0 committed Oct 14, 2023
1 parent 6a9f5e1 commit a188b11
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 89 deletions.
110 changes: 110 additions & 0 deletions __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import http from 'http';
import { bench, describe } from 'vitest';
import {
createWebSocketServer,
createWsTransports,
onServerReady,
waitForMessage,
} from '../transport/util';
import largePayload from './largePayload.json';
import { TestServiceConstructor } from './integration.test';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { StupidlyLargeService } from './typescript-stress.test';

let smallId = 0;
let largeId = 0;
const dummyPayloadSmall = () => ({
id: `${smallId++}`,
from: 'client',
to: 'SERVER',
serviceName: 'test',
procedureName: 'test',
payload: {
msg: 'cool',
},
});

const dummyPayloadLarge = () => ({
id: `${largeId++}`,
from: 'client',
to: 'SERVER',
serviceName: 'test',
procedureName: 'test',
payload: largePayload,
});

describe('transport level bandwidth', async () => {
const port = 4444;
const server = http.createServer();
await onServerReady(server, port);
const webSocketServer = await createWebSocketServer(server);
const [clientTransport, serverTransport] = await createWsTransports(
port,
webSocketServer,
);

bench('send and recv (small payload)', async () => {
const id = clientTransport.send(dummyPayloadSmall());
await waitForMessage(serverTransport, (msg) => msg.id === id);
return;
});

bench('send and recv (large payload)', async () => {
const id = clientTransport.send(dummyPayloadLarge());
await waitForMessage(serverTransport, (msg) => msg.id === id);
return;
});
});

describe('simple router level bandwidth', async () => {
const port = 4445;
const httpServer = http.createServer();
await onServerReady(httpServer, port);
const webSocketServer = await createWebSocketServer(httpServer);
const [clientTransport, serverTransport] = await createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

bench('rpc (wait for response)', async () => {
await client.test.add({ n: 1 });
});

const [input, output] = await client.test.echo();
bench('stream (wait for response)', async () => {
input.push({ msg: 'abc', ignore: false });
await output.next();
});

bench('stream', async () => {
input.push({ msg: 'abc', ignore: false });
});
});

describe('complex (50 procedures) router level bandwidth', async () => {
const port = 4446;
const httpServer = http.createServer();
await onServerReady(httpServer, port);
const webSocketServer = await createWebSocketServer(httpServer);
const [clientTransport, serverTransport] = await createWsTransports(
port,
webSocketServer,
);
const serviceDefs = {
a: StupidlyLargeService(),
b: StupidlyLargeService(),
c: StupidlyLargeService(),
d: StupidlyLargeService(),
};

const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

bench('rpc (wait for response)', async () => {
await client.b.f35({ a: 1 });
});
});
89 changes: 53 additions & 36 deletions __tests__/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,68 +109,85 @@ describe('server-side test', () => {
});

test('stream basic', async () => {
const [i, o] = asClientStream(initialState, service.procedures.echo);

i.push({ msg: 'abc', ignore: false });
i.push({ msg: 'def', ignore: true });
i.push({ msg: 'ghi', ignore: false });
i.end();

await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'abc',
});
await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'ghi',
});
expect(o.readableLength).toBe(0);
const [input, output] = asClientStream(
initialState,
service.procedures.echo,
);

input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();

await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'abc',
},
);
await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'ghi',
},
);
expect(output.readableLength).toBe(0);
});
});

const port = 4445;
describe('client <-> server integration test', () => {
const server = http.createServer();
let wss: WebSocketServer;
let webSocketServer: WebSocketServer;

beforeAll(async () => {
await onServerReady(server, port);
wss = await createWebSocketServer(server);
webSocketServer = await createWebSocketServer(server);
});

afterAll(() => {
wss.clients.forEach((socket) => {
webSocketServer.clients.forEach((socket) => {
socket.close();
});
server.close();
});

test('rpc', async () => {
const [ct, st] = await createWsTransports(port, wss);
const [clientTransport, serverTransport] = await createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(st, serviceDefs);
const client = createClient<typeof server>(ct);
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
await expect(client.test.add({ n: 3 })).resolves.toStrictEqual({
result: 3,
});
});

test('stream', async () => {
const [ct, st] = await createWsTransports(port, wss);
const [clientTransport, serverTransport] = await createWsTransports(
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(st, serviceDefs);
const client = createClient<typeof server>(ct);

const [i, o, close] = await client.test.echo();
i.push({ msg: 'abc', ignore: false });
i.push({ msg: 'def', ignore: true });
i.push({ msg: 'ghi', ignore: false });
i.end();

await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'abc',
});
await expect(o.next().then((res) => res.value)).resolves.toStrictEqual({
response: 'ghi',
});
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [input, output, close] = await client.test.echo();
input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();

await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'abc',
},
);
await expect(output.next().then((res) => res.value)).resolves.toStrictEqual(
{
response: 'ghi',
},
);
close();
});
});
33 changes: 33 additions & 0 deletions __tests__/largePayload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"data": [
{
"type": "articles",
"id": "1",
"attributes": {
"title": "Example article",
"body": "The shortest article. Ever.",
"created": "2015-05-22T14:56:29.000Z",
"updated": "2015-05-22T14:56:28.000Z"
},
"relationships": {
"author": {
"data": {
"id": "42",
"type": "people"
}
}
}
}
],
"included": [
{
"type": "people",
"id": "42",
"attributes": {
"name": "John",
"age": 21,
"gender": "male"
}
}
]
}
24 changes: 13 additions & 11 deletions __tests__/typescript-stress.test.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { describe, expect, test } from 'vitest';
import { Procedure, ServiceBuilder, serializeService } from '../router/builder';
import { TObject, Type } from '@sinclair/typebox';
import { Type } from '@sinclair/typebox';
import { MessageId, OpaqueTransportMessage, reply } from '../transport/message';
import { createServer } from '../router/server';
import { Transport } from '../transport/types';
import { NaiveJsonCodec } from '../codec/json';
import { createClient } from '../router/client';

const fnBody: Procedure<{}, 'rpc', TObject, TObject> = {
const input = Type.Object({ a: Type.Number() });
const output = Type.Object({ b: Type.Number() });
const fnBody: Procedure<{}, 'rpc', typeof input, typeof output> = {
type: 'rpc',
input: Type.Object({ a: Type.Number() }),
output: Type.Object({ b: Type.Number() }),
input,
output,
async handler(_state, msg) {
return reply(msg, { b: msg.payload.a });
},
};

// typescript is limited to max 50 constraints
// see: https://github.com/microsoft/TypeScript/issues/33541
const svc = () =>
export const StupidlyLargeService = () =>
ServiceBuilder.create('test')
.defineProcedure('f1', fnBody)
.defineProcedure('f2', fnBody)
Expand Down Expand Up @@ -77,7 +79,7 @@ export class MockTransport extends Transport {
super(NaiveJsonCodec, clientId);
}

async send(msg: OpaqueTransportMessage): Promise<MessageId> {
send(msg: OpaqueTransportMessage): MessageId {
const id = msg.id;
return id;
}
Expand All @@ -87,15 +89,15 @@ export class MockTransport extends Transport {

describe("ensure typescript doesn't give up trying to infer the types for large services", () => {
test('service with many procedures hits typescript limit', () => {
expect(serializeService(svc())).toBeTruthy();
expect(serializeService(StupidlyLargeService())).toBeTruthy();
});

test('serverclient should support many services with many procedures', async () => {
const listing = {
a: svc(),
b: svc(),
c: svc(),
d: svc(),
a: StupidlyLargeService(),
b: StupidlyLargeService(),
c: StupidlyLargeService(),
d: StupidlyLargeService(),
};
const server = await createServer(new MockTransport('SERVER'), listing);
const client = createClient<typeof server>(new MockTransport('client'));
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"build": "tsc",
"prepack": "npm run build",
"release": "npm publish --access public",
"test": "vitest"
"test": "vitest",
"bench": "vitest bench"
},
"engines": {
"node": ">=16"
Expand Down
4 changes: 2 additions & 2 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
// this gets cleaned up on i.end() which is called by closeHandler
(async () => {
for await (const rawIn of i) {
await transport.send(
transport.send(
msg(
transport.clientId,
'SERVER',
Expand Down Expand Up @@ -103,7 +103,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
return [i, o, closeHandler];
} else {
// rpc case
const id = await transport.send(
const id = transport.send(
msg(
transport.clientId,
'SERVER',
Expand Down
5 changes: 3 additions & 2 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export async function createServer<Services extends Record<string, AnyService>>(
// sending outgoing messages back to client
(async () => {
for await (const response of outgoing) {
await transport.send(response);
transport.send(response);
}
})(),
]),
Expand Down Expand Up @@ -93,6 +93,7 @@ export async function createServer<Services extends Record<string, AnyService>>(
const inputMessage = msg as TransportMessage<
(typeof procedure)['input']
>;

if (
procedure.type === 'rpc' &&
Value.Check(procedure.input, inputMessage.payload)
Expand All @@ -101,7 +102,7 @@ export async function createServer<Services extends Record<string, AnyService>>(
getContext(service),
inputMessage,
);
await transport.send(response);
transport.send(response);
return;
} else if (
procedure.type === 'stream' &&
Expand Down
2 changes: 1 addition & 1 deletion transport/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('sending and receiving across node streams works', () => {
};

const p = waitForMessage(serverTransport);
await clientTransport.send({
clientTransport.send({
id: '1',
from: 'client',
to: 'SERVER',
Expand Down
Loading

0 comments on commit a188b11

Please sign in to comment.