diff --git a/__tests__/bandwidth.bench.ts b/__tests__/bandwidth.bench.ts index 261d4935..e3094fe8 100644 --- a/__tests__/bandwidth.bench.ts +++ b/__tests__/bandwidth.bench.ts @@ -11,6 +11,7 @@ import { TestServiceConstructor } from './fixtures/services'; import { createServer } from '../router/server'; import { createClient } from '../router/client'; import { StupidlyLargeService } from './typescript-stress.test'; +import { buildServiceDefs } from '../router/defs'; let smallId = 0; let largeId = 0; @@ -77,7 +78,7 @@ describe('simple router level bandwidth', async () => { port, webSocketServer, ); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -118,12 +119,12 @@ describe('complex (50 procedures) router level bandwidth', async () => { port, webSocketServer, ); - const serviceDefs = { - a: StupidlyLargeService(), - b: StupidlyLargeService(), - c: StupidlyLargeService(), - d: StupidlyLargeService(), - }; + const serviceDefs = buildServiceDefs([ + StupidlyLargeService('a'), + StupidlyLargeService('b'), + StupidlyLargeService('c'), + StupidlyLargeService('d'), + ]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); diff --git a/__tests__/cleanup.test.ts b/__tests__/cleanup.test.ts index 5be4490d..423df7b8 100644 --- a/__tests__/cleanup.test.ts +++ b/__tests__/cleanup.test.ts @@ -17,6 +17,7 @@ import { ensureTransportQueuesAreEventuallyEmpty, waitFor, } from './fixtures/cleanup'; +import { buildServiceDefs } from '../router/defs'; describe('procedures should leave no trace after finishing', async () => { const httpServer = http.createServer(); @@ -31,7 +32,7 @@ describe('procedures should leave no trace after finishing', async () => { test('closing a transport from the client cleans up connection on the server', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -58,7 +59,7 @@ describe('procedures should leave no trace after finishing', async () => { test('closing a transport from the server cleans up connection on the client', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -85,7 +86,7 @@ describe('procedures should leave no trace after finishing', async () => { test('rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -118,7 +119,7 @@ describe('procedures should leave no trace after finishing', async () => { test('stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -172,7 +173,7 @@ describe('procedures should leave no trace after finishing', async () => { test('subscription', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: SubscribableServiceConstructor() }; + const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -182,11 +183,11 @@ describe('procedures should leave no trace after finishing', async () => { clientTransport.eventDispatcher.numberOfListeners('message'); // start procedure - const [subscription, close] = await client.test.value.subscribe({}); + const [subscription, close] = await client.subscribable.value.subscribe({}); let result = await iterNext(subscription); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const add1 = await client.test.add.rpc({ n: 1 }); + const add1 = await client.subscribable.add.rpc({ n: 1 }); assert(add1.ok); result = await iterNext(subscription); assert(result.ok); @@ -214,7 +215,7 @@ describe('procedures should leave no trace after finishing', async () => { test('upload', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); diff --git a/__tests__/disconnects.test.ts b/__tests__/disconnects.test.ts index b7d9c3dc..15526d56 100644 --- a/__tests__/disconnects.test.ts +++ b/__tests__/disconnects.test.ts @@ -27,6 +27,7 @@ import { CONNECTION_GRACE_PERIOD_MS } from '../router/client'; import { Err, UNEXPECTED_DISCONNECT } from '../router/result'; import { WebSocketServerTransport } from '../transport/impls/ws/server'; import { WebSocketClientTransport } from '../transport/impls/ws/client'; +import { buildServiceDefs } from '../router/defs'; describe('procedures should handle unexpected disconnects', async () => { const httpServer = http.createServer(); @@ -49,13 +50,12 @@ describe('procedures should handle unexpected disconnects', async () => { test('rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); // start procedure await client.test.add.rpc({ n: 3 }); - expect(clientTransport.connections.size).toEqual(1); expect(serverTransport.connections.size).toEqual(1); @@ -75,14 +75,14 @@ describe('procedures should handle unexpected disconnects', async () => { }), ); - expect(clientTransport.connections.size).toEqual(0); - expect(serverTransport.connections.size).toEqual(0); + waitFor(() => expect(clientTransport.connections.size).toEqual(0)); + waitFor(() => expect(serverTransport.connections.size).toEqual(0)); await ensureServerIsClean(server); }); test('stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -111,8 +111,8 @@ describe('procedures should handle unexpected disconnects', async () => { }), ); - expect(clientTransport.connections.size).toEqual(0); - expect(serverTransport.connections.size).toEqual(0); + waitFor(() => expect(clientTransport.connections.size).toEqual(0)); + waitFor(() => expect(serverTransport.connections.size).toEqual(0)); await ensureServerIsClean(server); }); @@ -132,25 +132,29 @@ describe('procedures should handle unexpected disconnects', async () => { 'SERVER', ); - const serviceDefs = { test: SubscribableServiceConstructor() }; + const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client1 = createClient(client1Transport); const client2 = createClient(client2Transport); // start procedure // client1 and client2 both subscribe - const [subscription1, close1] = await client1.test.value.subscribe({}); + const [subscription1, close1] = await client1.subscribable.value.subscribe( + {}, + ); let result = await iterNext(subscription1); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const [subscription2, _close2] = await client2.test.value.subscribe({}); + const [subscription2, _close2] = await client2.subscribable.value.subscribe( + {}, + ); result = await iterNext(subscription2); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); // client2 adds a value - const add1 = await client2.test.add.rpc({ n: 1 }); + const add1 = await client2.subscribable.add.rpc({ n: 1 }); assert(add1.ok); // both clients should receive the updated value @@ -171,7 +175,7 @@ describe('procedures should handle unexpected disconnects', async () => { client2Transport.tryReconnecting = false; // client1 who is still connected can still add values and receive updates - const add2Promise = client1.test.add.rpc({ n: 2 }); + const add2Promise = client1.subscribable.add.rpc({ n: 2 }); // after we've disconnected, hit end of grace period await vi.runOnlyPendingTimersAsync(); @@ -205,7 +209,7 @@ describe('procedures should handle unexpected disconnects', async () => { test('upload', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -233,8 +237,8 @@ describe('procedures should handle unexpected disconnects', async () => { }), ); - expect(clientTransport.connections.size).toEqual(0); - expect(serverTransport.connections.size).toEqual(0); + waitFor(() => expect(clientTransport.connections.size).toEqual(0)); + waitFor(() => expect(serverTransport.connections.size).toEqual(0)); await ensureServerIsClean(server); }); }); diff --git a/__tests__/e2e.test.ts b/__tests__/e2e.test.ts index 1147c94a..619b9f8c 100644 --- a/__tests__/e2e.test.ts +++ b/__tests__/e2e.test.ts @@ -24,6 +24,7 @@ import { codecs } from '../codec/codec.test'; import { WebSocketClientTransport } from '../transport/impls/ws/client'; import { WebSocketServerTransport } from '../transport/impls/ws/server'; import { testFinishesCleanly } from './fixtures/cleanup'; +import { buildServiceDefs } from '../router/defs'; describe.each(codecs)( 'client <-> server integration test ($name codec)', @@ -41,7 +42,7 @@ describe.each(codecs)( test('rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); const result = await client.test.add.rpc({ n: 3 }); @@ -57,13 +58,13 @@ describe.each(codecs)( test('fallible rpc', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: FallibleServiceConstructor() }; + const serviceDefs = buildServiceDefs([FallibleServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); - const result = await client.test.divide.rpc({ a: 10, b: 2 }); + const result = await client.fallible.divide.rpc({ a: 10, b: 2 }); assert(result.ok); expect(result.payload).toStrictEqual({ result: 5 }); - const result2 = await client.test.divide.rpc({ a: 10, b: 0 }); + const result2 = await client.fallible.divide.rpc({ a: 10, b: 0 }); assert(!result2.ok); expect(result2.payload).toStrictEqual({ code: DIV_BY_ZERO, @@ -82,10 +83,10 @@ describe.each(codecs)( test('rpc with binary (uint8array)', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: BinaryFileServiceConstructor() }; + const serviceDefs = buildServiceDefs([BinaryFileServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); - const result = await client.test.getFile.rpc({ file: 'test.py' }); + const result = await client.bin.getFile.rpc({ file: 'test.py' }); assert(result.ok); assert(result.payload.contents instanceof Uint8Array); expect(new TextDecoder().decode(result.payload.contents)).toStrictEqual( @@ -101,7 +102,7 @@ describe.each(codecs)( test('stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -138,7 +139,7 @@ describe.each(codecs)( test('stream with init message', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -168,11 +169,11 @@ describe.each(codecs)( test('fallible stream', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: FallibleServiceConstructor() }; + const serviceDefs = buildServiceDefs([FallibleServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); - const [input, output, close] = await client.test.echo.stream(); + const [input, output, close] = await client.fallible.echo.stream(); input.push({ msg: 'abc', throwResult: false, throwError: false }); const result1 = await iterNext(output); assert(result1 && result1.ok); @@ -219,21 +220,23 @@ describe.each(codecs)( options, ); - const serviceDefs = { test: SubscribableServiceConstructor() }; + const serviceDefs = buildServiceDefs([SubscribableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client1 = createClient(client1Transport); const client2 = createClient(client2Transport); - const [subscription1, close1] = await client1.test.value.subscribe({}); + const [subscription1, close1] = + await client1.subscribable.value.subscribe({}); let result = await iterNext(subscription1); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const [subscription2, close2] = await client2.test.value.subscribe({}); + const [subscription2, close2] = + await client2.subscribable.value.subscribe({}); result = await iterNext(subscription2); assert(result.ok); expect(result.payload).toStrictEqual({ result: 0 }); - const add1 = await client1.test.add.rpc({ n: 1 }); + const add1 = await client1.subscribable.add.rpc({ n: 1 }); assert(add1.ok); result = await iterNext(subscription1); @@ -243,7 +246,7 @@ describe.each(codecs)( assert(result.ok); expect(result.payload).toStrictEqual({ result: 1 }); - const add2 = await client2.test.add.rpc({ n: 3 }); + const add2 = await client2.subscribable.add.rpc({ n: 3 }); assert(add2.ok); result = await iterNext(subscription1); @@ -265,7 +268,7 @@ describe.each(codecs)( test('upload', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -286,7 +289,7 @@ describe.each(codecs)( test('upload with init message', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const serviceDefs = buildServiceDefs([UploadableServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -309,7 +312,7 @@ describe.each(codecs)( test('message order is preserved in the face of disconnects', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: OrderingServiceConstructor() }; + const serviceDefs = buildServiceDefs([OrderingServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -344,7 +347,7 @@ describe.each(codecs)( const CONCURRENCY = 10; test('concurrent rpcs', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: OrderingServiceConstructor() }; + const serviceDefs = buildServiceDefs([OrderingServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); @@ -368,7 +371,7 @@ describe.each(codecs)( test('concurrent streams', async () => { const [clientTransport, serverTransport] = getTransports(); - const serviceDefs = { test: TestServiceConstructor() }; + const serviceDefs = buildServiceDefs([TestServiceConstructor()]); const server = createServer(serverTransport, serviceDefs); const client = createClient(clientTransport); diff --git a/__tests__/fixtures/services.ts b/__tests__/fixtures/services.ts index ef17b09e..a2f346da 100644 --- a/__tests__/fixtures/services.ts +++ b/__tests__/fixtures/services.ts @@ -1,6 +1,5 @@ import { Type } from '@sinclair/typebox'; import { ServiceBuilder } from '../../router/builder'; -import { reply } from '../../transport/message'; import { Err, Ok } from '../../router/result'; import { Observable } from './observable'; @@ -21,10 +20,9 @@ export const TestServiceConstructor = () => input: Type.Object({ n: Type.Number() }), output: Type.Object({ result: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg) { - const { n } = msg.payload; + async handler(ctx, { n }) { ctx.state.count += n; - return reply(msg, Ok({ result: ctx.state.count })); + return Ok({ result: ctx.state.count }); }, }) .defineProcedure('echo', { @@ -33,13 +31,12 @@ export const TestServiceConstructor = () => output: EchoResponse, errors: Type.Never(), async handler(_ctx, msgStream, returnStream) { - for await (const msg of msgStream) { - const req = msg.payload; - if (!req.ignore) { - returnStream.push(reply(msg, Ok({ response: req.msg }))); + for await (const { ignore, msg, end } of msgStream) { + if (!ignore) { + returnStream.push(Ok({ response: msg })); } - if (req.end) { + if (end) { returnStream.end(); } } @@ -52,12 +49,9 @@ export const TestServiceConstructor = () => output: EchoResponse, errors: Type.Never(), async handler(_ctx, init, msgStream, returnStream) { - for await (const msg of msgStream) { - const req = msg.payload; - if (!req.ignore) { - returnStream.push( - reply(msg, Ok({ response: `${init.payload.prefix} ${req.msg}` })), - ); + for await (const { ignore, msg } of msgStream) { + if (!ignore) { + returnStream.push(Ok({ response: `${init.prefix} ${msg}` })); } } }, @@ -74,10 +68,9 @@ export const OrderingServiceConstructor = () => input: Type.Object({ n: Type.Number() }), output: Type.Object({ n: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg) { - const { n } = msg.payload; + async handler(ctx, { n }) { ctx.state.msgs.push(n); - return reply(msg, Ok({ n })); + return Ok({ n }); }, }) .defineProcedure('getAll', { @@ -85,8 +78,8 @@ export const OrderingServiceConstructor = () => input: Type.Object({}), output: Type.Object({ msgs: Type.Array(Type.Number()) }), errors: Type.Never(), - async handler(ctx, msg) { - return reply(msg, Ok({ msgs: ctx.state.msgs })); + async handler(ctx, _msg) { + return Ok({ msgs: ctx.state.msgs }); }, }) .finalize(); @@ -98,11 +91,11 @@ export const BinaryFileServiceConstructor = () => input: Type.Object({ file: Type.String() }), output: Type.Object({ contents: Type.Uint8Array() }), errors: Type.Never(), - async handler(_ctx, msg) { + async handler(_ctx, { file }) { const bytes: Uint8Array = new TextEncoder().encode( - `contents for file ${msg.payload.file}`, + `contents for file ${file}`, ); - return reply(msg, Ok({ contents: bytes })); + return Ok({ contents: bytes }); }, }) .finalize(); @@ -123,19 +116,15 @@ export const FallibleServiceConstructor = () => extras: Type.Object({ test: Type.String() }), }), ]), - async handler(_ctx, msg) { - const { a, b } = msg.payload; + async handler(_ctx, { a, b }) { if (b === 0) { - return reply(msg, { - ok: false, - payload: { - code: DIV_BY_ZERO, - message: 'Cannot divide by zero', - extras: { test: 'abc' }, - }, + return Err({ + code: DIV_BY_ZERO, + message: 'Cannot divide by zero', + extras: { test: 'abc' }, }); } else { - return reply(msg, Ok({ result: a / b })); + return Ok({ result: a / b }); } }, }) @@ -154,22 +143,18 @@ export const FallibleServiceConstructor = () => }), ]), async handler(_ctx, msgStream, returnStream) { - for await (const msg of msgStream) { - const req = msg.payload; - if (req.throwError) { + for await (const { msg, throwError, throwResult } of msgStream) { + if (throwError) { throw new Error('some message'); - } else if (req.throwResult) { + } else if (throwResult) { returnStream.push( - reply( - msg, - Err({ - code: STREAM_ERROR, - message: 'field throwResult was set to true', - }), - ), + Err({ + code: STREAM_ERROR, + message: 'field throwResult was set to true', + }), ); } else { - returnStream.push(reply(msg, Ok({ response: req.msg }))); + returnStream.push(Ok({ response: msg })); } } }, @@ -186,10 +171,9 @@ export const SubscribableServiceConstructor = () => input: Type.Object({ n: Type.Number() }), output: Type.Object({ result: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg) { - const { n } = msg.payload; + async handler(ctx, { n }) { ctx.state.count.set((prev) => prev + n); - return reply(msg, Ok({ result: ctx.state.count.get() })); + return Ok({ result: ctx.state.count.get() }); }, }) .defineProcedure('value', { @@ -197,9 +181,9 @@ export const SubscribableServiceConstructor = () => input: Type.Object({}), output: Type.Object({ result: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg, returnStream) { + async handler(ctx, _msg, returnStream) { ctx.state.count.observe((count) => { - returnStream.push(reply(msg, Ok({ result: count }))); + returnStream.push(Ok({ result: count })); }); }, }) @@ -215,13 +199,11 @@ export const UploadableServiceConstructor = () => errors: Type.Never(), async handler(_ctx, msgStream) { let result = 0; - let lastMsg; - for await (const msg of msgStream) { - const { n } = msg.payload; + for await (const { n } of msgStream) { result += n; - lastMsg = msg; } - return reply(lastMsg!, Ok({ result: result })); + + return Ok({ result: result }); }, }) .defineProcedure('addMultipleWithPrefix', { @@ -232,16 +214,10 @@ export const UploadableServiceConstructor = () => errors: Type.Never(), async handler(_ctx, init, msgStream) { let result = 0; - let lastMsg; - for await (const msg of msgStream) { - const { n } = msg.payload; + for await (const { n } of msgStream) { result += n; - lastMsg = msg; } - return reply( - lastMsg!, - Ok({ result: init.payload.prefix + ' ' + result }), - ); + return Ok({ result: init.prefix + ' ' + result }); }, }) .finalize(); diff --git a/__tests__/handler.test.ts b/__tests__/handler.test.ts index 175ea384..52148af7 100644 --- a/__tests__/handler.test.ts +++ b/__tests__/handler.test.ts @@ -1,10 +1,8 @@ import { asClientRpc, asClientStream, - asClientStreamWithInitialization, asClientSubscription, asClientUpload, - asClientUploadWithInitialization, iterNext, } from '../util/testHelpers'; import { assert, describe, expect, test } from 'vitest'; @@ -21,10 +19,9 @@ import { Observable } from './fixtures/observable'; describe('server-side test', () => { const service = TestServiceConstructor(); - const initialState = { count: 0 }; test('rpc basic', async () => { - const add = asClientRpc(initialState, service.procedures.add); + const add = asClientRpc({ count: 0 }, service.procedures.add); const result = await add({ n: 3 }); assert(result.ok); expect(result.payload).toStrictEqual({ result: 3 }); @@ -57,7 +54,7 @@ describe('server-side test', () => { test('stream basic', async () => { const [input, output] = asClientStream( - initialState, + { count: 0 }, service.procedures.echo, ); @@ -78,8 +75,8 @@ describe('server-side test', () => { }); test('stream with initialization', async () => { - const [input, output] = asClientStreamWithInitialization( - initialState, + const [input, output] = asClientStream( + { count: 0 }, service.procedures.echoWithPrefix, { prefix: 'test' }, ); @@ -148,7 +145,10 @@ describe('server-side test', () => { test('uploads', async () => { const service = UploadableServiceConstructor(); - const [input, result] = asClientUpload({}, service.procedures.addMultiple); + const [input, result] = await asClientUpload( + {}, + service.procedures.addMultiple, + ); input.push({ n: 1 }); input.push({ n: 2 }); @@ -158,7 +158,7 @@ describe('server-side test', () => { test('uploads with initialization', async () => { const service = UploadableServiceConstructor(); - const [input, result] = asClientUploadWithInitialization( + const [input, result] = await asClientUpload( {}, service.procedures.addMultipleWithPrefix, { prefix: 'test' }, diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 0d524b18..c2ba8947 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -1,12 +1,13 @@ import { describe, expect, test } from 'vitest'; import { Procedure, ServiceBuilder, serializeService } from '../router/builder'; import { Type } from '@sinclair/typebox'; -import { MessageId, OpaqueTransportMessage, reply } from '../transport/message'; +import { MessageId, OpaqueTransportMessage } from '../transport/message'; import { createServer } from '../router/server'; import { Connection, Transport } from '../transport/transport'; import { NaiveJsonCodec } from '../codec/json'; import { createClient } from '../router/client'; import { Ok } from '../router/result'; +import { buildServiceDefs } from '../router/defs'; const input = Type.Union([ Type.Object({ a: Type.Number() }), @@ -31,18 +32,18 @@ const fnBody: Procedure<{}, 'rpc', typeof input, typeof output, typeof errors> = output, errors, async handler(_state, msg) { - if ('c' in msg.payload) { - return reply(msg, Ok({ b: msg.payload.c })); + if ('c' in msg) { + return Ok({ b: msg.c }); } else { - return reply(msg, Ok({ b: msg.payload.a })); + return Ok({ b: msg.a }); } }, }; // typescript is limited to max 50 constraints // see: https://github.com/microsoft/TypeScript/issues/33541 -export const StupidlyLargeService = () => - ServiceBuilder.create('test') +export const StupidlyLargeService = (name: string) => + ServiceBuilder.create(name) .defineProcedure('f1', fnBody) .defineProcedure('f2', fnBody) .defineProcedure('f3', fnBody) @@ -112,17 +113,18 @@ export class MockTransport extends Transport { describe("ensure typescript doesn't give up trying to infer the types for large services", () => { test('service with many procedures hits typescript limit', () => { - expect(serializeService(StupidlyLargeService())).toBeTruthy(); + expect(serializeService(StupidlyLargeService('test'))).toBeTruthy(); }); test('server client should support many services with many procedures', async () => { - const listing = { - a: StupidlyLargeService(), - b: StupidlyLargeService(), - c: StupidlyLargeService(), - d: StupidlyLargeService(), - }; - const server = createServer(new MockTransport('SERVER'), listing); + const serviceDefs = buildServiceDefs([ + StupidlyLargeService('a'), + StupidlyLargeService('b'), + StupidlyLargeService('c'), + StupidlyLargeService('d'), + ]); + + const server = createServer(new MockTransport('SERVER'), serviceDefs); const client = createClient(new MockTransport('client')); expect(client.d.f48.rpc({ a: 0 })).toBeTruthy(); expect(client.a.f2.rpc({ c: 'abc' })).toBeTruthy(); diff --git a/router/builder.ts b/router/builder.ts index 9e4730e9..c5f4cd35 100644 --- a/router/builder.ts +++ b/router/builder.ts @@ -1,6 +1,5 @@ import { TObject, Static, Type, TUnion } from '@sinclair/typebox'; import type { Pushable } from 'it-pushable'; -import { TransportMessage } from '../transport/message'; import { ServiceContextWithState } from './context'; import { Result, RiverError, RiverUncaughtSchema } from './result'; @@ -10,13 +9,13 @@ import { Result, RiverError, RiverUncaughtSchema } from './result'; * gRPC's four combinations of stream / non-stream in each direction. */ export type ValidProcType = - // Single message in both directions. + // Single message in both directions (1:1). | 'rpc' - // Client-stream (potentially preceded by an initialization message), single message from server. + // Client-stream (potentially preceded by an initialization message), single message from server (n:1). | 'upload' - // Single message from client, stream from server. + // Single message from client, stream from server (1:n). | 'subscription' - // Bidirectional stream (potentially preceded by an initialization message). + // Bidirectional stream (potentially preceded by an initialization message) (n:n). | 'stream'; /** @@ -176,8 +175,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: TransportMessage>, - ) => Promise, Static>>>; + input: Static, + ) => Promise, Static>>; type: Ty; } : never @@ -190,9 +189,9 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - init: TransportMessage>, - input: AsyncIterable>>, - ) => Promise, Static>>>; + init: Static, + input: AsyncIterableIterator>, + ) => Promise, Static>>; type: Ty; } : { @@ -201,8 +200,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: AsyncIterable>>, - ) => Promise, Static>>>; + input: AsyncIterableIterator>, + ) => Promise, Static>>; type: Ty; } : Ty extends 'subscription' @@ -213,8 +212,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: TransportMessage>, - output: Pushable, Static>>>, + input: Static, + output: Pushable, Static>>, ) => Promise; type: Ty; } @@ -228,9 +227,9 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - init: TransportMessage>, - input: AsyncIterable>>, - output: Pushable, Static>>>, + init: Static, + input: AsyncIterableIterator>, + output: Pushable, Static>>, ) => Promise; type: Ty; } @@ -240,8 +239,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: AsyncIterable>>, - output: Pushable, Static>>>, + input: AsyncIterableIterator>, + output: Pushable, Static>>, ) => Promise; type: Ty; } diff --git a/router/client.ts b/router/client.ts index c1fddc62..e62265b9 100644 --- a/router/client.ts +++ b/router/client.ts @@ -23,9 +23,10 @@ import { Static } from '@sinclair/typebox'; import { nanoid } from 'nanoid'; import { Err, Result, UNEXPECTED_DISCONNECT } from './result'; import { EventMap } from '../transport/events'; +import { ServiceDefs } from './defs'; // helper to make next, yield, and return all the same type -type AsyncIter = AsyncGenerator; +export type AsyncIter = AsyncGenerator; /** * A helper type to transform an actual service type into a type @@ -126,7 +127,7 @@ type ServiceClient = { * Defines a type that represents a client for a server with a set of services. * @template Srv - The type of the server. */ -export type ServerClient>> = { +export type ServerClient> = { [SvcName in keyof Srv['services']]: ServiceClient; }; @@ -173,7 +174,7 @@ function _createRecursiveProxy( * @param {Transport} transport - The transport to use for communication. * @returns The client for the server. */ -export const createClient = >>( +export const createClient = >( transport: Transport, serverId: TransportClientId = 'SERVER', ) => diff --git a/router/defs.ts b/router/defs.ts new file mode 100644 index 00000000..fa054b23 --- /dev/null +++ b/router/defs.ts @@ -0,0 +1,24 @@ +import { AnyService } from './builder'; + +/** + * Defines a type for a collection service definitions. Should be + * build with the {@link buildServiceDefs} function. + * @template T - An array of services. + */ +export type ServiceDefs = { + [K in T[number]['name']]: T[number]; +}; + +/** + * Builds service definitions based on an array of services. + * @param services - The array of services. + * @returns The service definitions. + */ +export function buildServiceDefs( + services: T, +): ServiceDefs { + return services.reduce((acc, service) => { + acc[service.name as keyof ServiceDefs] = service; + return acc; + }, {} as ServiceDefs); +} diff --git a/router/index.ts b/router/index.ts index 06385564..bdd1faaf 100644 --- a/router/index.ts +++ b/router/index.ts @@ -10,6 +10,8 @@ export type { Procedure, PayloadType, } from './builder'; +export { buildServiceDefs } from './defs'; +export type { ServiceDefs } from './defs'; export { createClient } from './client'; export type { ServerClient } from './client'; export { createServer } from './server'; diff --git a/router/server.ts b/router/server.ts index 49e17368..c3a98b31 100644 --- a/router/server.ts +++ b/router/server.ts @@ -6,7 +6,6 @@ import type { Pushable } from 'it-pushable'; import { ControlMessagePayloadSchema, OpaqueTransportMessage, - TransportMessage, isStreamClose, isStreamOpen, reply, @@ -24,6 +23,7 @@ import { UNCAUGHT_ERROR, } from './result'; import { EventMap } from '../transport/events'; +import { ServiceDefs } from './defs'; /** * Represents a server with a set of services. Use {@link createServer} to create it. @@ -40,17 +40,15 @@ interface ProcStream { serviceName: string; procedureName: string; procedure: AnyProcedure; - incoming: Pushable; - outgoing: Pushable< - TransportMessage, Static>> - >; + incoming: Pushable; + outgoing: Pushable, Static>>; promises: { outputHandler: Promise; inputHandler: Promise; }; } -class RiverServer> { +class RiverServer { transport: Transport; services: Services; contextMap: Map>; @@ -168,7 +166,7 @@ class RiverServer> { // sending outgoing messages back to client (async () => { for await (const response of outgoing) { - this.transport.send(response); + this.transport.send(reply(message, response)); } // we ended, send a close bit back to the client @@ -192,13 +190,10 @@ class RiverServer> { `${this.transport.clientId} -- procedure ${message.serviceName}.${message.procedureName}:${message.streamId} threw an error: ${errorMsg}`, ); outgoing.push( - reply( - message, - Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - } satisfies Static), - ), + Err({ + code: UNCAUGHT_ERROR, + message: errorMsg, + } satisfies Static), ); }; @@ -330,7 +325,7 @@ class RiverServer> { Value.Check(procedure.init, message.payload)) || Value.Check(procedure.input, message.payload) ) { - procStream.incoming.push(message as TransportMessage); + procStream.incoming.push(message.payload as PayloadType); } else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { log?.error( `${this.transport.clientId} -- procedure ${procStream.serviceName}.${ @@ -385,7 +380,7 @@ class RiverServer> { * @param extendedContext - An optional object containing additional context to be passed to all services. * @returns A promise that resolves to a server instance with the registered services. */ -export function createServer>( +export function createServer( transport: Transport, services: Services, extendedContext?: Omit, diff --git a/util/testHelpers.ts b/util/testHelpers.ts index 9c72050d..6e68a80f 100644 --- a/util/testHelpers.ts +++ b/util/testHelpers.ts @@ -2,8 +2,6 @@ import WebSocket from 'isomorphic-ws'; import { WebSocketServer } from 'ws'; import http from 'http'; import { WebSocketClientTransport } from '../transport/impls/ws/client'; -import { Static } from '@sinclair/typebox'; -import { Procedure, ServiceContext } from '../router'; import { Connection, OpaqueTransportMessage, @@ -11,19 +9,20 @@ import { TransportClientId, TransportMessage, msg, - reply, } from '../transport'; -import { Pushable, pushable } from 'it-pushable'; +import { pushable } from 'it-pushable'; +import { Codec } from '../codec'; +import { WebSocketServerTransport } from '../transport/impls/ws/server'; import { - Err, + PayloadType, + Procedure, Result, RiverError, RiverUncaughtSchema, + ServiceContext, UNCAUGHT_ERROR, -} from '../router/result'; -import { Codec } from '../codec'; -import { WebSocketServerTransport } from '../transport/impls/ws/server'; -import { PayloadType } from '../router/builder'; +} from '../router'; +import { Static } from '@sinclair/typebox'; /** * Creates a WebSocket server instance using the provided HTTP server. @@ -91,451 +90,186 @@ export function createWsTransports( } /** - * Transforms an RPC procedure definition into a normal function call. + * Converts a payload object to a transport message with reasonable defaults. * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input message payload. - * @template O - The type of the output message payload. - * @param {State} state - The state object. - * @param {Procedure} proc - The RPC procedure to invoke. - * @param {Omit} [extendedContext] - Optional extended context. - * @returns A function that can be used to invoke the RPC procedure. + * @param payload - The payload object to be converted. + * @param streamId - The optional stream ID. + * @returns The transport message. */ -export function asClientRpc< - State extends object | unknown, - I extends PayloadType, - O extends PayloadType, - E extends RiverError, ->( - state: State, - proc: Procedure, - extendedContext?: Omit, -) { - return ( - msg: Static, - ): Promise< - Result, Static | Static> - > => - proc - .handler({ ...extendedContext, state }, payloadToTransportMessage(msg)) - .then((res) => res.payload) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }); +export function payloadToTransportMessage( + payload: Payload, + streamId?: string, + from: TransportClientId = 'client', + to: TransportClientId = 'SERVER', +): TransportMessage { + return msg(from, to, streamId ?? 'stream', payload, 'service', 'procedure'); } /** - * Transforms a stream procedure definition into a pair of input and output streams. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns Pair of input and output streams. + * Creates a dummy opaque transport message for testing purposes. + * @returns The created opaque transport message. */ -export function asClientStream< - State extends object | unknown, - I extends PayloadType, - O extends PayloadType, - E extends RiverError, ->( - state: State, - proc: Procedure, - extendedContext?: Omit, -): [ - Pushable>, - Pushable, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const rawOutput = pushable, Static>>({ - objectMode: true, - }); - - const transportInput = pushable>>({ - objectMode: true, - }); - const transportOutput = pushable< - TransportMessage, Static>> - >({ - objectMode: true, +export function createDummyTransportMessage(): OpaqueTransportMessage { + return payloadToTransportMessage({ + msg: 'cool', + test: Math.random(), }); - - // wrapping in transport - (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); - } - transportInput.end(); - })(); - - // unwrap from transport - (async () => { - for await (const transportRes of transportOutput) { - rawOutput.push(transportRes.payload); - } - rawOutput.end(); - })(); - - // handle - (async () => { - try { - await proc.handler( - { ...extendedContext, state }, - transportInput, - transportOutput, - ); - } catch (err) { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - transportOutput.push( - reply( - payloadToTransportMessage({}), - Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }), - ), - ); - } - transportOutput.end(); - })(); - - return [rawInput, rawOutput]; } /** - * Transforms a stream procedure definition into a pair of input and output streams. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns Pair of input and output streams. + * Retrieves the next value from an async iterable iterator. + * @param iter The async iterable iterator. + * @returns A promise that resolves to the next value from the iterator. */ -export function asClientStreamWithInitialization< - State extends object | unknown, - I extends PayloadType, - O extends PayloadType, - E extends RiverError, - Init extends PayloadType, ->( - state: State, - proc: Procedure, - init: Static, - extendedContext?: Omit, -): [ - Pushable>, - Pushable, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const rawOutput = pushable, Static>>({ - objectMode: true, - }); - - const transportInput = pushable>>({ - objectMode: true, - }); - const transportOutput = pushable< - TransportMessage, Static>> - >({ - objectMode: true, - }); +export async function iterNext(iter: AsyncIterableIterator) { + return await iter.next().then((res) => res.value); +} - // wrapping in transport - (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); +/** + * Waits for a message on the transport. + * @param {Transport} t - The transport to listen to. + * @param filter - An optional filter function to apply to the received messages. + * @returns A promise that resolves with the payload of the first message that passes the filter. + */ +export async function waitForMessage( + t: Transport, + filter?: (msg: OpaqueTransportMessage) => boolean, + rejectMismatch?: boolean, +) { + return new Promise((resolve, reject) => { + function cleanup() { + t.removeEventListener('message', onMessage); } - transportInput.end(); - })(); - // unwrap from transport - (async () => { - for await (const transportRes of transportOutput) { - rawOutput.push(transportRes.payload); + function onMessage(msg: OpaqueTransportMessage) { + if (!filter || filter?.(msg)) { + cleanup(); + resolve(msg.payload); + } else if (rejectMismatch) { + reject(new Error('message didnt match the filter')); + } } - rawOutput.end(); - })(); - // handle - (async () => { - try { - await proc.handler( - { ...extendedContext, state }, - payloadToTransportMessage(init), - transportInput, - transportOutput, - ); - } catch (err) { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - transportOutput.push( - reply( - payloadToTransportMessage({}), - Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }), - ), - ); - } - transportOutput.end(); - })(); + t.addEventListener('message', onMessage); + }); +} - return [rawInput, rawOutput]; +function catchProcError(err: unknown) { + const errorMsg = + err instanceof Error ? err.message : `[coerced to error] ${err}`; + return { + ok: false, + payload: { + code: UNCAUGHT_ERROR, + message: errorMsg, + }, + }; } -/** - * Transforms a subscription procedure definition into a procedure that returns an output stream. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns A function that when passed a message, returns the output stream. - */ -export function asClientSubscription< +export function asClientRpc< State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, + Init extends PayloadType | null = null, >( state: State, - proc: Procedure, + proc: Procedure, extendedContext?: Omit, ) { - const rawOutput = pushable, Static>>({ - objectMode: true, - }); - const transportOutput = pushable< - TransportMessage, Static>> - >({ - objectMode: true, - }); - - // unwrap from transport - (async () => { - for await (const transportRes of transportOutput) { - rawOutput.push(transportRes.payload); - } - rawOutput.end(); - })(); - return async ( msg: Static, ): Promise< - Pushable, Static | Static>> + Result, Static | Static> > => { - proc - .handler( - { ...extendedContext, state }, - payloadToTransportMessage(msg), - transportOutput, - ) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }); - - return rawOutput; + return await proc + .handler({ ...extendedContext, state }, msg) + .catch(catchProcError); }; } -/** - * Transforms an upload procedure definition into a procedure that returns an input stream. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns A function that when passed a message, returns the output stream. - */ -export function asClientUpload< +export function asClientStream< State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, + Init extends PayloadType | null = null, >( state: State, - proc: Procedure, + proc: Procedure, + init?: Init extends PayloadType ? Static : null, extendedContext?: Omit, -): [ - Pushable>, - Promise, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const transportInput = pushable>>({ +) { + const input = pushable>({ objectMode: true }); + const output = pushable, Static>>({ objectMode: true, }); - // wrapping in transport (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); + if (init) { + const _proc = proc as Procedure; + await _proc + .handler({ ...extendedContext, state }, init, input, output) + .catch((err) => output.push(catchProcError(err))); + } else { + const _proc = proc as Procedure; + await _proc + .handler({ ...extendedContext, state }, input, output) + .catch((err) => output.push(catchProcError(err))); } - transportInput.end(); })(); - return [ - rawInput, - proc - .handler({ ...extendedContext, state }, transportInput) - .then((res) => res.payload) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }), - ]; + return [input, output] as const; } -/** - * Transforms an upload with initialization procedure definition into a procedure that returns an - * input stream. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template Init - The type of the init object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns A function that when passed a message, returns the output stream. - */ -export function asClientUploadWithInitialization< +export function asClientSubscription< State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, - Init extends PayloadType, >( state: State, - proc: Procedure, - init: Static, + proc: Procedure, extendedContext?: Omit, -): [ - Pushable>, - Promise, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const transportInput = pushable>>({ +) { + const output = pushable, Static>>({ objectMode: true, }); - // wrapping in transport - (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); - } - transportInput.end(); - })(); - - return [ - rawInput, - proc - .handler( - { ...extendedContext, state }, - payloadToTransportMessage(init), - transportInput, - ) - .then((res) => res.payload) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }), - ]; -} - -/** - * Converts a payload object to a transport message with reasonable defaults. - * This should only be used for testing. - * @param payload - The payload object to be converted. - * @param streamId - The optional stream ID. - * @returns The transport message. - */ -export function payloadToTransportMessage( - payload: Payload, - streamId?: string, - from: TransportClientId = 'client', - to: TransportClientId = 'SERVER', -): TransportMessage { - return msg(from, to, streamId ?? 'stream', payload, 'service', 'procedure'); -} - -/** - * Creates a dummy opaque transport message for testing purposes. - * @returns The created opaque transport message. - */ -export function createDummyTransportMessage(): OpaqueTransportMessage { - return payloadToTransportMessage({ - msg: 'cool', - test: Math.random(), - }); -} - -/** - * Retrieves the next value from an async iterable iterator. - * @param iter The async iterable iterator. - * @returns A promise that resolves to the next value from the iterator. - */ -export function iterNext(iter: AsyncIterableIterator) { - return iter.next().then((res) => res.value); + return async (msg: Static) => { + (async () => { + return await proc + .handler({ ...extendedContext, state }, msg, output) + .catch((err) => output.push(catchProcError(err))); + })(); + return output; + }; } -/** - * Waits for a message on the transport. - * @param {Transport} t - The transport to listen to. - * @param filter - An optional filter function to apply to the received messages. - * @returns A promise that resolves with the payload of the first message that passes the filter. - */ -export async function waitForMessage( - t: Transport, - filter?: (msg: OpaqueTransportMessage) => boolean, - rejectMismatch?: boolean, +export async function asClientUpload< + State extends object | unknown, + I extends PayloadType, + O extends PayloadType, + E extends RiverError, + Init extends PayloadType | null = null, +>( + state: State, + proc: Procedure, + init?: Init extends PayloadType ? Static : null, + extendedContext?: Omit, ) { - return new Promise((resolve, reject) => { - function cleanup() { - t.removeEventListener('message', onMessage); - } - - function onMessage(msg: OpaqueTransportMessage) { - if (!filter || filter?.(msg)) { - cleanup(); - resolve(msg.payload); - } else if (rejectMismatch) { - reject(new Error('message didnt match the filter')); - } - } - - t.addEventListener('message', onMessage); - }); + const input = pushable>({ objectMode: true }); + if (init) { + const _proc = proc as Procedure; + const result = _proc + .handler({ ...extendedContext, state }, init, input) + .catch(catchProcError); + return [input, result] as const; + } else { + const _proc = proc as Procedure; + const result = _proc + .handler({ ...extendedContext, state }, input) + .catch(catchProcError); + return [input, result] as const; + } }