Skip to content

Commit

Permalink
[WS-2011] Typecheck router service definitions (#36)
Browse files Browse the repository at this point in the history
* type safety in service def building

* [WS-2010] ergonomics: `Transport<T>` -> `T` (#37)
  • Loading branch information
jackyzha0 authored Dec 15, 2023
1 parent e2353b9 commit 1835e72
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 567 deletions.
15 changes: 8 additions & 7 deletions __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { TestServiceConstructor } from './fixtures/services';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { StupidlyLargeService } from './typescript-stress.test';
import { buildServiceDefs } from '../router/defs';

let smallId = 0;
let largeId = 0;
Expand Down Expand Up @@ -77,7 +78,7 @@ describe('simple router level bandwidth', async () => {
port,
webSocketServer,
);
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -118,12 +119,12 @@ describe('complex (50 procedures) router level bandwidth', async () => {
port,
webSocketServer,
);
const serviceDefs = {
a: StupidlyLargeService(),
b: StupidlyLargeService(),
c: StupidlyLargeService(),
d: StupidlyLargeService(),
};
const serviceDefs = buildServiceDefs([
StupidlyLargeService('a'),
StupidlyLargeService('b'),
StupidlyLargeService('c'),
StupidlyLargeService('d'),
]);

const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
Expand Down
17 changes: 9 additions & 8 deletions __tests__/cleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
ensureTransportQueuesAreEventuallyEmpty,
waitFor,
} from './fixtures/cleanup';
import { buildServiceDefs } from '../router/defs';

describe('procedures should leave no trace after finishing', async () => {
const httpServer = http.createServer();
Expand All @@ -31,7 +32,7 @@ describe('procedures should leave no trace after finishing', async () => {

test('closing a transport from the client cleans up connection on the server', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand All @@ -58,7 +59,7 @@ describe('procedures should leave no trace after finishing', async () => {

test('closing a transport from the server cleans up connection on the client', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand All @@ -85,7 +86,7 @@ describe('procedures should leave no trace after finishing', async () => {

test('rpc', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -118,7 +119,7 @@ describe('procedures should leave no trace after finishing', async () => {

test('stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -172,7 +173,7 @@ describe('procedures should leave no trace after finishing', async () => {

test('subscription', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: SubscribableServiceConstructor() };
const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand All @@ -182,11 +183,11 @@ describe('procedures should leave no trace after finishing', async () => {
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [subscription, close] = await client.test.value.subscribe({});
const [subscription, close] = await client.subscribable.value.subscribe({});
let result = await iterNext(subscription);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });
const add1 = await client.test.add.rpc({ n: 1 });
const add1 = await client.subscribable.add.rpc({ n: 1 });
assert(add1.ok);
result = await iterNext(subscription);
assert(result.ok);
Expand Down Expand Up @@ -214,7 +215,7 @@ describe('procedures should leave no trace after finishing', async () => {

test('upload', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { uploadable: UploadableServiceConstructor() };
const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down
34 changes: 19 additions & 15 deletions __tests__/disconnects.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { CONNECTION_GRACE_PERIOD_MS } from '../router/client';
import { Err, UNEXPECTED_DISCONNECT } from '../router/result';
import { WebSocketServerTransport } from '../transport/impls/ws/server';
import { WebSocketClientTransport } from '../transport/impls/ws/client';
import { buildServiceDefs } from '../router/defs';

describe('procedures should handle unexpected disconnects', async () => {
const httpServer = http.createServer();
Expand All @@ -49,13 +50,12 @@ describe('procedures should handle unexpected disconnects', async () => {

test('rpc', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

// start procedure
await client.test.add.rpc({ n: 3 });

expect(clientTransport.connections.size).toEqual(1);
expect(serverTransport.connections.size).toEqual(1);

Expand All @@ -75,14 +75,14 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});

test('stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -111,8 +111,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});

Expand All @@ -132,25 +132,29 @@ describe('procedures should handle unexpected disconnects', async () => {
'SERVER',
);

const serviceDefs = { test: SubscribableServiceConstructor() };
const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client1 = createClient<typeof server>(client1Transport);
const client2 = createClient<typeof server>(client2Transport);

// start procedure
// client1 and client2 both subscribe
const [subscription1, close1] = await client1.test.value.subscribe({});
const [subscription1, close1] = await client1.subscribable.value.subscribe(
{},
);
let result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

const [subscription2, _close2] = await client2.test.value.subscribe({});
const [subscription2, _close2] = await client2.subscribable.value.subscribe(
{},
);
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

// client2 adds a value
const add1 = await client2.test.add.rpc({ n: 1 });
const add1 = await client2.subscribable.add.rpc({ n: 1 });
assert(add1.ok);

// both clients should receive the updated value
Expand All @@ -171,7 +175,7 @@ describe('procedures should handle unexpected disconnects', async () => {
client2Transport.tryReconnecting = false;

// client1 who is still connected can still add values and receive updates
const add2Promise = client1.test.add.rpc({ n: 2 });
const add2Promise = client1.subscribable.add.rpc({ n: 2 });

// after we've disconnected, hit end of grace period
await vi.runOnlyPendingTimersAsync();
Expand Down Expand Up @@ -205,7 +209,7 @@ describe('procedures should handle unexpected disconnects', async () => {

test('upload', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { uploadable: UploadableServiceConstructor() };
const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -233,8 +237,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});
});
43 changes: 23 additions & 20 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { codecs } from '../codec/codec.test';
import { WebSocketClientTransport } from '../transport/impls/ws/client';
import { WebSocketServerTransport } from '../transport/impls/ws/server';
import { testFinishesCleanly } from './fixtures/cleanup';
import { buildServiceDefs } from '../router/defs';

describe.each(codecs)(
'client <-> server integration test ($name codec)',
Expand All @@ -41,7 +42,7 @@ describe.each(codecs)(

test('rpc', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
const result = await client.test.add.rpc({ n: 3 });
Expand All @@ -57,13 +58,13 @@ describe.each(codecs)(

test('fallible rpc', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: FallibleServiceConstructor() };
const serviceDefs = buildServiceDefs([FallibleServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
const result = await client.test.divide.rpc({ a: 10, b: 2 });
const result = await client.fallible.divide.rpc({ a: 10, b: 2 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 5 });
const result2 = await client.test.divide.rpc({ a: 10, b: 0 });
const result2 = await client.fallible.divide.rpc({ a: 10, b: 0 });
assert(!result2.ok);
expect(result2.payload).toStrictEqual({
code: DIV_BY_ZERO,
Expand All @@ -82,10 +83,10 @@ describe.each(codecs)(

test('rpc with binary (uint8array)', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: BinaryFileServiceConstructor() };
const serviceDefs = buildServiceDefs([BinaryFileServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);
const result = await client.test.getFile.rpc({ file: 'test.py' });
const result = await client.bin.getFile.rpc({ file: 'test.py' });
assert(result.ok);
assert(result.payload.contents instanceof Uint8Array);
expect(new TextDecoder().decode(result.payload.contents)).toStrictEqual(
Expand All @@ -101,7 +102,7 @@ describe.each(codecs)(

test('stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -138,7 +139,7 @@ describe.each(codecs)(

test('stream with init message', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -168,11 +169,11 @@ describe.each(codecs)(

test('fallible stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: FallibleServiceConstructor() };
const serviceDefs = buildServiceDefs([FallibleServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [input, output, close] = await client.test.echo.stream();
const [input, output, close] = await client.fallible.echo.stream();
input.push({ msg: 'abc', throwResult: false, throwError: false });
const result1 = await iterNext(output);
assert(result1 && result1.ok);
Expand Down Expand Up @@ -219,21 +220,23 @@ describe.each(codecs)(
options,
);

const serviceDefs = { test: SubscribableServiceConstructor() };
const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client1 = createClient<typeof server>(client1Transport);
const client2 = createClient<typeof server>(client2Transport);
const [subscription1, close1] = await client1.test.value.subscribe({});
const [subscription1, close1] =
await client1.subscribable.value.subscribe({});
let result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

const [subscription2, close2] = await client2.test.value.subscribe({});
const [subscription2, close2] =
await client2.subscribable.value.subscribe({});
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

const add1 = await client1.test.add.rpc({ n: 1 });
const add1 = await client1.subscribable.add.rpc({ n: 1 });
assert(add1.ok);

result = await iterNext(subscription1);
Expand All @@ -243,7 +246,7 @@ describe.each(codecs)(
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });

const add2 = await client2.test.add.rpc({ n: 3 });
const add2 = await client2.subscribable.add.rpc({ n: 3 });
assert(add2.ok);

result = await iterNext(subscription1);
Expand All @@ -265,7 +268,7 @@ describe.each(codecs)(

test('upload', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { uploadable: UploadableServiceConstructor() };
const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand All @@ -286,7 +289,7 @@ describe.each(codecs)(

test('upload with init message', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { uploadable: UploadableServiceConstructor() };
const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand All @@ -309,7 +312,7 @@ describe.each(codecs)(

test('message order is preserved in the face of disconnects', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
const serviceDefs = buildServiceDefs([OrderingServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down Expand Up @@ -344,7 +347,7 @@ describe.each(codecs)(
const CONCURRENCY = 10;
test('concurrent rpcs', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
const serviceDefs = buildServiceDefs([OrderingServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand All @@ -368,7 +371,7 @@ describe.each(codecs)(

test('concurrent streams', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const serviceDefs = buildServiceDefs([TestServiceConstructor()]);
const server = createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

Expand Down
Loading

0 comments on commit 1835e72

Please sign in to comment.