Skip to content

Commit

Permalink
[WS-2007] feat: connection cleanup handlers (#35)
Browse files Browse the repository at this point in the history
* refactor into various helpers

* rewrite server into a class for readability reasons

* server and client cleanup hooks

* client side disconenct tests wip

* fix race in trrest

* fix rpc disconnect test

* remove unused import

* change test to be transport level

* add server cleanup on client silent drop invariant test

* base client refactor

* add disconnect tests

* tests for upload and subscribe

* version bump

* [WS-2011] Typecheck router service definitions (#36)

* type safety in service def building

* [WS-2010] ergonomics: `Transport<T>` -> `T` (#37)
  • Loading branch information
jackyzha0 authored Dec 15, 2023
1 parent 0c7a362 commit babbeb8
Show file tree
Hide file tree
Showing 23 changed files with 1,293 additions and 931 deletions.
21 changes: 11 additions & 10 deletions __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import {
createWebSocketServer,
createWsTransports,
onServerReady,
waitForMessage,
} from '../util/testHelpers';
import largePayload from './fixtures/largePayload.json';
import { TestServiceConstructor } from './fixtures/services';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { StupidlyLargeService } from './typescript-stress.test';
import { waitForMessage } from '../transport';
import { buildServiceDefs } from '../router/defs';

let smallId = 0;
let largeId = 0;
Expand Down Expand Up @@ -77,8 +78,8 @@ describe('simple router level bandwidth', async () => {
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

bench(
Expand Down Expand Up @@ -118,14 +119,14 @@ describe('complex (50 procedures) router level bandwidth', async () => {
port,
webSocketServer,
);
const serviceDefs = {
a: StupidlyLargeService(),
b: StupidlyLargeService(),
c: StupidlyLargeService(),
d: StupidlyLargeService(),
};
const serviceDefs = buildServiceDefs([
StupidlyLargeService('a'),
StupidlyLargeService('b'),
StupidlyLargeService('c'),
StupidlyLargeService('d'),
]);

const server = await createServer(serverTransport, serviceDefs);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

bench(
Expand Down
95 changes: 68 additions & 27 deletions __tests__/invariants.test.ts → __tests__/cleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import {
import {
SubscribableServiceConstructor,
TestServiceConstructor,
UploadableServiceConstructor,
} from './fixtures/services';
import { createClient, createServer } from '../router';
import {
ensureServerIsClean,
ensureTransportQueuesAreEventuallyEmpty,
waitUntil,
waitFor,
} from './fixtures/cleanup';
import { buildServiceDefs } from '../router/defs';

describe('procedures should leave no trace after finishing', async () => {
const httpServer = http.createServer();
Expand All @@ -30,8 +32,8 @@ describe('procedures should leave no trace after finishing', async () => {

test('closing a transport from the client cleans up connection on the server', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

expect(clientTransport.connections.size).toEqual(0);
Expand All @@ -47,17 +49,18 @@ describe('procedures should leave no trace after finishing', async () => {
// should be back to 0 connections after client closes
clientTransport.close();
expect(clientTransport.connections.size).toEqual(0);
await waitUntil(
() => serverTransport.connections.size,
0,
'server should cleanup connection after client closes',
await waitFor(() =>
expect(
serverTransport.connections.size,
'server should cleanup connection after client closes',
).toEqual(0),
);
});

test('closing a transport from the server cleans up connection on the client', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

expect(clientTransport.connections.size).toEqual(0);
Expand All @@ -73,17 +76,18 @@ describe('procedures should leave no trace after finishing', async () => {
// should be back to 0 connections after client closes
serverTransport.close();
expect(serverTransport.connections.size).toEqual(0);
await waitUntil(
() => clientTransport.connections.size,
0,
'client should cleanup connection after server closes',
await waitFor(() =>
expect(
clientTransport.connections.size,
'client should cleanup connection after server closes',
).toEqual(0),
);
});

test('rpc', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

let serverListeners =
Expand Down Expand Up @@ -115,8 +119,8 @@ describe('procedures should leave no trace after finishing', async () => {

test('stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

let serverListeners =
Expand All @@ -134,10 +138,10 @@ describe('procedures should leave no trace after finishing', async () => {
expect(result1.payload).toStrictEqual({ response: '1' });

// ensure we only have one stream despite pushing multiple messages.
await waitUntil(() => server.streams.size, 1);
await waitFor(() => expect(server.streams.size).toEqual(1));
input.end();
// ensure we no longer have any streams since the input was closed.
await waitUntil(() => server.streams.size, 0);
await waitFor(() => expect(server.streams.size).toEqual(0));

const result2 = await iterNext(output);
assert(result2.ok);
Expand Down Expand Up @@ -169,8 +173,8 @@ describe('procedures should leave no trace after finishing', async () => {

test('subscription', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: SubscribableServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

let serverListeners =
Expand All @@ -179,22 +183,59 @@ describe('procedures should leave no trace after finishing', async () => {
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [subscription, close] = await client.test.value.subscribe({});
const [subscription, close] = await client.subscribable.value.subscribe({});
let result = await iterNext(subscription);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

const add1 = await client.test.add.rpc({ n: 1 });
const add1 = await client.subscribable.add.rpc({ n: 1 });
assert(add1.ok);

result = await iterNext(subscription);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });

close();
// end procedure

// number of message handlers shouldn't increase after stream ends
// number of message handlers shouldn't increase after subscription ends
expect(
serverTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(serverListeners);
expect(
clientTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(clientListeners);

// check number of connections
expect(serverTransport.connections.size).toEqual(1);
expect(clientTransport.connections.size).toEqual(1);
await ensureTransportQueuesAreEventuallyEmpty(clientTransport);
await ensureTransportQueuesAreEventuallyEmpty(serverTransport);

// ensure we have no streams left on the server
await ensureServerIsClean(server);
});

test('upload', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

let serverListeners =
serverTransport.eventDispatcher.numberOfListeners('message');
let clientListeners =
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [addStream, addResult] = await client.uploadable.addMultiple.upload();
addStream.push({ n: 1 });
addStream.push({ n: 2 });
addStream.end();

const result = await addResult;
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
// end procedure

// number of message handlers shouldn't increase after upload ends
expect(
serverTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(serverListeners);
Expand Down
Loading

0 comments on commit babbeb8

Please sign in to comment.