diff --git a/__tests__/disconnects.test.ts b/__tests__/disconnects.test.ts index cbf289ff..15526d56 100644 --- a/__tests__/disconnects.test.ts +++ b/__tests__/disconnects.test.ts @@ -56,7 +56,6 @@ describe('procedures should handle unexpected disconnects', async () => { // start procedure await client.test.add.rpc({ n: 3 }); - expect(clientTransport.connections.size).toEqual(1); expect(serverTransport.connections.size).toEqual(1); @@ -76,8 +75,8 @@ describe('procedures should handle unexpected disconnects', async () => { }), ); - expect(clientTransport.connections.size).toEqual(0); - expect(serverTransport.connections.size).toEqual(0); + waitFor(() => expect(clientTransport.connections.size).toEqual(0)); + waitFor(() => expect(serverTransport.connections.size).toEqual(0)); await ensureServerIsClean(server); }); @@ -112,8 +111,8 @@ describe('procedures should handle unexpected disconnects', async () => { }), ); - expect(clientTransport.connections.size).toEqual(0); - expect(serverTransport.connections.size).toEqual(0); + waitFor(() => expect(clientTransport.connections.size).toEqual(0)); + waitFor(() => expect(serverTransport.connections.size).toEqual(0)); await ensureServerIsClean(server); }); @@ -238,8 +237,8 @@ describe('procedures should handle unexpected disconnects', async () => { }), ); - expect(clientTransport.connections.size).toEqual(0); - expect(serverTransport.connections.size).toEqual(0); + waitFor(() => expect(clientTransport.connections.size).toEqual(0)); + waitFor(() => expect(serverTransport.connections.size).toEqual(0)); await ensureServerIsClean(server); }); }); diff --git a/__tests__/fixtures/services.ts b/__tests__/fixtures/services.ts index ef17b09e..a2f346da 100644 --- a/__tests__/fixtures/services.ts +++ b/__tests__/fixtures/services.ts @@ -1,6 +1,5 @@ import { Type } from '@sinclair/typebox'; import { ServiceBuilder } from '../../router/builder'; -import { reply } from '../../transport/message'; import { Err, Ok } from '../../router/result'; import { Observable } from './observable'; @@ -21,10 +20,9 @@ export const TestServiceConstructor = () => input: Type.Object({ n: Type.Number() }), output: Type.Object({ result: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg) { - const { n } = msg.payload; + async handler(ctx, { n }) { ctx.state.count += n; - return reply(msg, Ok({ result: ctx.state.count })); + return Ok({ result: ctx.state.count }); }, }) .defineProcedure('echo', { @@ -33,13 +31,12 @@ export const TestServiceConstructor = () => output: EchoResponse, errors: Type.Never(), async handler(_ctx, msgStream, returnStream) { - for await (const msg of msgStream) { - const req = msg.payload; - if (!req.ignore) { - returnStream.push(reply(msg, Ok({ response: req.msg }))); + for await (const { ignore, msg, end } of msgStream) { + if (!ignore) { + returnStream.push(Ok({ response: msg })); } - if (req.end) { + if (end) { returnStream.end(); } } @@ -52,12 +49,9 @@ export const TestServiceConstructor = () => output: EchoResponse, errors: Type.Never(), async handler(_ctx, init, msgStream, returnStream) { - for await (const msg of msgStream) { - const req = msg.payload; - if (!req.ignore) { - returnStream.push( - reply(msg, Ok({ response: `${init.payload.prefix} ${req.msg}` })), - ); + for await (const { ignore, msg } of msgStream) { + if (!ignore) { + returnStream.push(Ok({ response: `${init.prefix} ${msg}` })); } } }, @@ -74,10 +68,9 @@ export const OrderingServiceConstructor = () => input: Type.Object({ n: Type.Number() }), output: Type.Object({ n: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg) { - const { n } = msg.payload; + async handler(ctx, { n }) { ctx.state.msgs.push(n); - return reply(msg, Ok({ n })); + return Ok({ n }); }, }) .defineProcedure('getAll', { @@ -85,8 +78,8 @@ export const OrderingServiceConstructor = () => input: Type.Object({}), output: Type.Object({ msgs: Type.Array(Type.Number()) }), errors: Type.Never(), - async handler(ctx, msg) { - return reply(msg, Ok({ msgs: ctx.state.msgs })); + async handler(ctx, _msg) { + return Ok({ msgs: ctx.state.msgs }); }, }) .finalize(); @@ -98,11 +91,11 @@ export const BinaryFileServiceConstructor = () => input: Type.Object({ file: Type.String() }), output: Type.Object({ contents: Type.Uint8Array() }), errors: Type.Never(), - async handler(_ctx, msg) { + async handler(_ctx, { file }) { const bytes: Uint8Array = new TextEncoder().encode( - `contents for file ${msg.payload.file}`, + `contents for file ${file}`, ); - return reply(msg, Ok({ contents: bytes })); + return Ok({ contents: bytes }); }, }) .finalize(); @@ -123,19 +116,15 @@ export const FallibleServiceConstructor = () => extras: Type.Object({ test: Type.String() }), }), ]), - async handler(_ctx, msg) { - const { a, b } = msg.payload; + async handler(_ctx, { a, b }) { if (b === 0) { - return reply(msg, { - ok: false, - payload: { - code: DIV_BY_ZERO, - message: 'Cannot divide by zero', - extras: { test: 'abc' }, - }, + return Err({ + code: DIV_BY_ZERO, + message: 'Cannot divide by zero', + extras: { test: 'abc' }, }); } else { - return reply(msg, Ok({ result: a / b })); + return Ok({ result: a / b }); } }, }) @@ -154,22 +143,18 @@ export const FallibleServiceConstructor = () => }), ]), async handler(_ctx, msgStream, returnStream) { - for await (const msg of msgStream) { - const req = msg.payload; - if (req.throwError) { + for await (const { msg, throwError, throwResult } of msgStream) { + if (throwError) { throw new Error('some message'); - } else if (req.throwResult) { + } else if (throwResult) { returnStream.push( - reply( - msg, - Err({ - code: STREAM_ERROR, - message: 'field throwResult was set to true', - }), - ), + Err({ + code: STREAM_ERROR, + message: 'field throwResult was set to true', + }), ); } else { - returnStream.push(reply(msg, Ok({ response: req.msg }))); + returnStream.push(Ok({ response: msg })); } } }, @@ -186,10 +171,9 @@ export const SubscribableServiceConstructor = () => input: Type.Object({ n: Type.Number() }), output: Type.Object({ result: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg) { - const { n } = msg.payload; + async handler(ctx, { n }) { ctx.state.count.set((prev) => prev + n); - return reply(msg, Ok({ result: ctx.state.count.get() })); + return Ok({ result: ctx.state.count.get() }); }, }) .defineProcedure('value', { @@ -197,9 +181,9 @@ export const SubscribableServiceConstructor = () => input: Type.Object({}), output: Type.Object({ result: Type.Number() }), errors: Type.Never(), - async handler(ctx, msg, returnStream) { + async handler(ctx, _msg, returnStream) { ctx.state.count.observe((count) => { - returnStream.push(reply(msg, Ok({ result: count }))); + returnStream.push(Ok({ result: count })); }); }, }) @@ -215,13 +199,11 @@ export const UploadableServiceConstructor = () => errors: Type.Never(), async handler(_ctx, msgStream) { let result = 0; - let lastMsg; - for await (const msg of msgStream) { - const { n } = msg.payload; + for await (const { n } of msgStream) { result += n; - lastMsg = msg; } - return reply(lastMsg!, Ok({ result: result })); + + return Ok({ result: result }); }, }) .defineProcedure('addMultipleWithPrefix', { @@ -232,16 +214,10 @@ export const UploadableServiceConstructor = () => errors: Type.Never(), async handler(_ctx, init, msgStream) { let result = 0; - let lastMsg; - for await (const msg of msgStream) { - const { n } = msg.payload; + for await (const { n } of msgStream) { result += n; - lastMsg = msg; } - return reply( - lastMsg!, - Ok({ result: init.payload.prefix + ' ' + result }), - ); + return Ok({ result: init.prefix + ' ' + result }); }, }) .finalize(); diff --git a/__tests__/handler.test.ts b/__tests__/handler.test.ts index 175ea384..52148af7 100644 --- a/__tests__/handler.test.ts +++ b/__tests__/handler.test.ts @@ -1,10 +1,8 @@ import { asClientRpc, asClientStream, - asClientStreamWithInitialization, asClientSubscription, asClientUpload, - asClientUploadWithInitialization, iterNext, } from '../util/testHelpers'; import { assert, describe, expect, test } from 'vitest'; @@ -21,10 +19,9 @@ import { Observable } from './fixtures/observable'; describe('server-side test', () => { const service = TestServiceConstructor(); - const initialState = { count: 0 }; test('rpc basic', async () => { - const add = asClientRpc(initialState, service.procedures.add); + const add = asClientRpc({ count: 0 }, service.procedures.add); const result = await add({ n: 3 }); assert(result.ok); expect(result.payload).toStrictEqual({ result: 3 }); @@ -57,7 +54,7 @@ describe('server-side test', () => { test('stream basic', async () => { const [input, output] = asClientStream( - initialState, + { count: 0 }, service.procedures.echo, ); @@ -78,8 +75,8 @@ describe('server-side test', () => { }); test('stream with initialization', async () => { - const [input, output] = asClientStreamWithInitialization( - initialState, + const [input, output] = asClientStream( + { count: 0 }, service.procedures.echoWithPrefix, { prefix: 'test' }, ); @@ -148,7 +145,10 @@ describe('server-side test', () => { test('uploads', async () => { const service = UploadableServiceConstructor(); - const [input, result] = asClientUpload({}, service.procedures.addMultiple); + const [input, result] = await asClientUpload( + {}, + service.procedures.addMultiple, + ); input.push({ n: 1 }); input.push({ n: 2 }); @@ -158,7 +158,7 @@ describe('server-side test', () => { test('uploads with initialization', async () => { const service = UploadableServiceConstructor(); - const [input, result] = asClientUploadWithInitialization( + const [input, result] = await asClientUpload( {}, service.procedures.addMultipleWithPrefix, { prefix: 'test' }, diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index bb54e8cd..c2ba8947 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from 'vitest'; import { Procedure, ServiceBuilder, serializeService } from '../router/builder'; import { Type } from '@sinclair/typebox'; -import { MessageId, OpaqueTransportMessage, reply } from '../transport/message'; +import { MessageId, OpaqueTransportMessage } from '../transport/message'; import { createServer } from '../router/server'; import { Connection, Transport } from '../transport/transport'; import { NaiveJsonCodec } from '../codec/json'; @@ -32,10 +32,10 @@ const fnBody: Procedure<{}, 'rpc', typeof input, typeof output, typeof errors> = output, errors, async handler(_state, msg) { - if ('c' in msg.payload) { - return reply(msg, Ok({ b: msg.payload.c })); + if ('c' in msg) { + return Ok({ b: msg.c }); } else { - return reply(msg, Ok({ b: msg.payload.a })); + return Ok({ b: msg.a }); } }, }; diff --git a/router/builder.ts b/router/builder.ts index a0a3fc04..c5f4cd35 100644 --- a/router/builder.ts +++ b/router/builder.ts @@ -1,6 +1,5 @@ import { TObject, Static, Type, TUnion } from '@sinclair/typebox'; import type { Pushable } from 'it-pushable'; -import { TransportMessage } from '../transport/message'; import { ServiceContextWithState } from './context'; import { Result, RiverError, RiverUncaughtSchema } from './result'; @@ -176,8 +175,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: TransportMessage>, - ) => Promise, Static>>>; + input: Static, + ) => Promise, Static>>; type: Ty; } : never @@ -190,9 +189,9 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - init: TransportMessage>, - input: AsyncIterable>>, - ) => Promise, Static>>>; + init: Static, + input: AsyncIterableIterator>, + ) => Promise, Static>>; type: Ty; } : { @@ -201,8 +200,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: AsyncIterable>>, - ) => Promise, Static>>>; + input: AsyncIterableIterator>, + ) => Promise, Static>>; type: Ty; } : Ty extends 'subscription' @@ -213,8 +212,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: TransportMessage>, - output: Pushable, Static>>>, + input: Static, + output: Pushable, Static>>, ) => Promise; type: Ty; } @@ -228,9 +227,9 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - init: TransportMessage>, - input: AsyncIterable>>, - output: Pushable, Static>>>, + init: Static, + input: AsyncIterableIterator>, + output: Pushable, Static>>, ) => Promise; type: Ty; } @@ -240,8 +239,8 @@ export type Procedure< errors: E; handler: ( context: ServiceContextWithState, - input: AsyncIterable>>, - output: Pushable, Static>>>, + input: AsyncIterableIterator>, + output: Pushable, Static>>, ) => Promise; type: Ty; } diff --git a/router/client.ts b/router/client.ts index 947ad485..e62265b9 100644 --- a/router/client.ts +++ b/router/client.ts @@ -26,7 +26,7 @@ import { EventMap } from '../transport/events'; import { ServiceDefs } from './defs'; // helper to make next, yield, and return all the same type -type AsyncIter = AsyncGenerator; +export type AsyncIter = AsyncGenerator; /** * A helper type to transform an actual service type into a type diff --git a/router/server.ts b/router/server.ts index 5af6f835..c3a98b31 100644 --- a/router/server.ts +++ b/router/server.ts @@ -6,7 +6,6 @@ import type { Pushable } from 'it-pushable'; import { ControlMessagePayloadSchema, OpaqueTransportMessage, - TransportMessage, isStreamClose, isStreamOpen, reply, @@ -41,10 +40,8 @@ interface ProcStream { serviceName: string; procedureName: string; procedure: AnyProcedure; - incoming: Pushable; - outgoing: Pushable< - TransportMessage, Static>> - >; + incoming: Pushable; + outgoing: Pushable, Static>>; promises: { outputHandler: Promise; inputHandler: Promise; @@ -169,7 +166,7 @@ class RiverServer { // sending outgoing messages back to client (async () => { for await (const response of outgoing) { - this.transport.send(response); + this.transport.send(reply(message, response)); } // we ended, send a close bit back to the client @@ -193,13 +190,10 @@ class RiverServer { `${this.transport.clientId} -- procedure ${message.serviceName}.${message.procedureName}:${message.streamId} threw an error: ${errorMsg}`, ); outgoing.push( - reply( - message, - Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - } satisfies Static), - ), + Err({ + code: UNCAUGHT_ERROR, + message: errorMsg, + } satisfies Static), ); }; @@ -331,7 +325,7 @@ class RiverServer { Value.Check(procedure.init, message.payload)) || Value.Check(procedure.input, message.payload) ) { - procStream.incoming.push(message as TransportMessage); + procStream.incoming.push(message.payload as PayloadType); } else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { log?.error( `${this.transport.clientId} -- procedure ${procStream.serviceName}.${ diff --git a/util/testHelpers.ts b/util/testHelpers.ts index 9c72050d..6e68a80f 100644 --- a/util/testHelpers.ts +++ b/util/testHelpers.ts @@ -2,8 +2,6 @@ import WebSocket from 'isomorphic-ws'; import { WebSocketServer } from 'ws'; import http from 'http'; import { WebSocketClientTransport } from '../transport/impls/ws/client'; -import { Static } from '@sinclair/typebox'; -import { Procedure, ServiceContext } from '../router'; import { Connection, OpaqueTransportMessage, @@ -11,19 +9,20 @@ import { TransportClientId, TransportMessage, msg, - reply, } from '../transport'; -import { Pushable, pushable } from 'it-pushable'; +import { pushable } from 'it-pushable'; +import { Codec } from '../codec'; +import { WebSocketServerTransport } from '../transport/impls/ws/server'; import { - Err, + PayloadType, + Procedure, Result, RiverError, RiverUncaughtSchema, + ServiceContext, UNCAUGHT_ERROR, -} from '../router/result'; -import { Codec } from '../codec'; -import { WebSocketServerTransport } from '../transport/impls/ws/server'; -import { PayloadType } from '../router/builder'; +} from '../router'; +import { Static } from '@sinclair/typebox'; /** * Creates a WebSocket server instance using the provided HTTP server. @@ -91,451 +90,186 @@ export function createWsTransports( } /** - * Transforms an RPC procedure definition into a normal function call. + * Converts a payload object to a transport message with reasonable defaults. * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input message payload. - * @template O - The type of the output message payload. - * @param {State} state - The state object. - * @param {Procedure} proc - The RPC procedure to invoke. - * @param {Omit} [extendedContext] - Optional extended context. - * @returns A function that can be used to invoke the RPC procedure. + * @param payload - The payload object to be converted. + * @param streamId - The optional stream ID. + * @returns The transport message. */ -export function asClientRpc< - State extends object | unknown, - I extends PayloadType, - O extends PayloadType, - E extends RiverError, ->( - state: State, - proc: Procedure, - extendedContext?: Omit, -) { - return ( - msg: Static, - ): Promise< - Result, Static | Static> - > => - proc - .handler({ ...extendedContext, state }, payloadToTransportMessage(msg)) - .then((res) => res.payload) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }); +export function payloadToTransportMessage( + payload: Payload, + streamId?: string, + from: TransportClientId = 'client', + to: TransportClientId = 'SERVER', +): TransportMessage { + return msg(from, to, streamId ?? 'stream', payload, 'service', 'procedure'); } /** - * Transforms a stream procedure definition into a pair of input and output streams. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns Pair of input and output streams. + * Creates a dummy opaque transport message for testing purposes. + * @returns The created opaque transport message. */ -export function asClientStream< - State extends object | unknown, - I extends PayloadType, - O extends PayloadType, - E extends RiverError, ->( - state: State, - proc: Procedure, - extendedContext?: Omit, -): [ - Pushable>, - Pushable, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const rawOutput = pushable, Static>>({ - objectMode: true, - }); - - const transportInput = pushable>>({ - objectMode: true, - }); - const transportOutput = pushable< - TransportMessage, Static>> - >({ - objectMode: true, +export function createDummyTransportMessage(): OpaqueTransportMessage { + return payloadToTransportMessage({ + msg: 'cool', + test: Math.random(), }); - - // wrapping in transport - (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); - } - transportInput.end(); - })(); - - // unwrap from transport - (async () => { - for await (const transportRes of transportOutput) { - rawOutput.push(transportRes.payload); - } - rawOutput.end(); - })(); - - // handle - (async () => { - try { - await proc.handler( - { ...extendedContext, state }, - transportInput, - transportOutput, - ); - } catch (err) { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - transportOutput.push( - reply( - payloadToTransportMessage({}), - Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }), - ), - ); - } - transportOutput.end(); - })(); - - return [rawInput, rawOutput]; } /** - * Transforms a stream procedure definition into a pair of input and output streams. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns Pair of input and output streams. + * Retrieves the next value from an async iterable iterator. + * @param iter The async iterable iterator. + * @returns A promise that resolves to the next value from the iterator. */ -export function asClientStreamWithInitialization< - State extends object | unknown, - I extends PayloadType, - O extends PayloadType, - E extends RiverError, - Init extends PayloadType, ->( - state: State, - proc: Procedure, - init: Static, - extendedContext?: Omit, -): [ - Pushable>, - Pushable, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const rawOutput = pushable, Static>>({ - objectMode: true, - }); - - const transportInput = pushable>>({ - objectMode: true, - }); - const transportOutput = pushable< - TransportMessage, Static>> - >({ - objectMode: true, - }); +export async function iterNext(iter: AsyncIterableIterator) { + return await iter.next().then((res) => res.value); +} - // wrapping in transport - (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); +/** + * Waits for a message on the transport. + * @param {Transport} t - The transport to listen to. + * @param filter - An optional filter function to apply to the received messages. + * @returns A promise that resolves with the payload of the first message that passes the filter. + */ +export async function waitForMessage( + t: Transport, + filter?: (msg: OpaqueTransportMessage) => boolean, + rejectMismatch?: boolean, +) { + return new Promise((resolve, reject) => { + function cleanup() { + t.removeEventListener('message', onMessage); } - transportInput.end(); - })(); - // unwrap from transport - (async () => { - for await (const transportRes of transportOutput) { - rawOutput.push(transportRes.payload); + function onMessage(msg: OpaqueTransportMessage) { + if (!filter || filter?.(msg)) { + cleanup(); + resolve(msg.payload); + } else if (rejectMismatch) { + reject(new Error('message didnt match the filter')); + } } - rawOutput.end(); - })(); - // handle - (async () => { - try { - await proc.handler( - { ...extendedContext, state }, - payloadToTransportMessage(init), - transportInput, - transportOutput, - ); - } catch (err) { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - transportOutput.push( - reply( - payloadToTransportMessage({}), - Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }), - ), - ); - } - transportOutput.end(); - })(); + t.addEventListener('message', onMessage); + }); +} - return [rawInput, rawOutput]; +function catchProcError(err: unknown) { + const errorMsg = + err instanceof Error ? err.message : `[coerced to error] ${err}`; + return { + ok: false, + payload: { + code: UNCAUGHT_ERROR, + message: errorMsg, + }, + }; } -/** - * Transforms a subscription procedure definition into a procedure that returns an output stream. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns A function that when passed a message, returns the output stream. - */ -export function asClientSubscription< +export function asClientRpc< State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, + Init extends PayloadType | null = null, >( state: State, - proc: Procedure, + proc: Procedure, extendedContext?: Omit, ) { - const rawOutput = pushable, Static>>({ - objectMode: true, - }); - const transportOutput = pushable< - TransportMessage, Static>> - >({ - objectMode: true, - }); - - // unwrap from transport - (async () => { - for await (const transportRes of transportOutput) { - rawOutput.push(transportRes.payload); - } - rawOutput.end(); - })(); - return async ( msg: Static, ): Promise< - Pushable, Static | Static>> + Result, Static | Static> > => { - proc - .handler( - { ...extendedContext, state }, - payloadToTransportMessage(msg), - transportOutput, - ) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }); - - return rawOutput; + return await proc + .handler({ ...extendedContext, state }, msg) + .catch(catchProcError); }; } -/** - * Transforms an upload procedure definition into a procedure that returns an input stream. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns A function that when passed a message, returns the output stream. - */ -export function asClientUpload< +export function asClientStream< State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, + Init extends PayloadType | null = null, >( state: State, - proc: Procedure, + proc: Procedure, + init?: Init extends PayloadType ? Static : null, extendedContext?: Omit, -): [ - Pushable>, - Promise, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const transportInput = pushable>>({ +) { + const input = pushable>({ objectMode: true }); + const output = pushable, Static>>({ objectMode: true, }); - // wrapping in transport (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); + if (init) { + const _proc = proc as Procedure; + await _proc + .handler({ ...extendedContext, state }, init, input, output) + .catch((err) => output.push(catchProcError(err))); + } else { + const _proc = proc as Procedure; + await _proc + .handler({ ...extendedContext, state }, input, output) + .catch((err) => output.push(catchProcError(err))); } - transportInput.end(); })(); - return [ - rawInput, - proc - .handler({ ...extendedContext, state }, transportInput) - .then((res) => res.payload) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }), - ]; + return [input, output] as const; } -/** - * Transforms an upload with initialization procedure definition into a procedure that returns an - * input stream. - * Input messages can be pushed into the input stream. - * This should only be used for testing. - * @template State - The type of the state object. - * @template Init - The type of the init object. - * @template I - The type of the input object. - * @template O - The type of the output object. - * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. - * @param {Omit} [extendedContext] - The extended context object. - * @returns A function that when passed a message, returns the output stream. - */ -export function asClientUploadWithInitialization< +export function asClientSubscription< State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, - Init extends PayloadType, >( state: State, - proc: Procedure, - init: Static, + proc: Procedure, extendedContext?: Omit, -): [ - Pushable>, - Promise, Static | Static>>, -] { - const rawInput = pushable>({ objectMode: true }); - const transportInput = pushable>>({ +) { + const output = pushable, Static>>({ objectMode: true, }); - // wrapping in transport - (async () => { - for await (const rawIn of rawInput) { - transportInput.push(payloadToTransportMessage(rawIn)); - } - transportInput.end(); - })(); - - return [ - rawInput, - proc - .handler( - { ...extendedContext, state }, - payloadToTransportMessage(init), - transportInput, - ) - .then((res) => res.payload) - .catch((err) => { - const errorMsg = - err instanceof Error ? err.message : `[coerced to error] ${err}`; - return Err({ - code: UNCAUGHT_ERROR, - message: errorMsg, - }); - }), - ]; -} - -/** - * Converts a payload object to a transport message with reasonable defaults. - * This should only be used for testing. - * @param payload - The payload object to be converted. - * @param streamId - The optional stream ID. - * @returns The transport message. - */ -export function payloadToTransportMessage( - payload: Payload, - streamId?: string, - from: TransportClientId = 'client', - to: TransportClientId = 'SERVER', -): TransportMessage { - return msg(from, to, streamId ?? 'stream', payload, 'service', 'procedure'); -} - -/** - * Creates a dummy opaque transport message for testing purposes. - * @returns The created opaque transport message. - */ -export function createDummyTransportMessage(): OpaqueTransportMessage { - return payloadToTransportMessage({ - msg: 'cool', - test: Math.random(), - }); -} - -/** - * Retrieves the next value from an async iterable iterator. - * @param iter The async iterable iterator. - * @returns A promise that resolves to the next value from the iterator. - */ -export function iterNext(iter: AsyncIterableIterator) { - return iter.next().then((res) => res.value); + return async (msg: Static) => { + (async () => { + return await proc + .handler({ ...extendedContext, state }, msg, output) + .catch((err) => output.push(catchProcError(err))); + })(); + return output; + }; } -/** - * Waits for a message on the transport. - * @param {Transport} t - The transport to listen to. - * @param filter - An optional filter function to apply to the received messages. - * @returns A promise that resolves with the payload of the first message that passes the filter. - */ -export async function waitForMessage( - t: Transport, - filter?: (msg: OpaqueTransportMessage) => boolean, - rejectMismatch?: boolean, +export async function asClientUpload< + State extends object | unknown, + I extends PayloadType, + O extends PayloadType, + E extends RiverError, + Init extends PayloadType | null = null, +>( + state: State, + proc: Procedure, + init?: Init extends PayloadType ? Static : null, + extendedContext?: Omit, ) { - return new Promise((resolve, reject) => { - function cleanup() { - t.removeEventListener('message', onMessage); - } - - function onMessage(msg: OpaqueTransportMessage) { - if (!filter || filter?.(msg)) { - cleanup(); - resolve(msg.payload); - } else if (rejectMismatch) { - reject(new Error('message didnt match the filter')); - } - } - - t.addEventListener('message', onMessage); - }); + const input = pushable>({ objectMode: true }); + if (init) { + const _proc = proc as Procedure; + const result = _proc + .handler({ ...extendedContext, state }, init, input) + .catch(catchProcError); + return [input, result] as const; + } else { + const _proc = proc as Procedure; + const result = _proc + .handler({ ...extendedContext, state }, input) + .catch(catchProcError); + return [input, result] as const; + } }