diff --git a/__tests__/bandwidth.bench.ts b/__tests__/bandwidth.bench.ts index 261d4935..e3094fe8 100644 --- a/__tests__/bandwidth.bench.ts +++ b/__tests__/bandwidth.bench.ts @@ -11,6 +11,7 @@ import { TestServiceConstructor } from './fixtures/services'; import { createServer } from '../router/server'; import { createClient } from '../router/client'; import { StupidlyLargeService } from './typescript-stress.test'; +import { buildServiceDefs } from '../router/defs'; let smallId = 0; let largeId = 0; @@ -77,7 +78,7 @@ describe('simple router level bandwidth', async () => { port, webSocketServer, ); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -118,12 +119,12 @@ describe('complex (50 procedures) router level bandwidth', async () => { port, webSocketServer, ); - const serviceDefs = { - a: StupidlyLargeService(), - b: StupidlyLargeService(), - c: StupidlyLargeService(), - d: StupidlyLargeService(), - }; + const serviceDefs = buildServiceDefs([ + StupidlyLargeService('a'), + StupidlyLargeService('b'), + StupidlyLargeService('c'), + StupidlyLargeService('d'), + ]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); diff --git a/__tests__/cleanup.test.ts b/__tests__/cleanup.test.ts index 5be4490d..423df7b8 100644 --- a/__tests__/cleanup.test.ts +++ b/__tests__/cleanup.test.ts @@ -17,6 +17,7 @@ import { ensureTransportQueuesAreEventuallyEmpty, waitFor, } from './fixtures/cleanup'; +import { buildServiceDefs } from '../router/defs'; describe('procedures should leave no trace after finishing', async () => { const httpServer = http.createServer(); @@ -31,7 +32,7 @@ describe('procedures should leave no trace after finishing', async () => { test('closing a transport from the client cleans up connection on the server', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -58,7 +59,7 @@ describe('procedures should leave no trace after finishing', async () => { test('closing a transport from the server cleans up connection on the client', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -85,7 +86,7 @@ describe('procedures should leave no trace after finishing', async () => { test('rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -118,7 +119,7 @@ describe('procedures should leave no trace after finishing', async () => { test('stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -172,7 +173,7 @@ describe('procedures should leave no trace after finishing', async () => { test('subscription', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: SubscribableServiceConstructor() }; + const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -182,11 +183,11 @@ describe('procedures should leave no trace after finishing', async () => { clientTransport.eventDispatcher.numberOfListeners('message'); // start procedure - const [subscription, close] = await client.test.value.subscribe({}); + const [subscription, close] = await client.subscribable.value.subscribe({}); let result = await iterNext(subscription); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const add1 = await client.test.add.rpc({ n: 1 }); + const add1 = await client.subscribable.add.rpc({ n: 1 }); assert(add1.ok); result = await iterNext(subscription); assert(result.ok); @@ -214,7 +215,7 @@ describe('procedures should leave no trace after finishing', async () => { test('upload', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); diff --git a/__tests__/disconnects.test.ts b/__tests__/disconnects.test.ts index b7d9c3dc..cbf289ff 100644 --- a/__tests__/disconnects.test.ts +++ b/__tests__/disconnects.test.ts @@ -27,6 +27,7 @@ import { CONNECTION_GRACE_PERIOD_MS } from '../router/client'; import { Err, UNEXPECTED_DISCONNECT } from '../router/result'; import { WebSocketServerTransport } from '../transport/impls/ws/server'; import { WebSocketClientTransport } from '../transport/impls/ws/client'; +import { buildServiceDefs } from '../router/defs'; describe('procedures should handle unexpected disconnects', async () => { const httpServer = http.createServer(); @@ -49,7 +50,7 @@ describe('procedures should handle unexpected disconnects', async () => { test('rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -82,7 +83,7 @@ describe('procedures should handle unexpected disconnects', async () => { test('stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -132,25 +133,29 @@ describe('procedures should handle unexpected disconnects', async () => { 'SERVER', ); - const serviceDefs = { test: SubscribableServiceConstructor() }; + const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client1 = createClient(client1Transport); const client2 = createClient(client2Transport); // start procedure // client1 and client2 both subscribe - const [subscription1, close1] = await client1.test.value.subscribe({}); + const [subscription1, close1] = await client1.subscribable.value.subscribe( + {}, + ); let result = await iterNext(subscription1); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const [subscription2, _close2] = await client2.test.value.subscribe({}); + const [subscription2, _close2] = await client2.subscribable.value.subscribe( + {}, + ); result = await iterNext(subscription2); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); // client2 adds a value - const add1 = await client2.test.add.rpc({ n: 1 }); + const add1 = await client2.subscribable.add.rpc({ n: 1 }); assert(add1.ok); // both clients should receive the updated value @@ -171,7 +176,7 @@ describe('procedures should handle unexpected disconnects', async () => { client2Transport.tryReconnecting = false; // client1 who is still connected can still add values and receive updates - const add2Promise = client1.test.add.rpc({ n: 2 }); + const add2Promise = client1.subscribable.add.rpc({ n: 2 }); // after we've disconnected, hit end of grace period await vi.runOnlyPendingTimersAsync(); @@ -205,7 +210,7 @@ describe('procedures should handle unexpected disconnects', async () => { test('upload', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); diff --git a/__tests__/e2e.test.ts b/__tests__/e2e.test.ts index 1147c94a..619b9f8c 100644 --- a/__tests__/e2e.test.ts +++ b/__tests__/e2e.test.ts @@ -24,6 +24,7 @@ import { codecs } from '../codec/codec.test'; import { WebSocketClientTransport } from '../transport/impls/ws/client'; import { WebSocketServerTransport } from '../transport/impls/ws/server'; import { testFinishesCleanly } from './fixtures/cleanup'; +import { buildServiceDefs } from '../router/defs'; describe.each(codecs)( 'client <-> server integration test ($name codec)', @@ -41,7 +42,7 @@ describe.each(codecs)( test('rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); const result = await client.test.add.rpc({ n: 3 }); @@ -57,13 +58,13 @@ describe.each(codecs)( test('fallible rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: FallibleServiceConstructor() }; + const serviceDefs = buildServiceDefs([FallibleServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); - const result = await client.test.divide.rpc({ a: 10, b: 2 }); + const result = await client.fallible.divide.rpc({ a: 10, b: 2 }); assert(result.ok); expect(result.payload).toStrictEqual({ result: 5 }); - const result2 = await client.test.divide.rpc({ a: 10, b: 0 }); + const result2 = await client.fallible.divide.rpc({ a: 10, b: 0 }); assert(!result2.ok); expect(result2.payload).toStrictEqual({ code: DIV_BY_ZERO, @@ -82,10 +83,10 @@ describe.each(codecs)( test('rpc with binary (uint8array)', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: BinaryFileServiceConstructor() }; + const serviceDefs = buildServiceDefs([BinaryFileServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); - const result = await client.test.getFile.rpc({ file: 'test.py' }); + const result = await client.bin.getFile.rpc({ file: 'test.py' }); assert(result.ok); assert(result.payload.contents instanceof Uint8Array); expect(new TextDecoder().decode(result.payload.contents)).toStrictEqual( @@ -101,7 +102,7 @@ describe.each(codecs)( test('stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -138,7 +139,7 @@ describe.each(codecs)( test('stream with init message', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -168,11 +169,11 @@ describe.each(codecs)( test('fallible stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: FallibleServiceConstructor() }; + const serviceDefs = buildServiceDefs([FallibleServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); - const [input, output, close] = await client.test.echo.stream(); + const [input, output, close] = await client.fallible.echo.stream(); input.push({ msg: 'abc', throwResult: false, throwError: false }); const result1 = await iterNext(output); assert(result1 && result1.ok); @@ -219,21 +220,23 @@ describe.each(codecs)( options, ); - const serviceDefs = { test: SubscribableServiceConstructor() }; + const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client1 = createClient(client1Transport); const client2 = createClient(client2Transport); - const [subscription1, close1] = await client1.test.value.subscribe({}); + const [subscription1, close1] = + await client1.subscribable.value.subscribe({}); let result = await iterNext(subscription1); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const [subscription2, close2] = await client2.test.value.subscribe({}); + const [subscription2, close2] = + await client2.subscribable.value.subscribe({}); result = await iterNext(subscription2); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const add1 = await client1.test.add.rpc({ n: 1 }); + const add1 = await client1.subscribable.add.rpc({ n: 1 }); assert(add1.ok); result = await iterNext(subscription1); @@ -243,7 +246,7 @@ describe.each(codecs)( assert(result.ok); expect(result.payload).toStrictEqual({ result: 1 }); - const add2 = await client2.test.add.rpc({ n: 3 }); + const add2 = await client2.subscribable.add.rpc({ n: 3 }); assert(add2.ok); result = await iterNext(subscription1); @@ -265,7 +268,7 @@ describe.each(codecs)( test('upload', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -286,7 +289,7 @@ describe.each(codecs)( test('upload with init message', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -309,7 +312,7 @@ describe.each(codecs)( test('message order is preserved in the face of disconnects', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: OrderingServiceConstructor() }; + const serviceDefs = buildServiceDefs([OrderingServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -344,7 +347,7 @@ describe.each(codecs)( const CONCURRENCY = 10; test('concurrent rpcs', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: OrderingServiceConstructor() }; + const serviceDefs = buildServiceDefs([OrderingServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -368,7 +371,7 @@ describe.each(codecs)( test('concurrent streams', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 0d524b18..bb54e8cd 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -7,6 +7,7 @@ import { Connection, Transport } from '../transport/transport'; import { NaiveJsonCodec } from '../codec/json'; import { createClient } from '../router/client'; import { Ok } from '../router/result'; +import { buildServiceDefs } from '../router/defs'; const input = Type.Union([ Type.Object({ a: Type.Number() }), @@ -41,8 +42,8 @@ const fnBody: Procedure<{}, 'rpc', typeof input, typeof output, typeof errors> = // typescript is limited to max 50 constraints // see: https://github.com/microsoft/TypeScript/issues/33541 -export const StupidlyLargeService = () => - ServiceBuilder.create('test') +export const StupidlyLargeService = (name: string) => + ServiceBuilder.create(name) .defineProcedure('f1', fnBody) .defineProcedure('f2', fnBody) .defineProcedure('f3', fnBody) @@ -112,17 +113,18 @@ export class MockTransport extends Transport { describe("ensure typescript doesn't give up trying to infer the types for large services", () => { test('service with many procedures hits typescript limit', () => { - expect(serializeService(StupidlyLargeService())).toBeTruthy(); + expect(serializeService(StupidlyLargeService('test'))).toBeTruthy(); }); test('server client should support many services with many procedures', async () => { - const listing = { - a: StupidlyLargeService(), - b: StupidlyLargeService(), - c: StupidlyLargeService(), - d: StupidlyLargeService(), - }; - const server = createServer(new MockTransport('SERVER'), listing); + const serviceDefs = buildServiceDefs([ + StupidlyLargeService('a'), + StupidlyLargeService('b'), + StupidlyLargeService('c'), + StupidlyLargeService('d'), + ]); + + const server = createServer(new MockTransport('SERVER'), serviceDefs); const client = createClient(new MockTransport('client')); expect(client.d.f48.rpc({ a: 0 })).toBeTruthy(); expect(client.a.f2.rpc({ c: 'abc' })).toBeTruthy(); diff --git a/router/builder.ts b/router/builder.ts index 9e4730e9..a0a3fc04 100644 --- a/router/builder.ts +++ b/router/builder.ts @@ -10,13 +10,13 @@ import { Result, RiverError, RiverUncaughtSchema } from './result'; * gRPC's four combinations of stream / non-stream in each direction. */ export type ValidProcType = - // Single message in both directions. + // Single message in both directions (1:1). | 'rpc' - // Client-stream (potentially preceded by an initialization message), single message from server. + // Client-stream (potentially preceded by an initialization message), single message from server (n:1). | 'upload' - // Single message from client, stream from server. + // Single message from client, stream from server (1:n). | 'subscription' - // Bidirectional stream (potentially preceded by an initialization message). + // Bidirectional stream (potentially preceded by an initialization message) (n:n). | 'stream'; /** diff --git a/router/client.ts b/router/client.ts index c1fddc62..947ad485 100644 --- a/router/client.ts +++ b/router/client.ts @@ -23,6 +23,7 @@ import { Static } from '@sinclair/typebox'; import { nanoid } from 'nanoid'; import { Err, Result, UNEXPECTED_DISCONNECT } from './result'; import { EventMap } from '../transport/events'; +import { ServiceDefs } from './defs'; // helper to make next, yield, and return all the same type type AsyncIter = AsyncGenerator; @@ -126,7 +127,7 @@ type ServiceClient = { * Defines a type that represents a client for a server with a set of services. * @template Srv - The type of the server. */ -export type ServerClient>> = { +export type ServerClient> = { [SvcName in keyof Srv['services']]: ServiceClient; }; @@ -173,7 +174,7 @@ function _createRecursiveProxy( * @param {Transport} transport - The transport to use for communication. * @returns The client for the server. */ -export const createClient = >>( +export const createClient = >( transport: Transport, serverId: TransportClientId = 'SERVER', ) => diff --git a/router/defs.ts b/router/defs.ts new file mode 100644 index 00000000..fa054b23 --- /dev/null +++ b/router/defs.ts @@ -0,0 +1,24 @@ +import { AnyService } from './builder'; + +/** + * Defines a type for a collection service definitions. Should be + * build with the {@link buildServiceDefs} function. + * @template T - An array of services. + */ +export type ServiceDefs = { + [K in T[number]['name']]: T[number]; +}; + +/** + * Builds service definitions based on an array of services. + * @param services - The array of services. + * @returns The service definitions. + */ +export function buildServiceDefs( + services: T, +): ServiceDefs { + return services.reduce((acc, service) => { + acc[service.name as keyof ServiceDefs] = service; + return acc; + }, {} as ServiceDefs); +} diff --git a/router/index.ts b/router/index.ts index 06385564..bdd1faaf 100644 --- a/router/index.ts +++ b/router/index.ts @@ -10,6 +10,8 @@ export type { Procedure, PayloadType, } from './builder'; +export { buildServiceDefs } from './defs'; +export type { ServiceDefs } from './defs'; export { createClient } from './client'; export type { ServerClient } from './client'; export { createServer } from './server'; diff --git a/router/server.ts b/router/server.ts index 49e17368..5af6f835 100644 --- a/router/server.ts +++ b/router/server.ts @@ -24,6 +24,7 @@ import { UNCAUGHT_ERROR, } from './result'; import { EventMap } from '../transport/events'; +import { ServiceDefs } from './defs'; /** * Represents a server with a set of services. Use {@link createServer} to create it. @@ -50,7 +51,7 @@ interface ProcStream { }; } -class RiverServer> { +class RiverServer { transport: Transport; services: Services; contextMap: Map>; @@ -385,7 +386,7 @@ class RiverServer> { * @param extendedContext - An optional object containing additional context to be passed to all services. * @returns A promise that resolves to a server instance with the registered services. */ -export function createServer>( +export function createServer( transport: Transport, services: Services, extendedContext?: Omit,