From 9f7b0681876c5f7fc0faed563e14aa9018beb2df Mon Sep 17 00:00:00 2001 From: lhchavez Date: Mon, 11 Dec 2023 03:41:55 +0000 Subject: [PATCH] feat: Complete all RPC kinds We have three of the four gRPC RPC kinds (rpc, stream, subscription). This change adds the final RPC kind: upload (client-stream, single message response). It also adds the option of adding an `init` type to an RPC, where we can send a message of another schema before the main stream, which is a very common pattern used in filesystem (i.e. open file for writing, where the first message is the path and the other messages are the chunks) and collaboration (i.e. first message is the channel name and the rest of the bidirectional messages are the actual stream). --- __tests__/e2e.test.ts | 77 ++++++++++++ __tests__/fixtures/services.ts | 58 +++++++++ __tests__/handler.test.ts | 54 +++++++++ __tests__/serialize.test.ts | 39 ++++++ package.json | 2 +- router/builder.ts | 166 +++++++++++++++++++------- router/client.ts | 144 +++++++++++++++++++--- router/server.ts | 56 ++++++++- util/testHelpers.ts | 212 ++++++++++++++++++++++++++++++++- 9 files changed, 741 insertions(+), 67 deletions(-) diff --git a/__tests__/e2e.test.ts b/__tests__/e2e.test.ts index de19a10a..73ffc293 100644 --- a/__tests__/e2e.test.ts +++ b/__tests__/e2e.test.ts @@ -16,6 +16,7 @@ import { OrderingServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, + UploadableServiceConstructor, TestServiceConstructor, } from './fixtures/services'; import { UNCAUGHT_ERROR } from '../router/result'; @@ -135,6 +136,36 @@ describe.each(codecs)( }); }); + test('stream with init message', async () => { + const [clientTransport, serverTransport] = getTransports(); + const serviceDefs = { test: TestServiceConstructor() }; + const server = await createServer(serverTransport, serviceDefs); + const client = createClient(clientTransport); + + const [input, output, close] = await client.test.echoWithPrefix.stream({ + prefix: 'test', + }); + input.push({ msg: 'abc', ignore: false }); + input.push({ msg: 'def', ignore: true }); + input.push({ msg: 'ghi', ignore: false }); + input.end(); + + const result1 = await iterNext(output); + assert(result1.ok); + expect(result1.payload).toStrictEqual({ response: 'test abc' }); + + const result2 = await iterNext(output); + assert(result2.ok); + expect(result2.payload).toStrictEqual({ response: 'test ghi' }); + + close(); + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + server, + }); + }); + test('fallible stream', async () => { const [clientTransport, serverTransport] = getTransports(); const serviceDefs = { test: FallibleServiceConstructor() }; @@ -232,6 +263,52 @@ describe.each(codecs)( }); }); + test('upload', async () => { + const [clientTransport, serverTransport] = getTransports(); + + const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const server = await createServer(serverTransport, serviceDefs); + const client = createClient(clientTransport); + + const [addStream, addResult] = + await client.uploadable.addMultiple.upload(); + addStream.push({ n: 1 }); + addStream.push({ n: 2 }); + addStream.end(); + const result = await addResult; + assert(result.ok); + expect(result.payload).toStrictEqual({ result: 3 }); + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + server, + }); + }); + + test('upload with init message', async () => { + const [clientTransport, serverTransport] = getTransports(); + + const serviceDefs = { uploadable: UploadableServiceConstructor() }; + const server = await createServer(serverTransport, serviceDefs); + const client = createClient(clientTransport); + + const [addStream, addResult] = + await client.uploadable.addMultipleWithPrefix.upload({ + prefix: 'test', + }); + addStream.push({ n: 1 }); + addStream.push({ n: 2 }); + addStream.end(); + const result = await addResult; + assert(result.ok); + expect(result.payload).toStrictEqual({ result: 'test 3' }); + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + server, + }); + }); + test('message order is preserved in the face of disconnects', async () => { const [clientTransport, serverTransport] = getTransports(); const serviceDefs = { test: OrderingServiceConstructor() }; diff --git a/__tests__/fixtures/services.ts b/__tests__/fixtures/services.ts index 025c7902..ef17b09e 100644 --- a/__tests__/fixtures/services.ts +++ b/__tests__/fixtures/services.ts @@ -45,6 +45,23 @@ export const TestServiceConstructor = () => } }, }) + .defineProcedure('echoWithPrefix', { + type: 'stream', + init: Type.Object({ prefix: Type.String() }), + input: EchoRequest, + output: EchoResponse, + errors: Type.Never(), + async handler(_ctx, init, msgStream, returnStream) { + for await (const msg of msgStream) { + const req = msg.payload; + if (!req.ignore) { + returnStream.push( + reply(msg, Ok({ response: `${init.payload.prefix} ${req.msg}` })), + ); + } + } + }, + }) .finalize(); export const OrderingServiceConstructor = () => @@ -187,3 +204,44 @@ export const SubscribableServiceConstructor = () => }, }) .finalize(); + +export const UploadableServiceConstructor = () => + ServiceBuilder.create('uploadable') + .initialState({}) + .defineProcedure('addMultiple', { + type: 'upload', + input: Type.Object({ n: Type.Number() }), + output: Type.Object({ result: Type.Number() }), + errors: Type.Never(), + async handler(_ctx, msgStream) { + let result = 0; + let lastMsg; + for await (const msg of msgStream) { + const { n } = msg.payload; + result += n; + lastMsg = msg; + } + return reply(lastMsg!, Ok({ result: result })); + }, + }) + .defineProcedure('addMultipleWithPrefix', { + type: 'upload', + init: Type.Object({ prefix: Type.String() }), + input: Type.Object({ n: Type.Number() }), + output: Type.Object({ result: Type.String() }), + errors: Type.Never(), + async handler(_ctx, init, msgStream) { + let result = 0; + let lastMsg; + for await (const msg of msgStream) { + const { n } = msg.payload; + result += n; + lastMsg = msg; + } + return reply( + lastMsg!, + Ok({ result: init.payload.prefix + ' ' + result }), + ); + }, + }) + .finalize(); diff --git a/__tests__/handler.test.ts b/__tests__/handler.test.ts index c1153d90..175ea384 100644 --- a/__tests__/handler.test.ts +++ b/__tests__/handler.test.ts @@ -1,7 +1,10 @@ import { asClientRpc, asClientStream, + asClientStreamWithInitialization, asClientSubscription, + asClientUpload, + asClientUploadWithInitialization, iterNext, } from '../util/testHelpers'; import { assert, describe, expect, test } from 'vitest'; @@ -10,6 +13,7 @@ import { FallibleServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, + UploadableServiceConstructor, TestServiceConstructor, } from './fixtures/services'; import { UNCAUGHT_ERROR } from '../router/result'; @@ -73,6 +77,29 @@ describe('server-side test', () => { expect(output.readableLength).toBe(0); }); + test('stream with initialization', async () => { + const [input, output] = asClientStreamWithInitialization( + initialState, + service.procedures.echoWithPrefix, + { prefix: 'test' }, + ); + + input.push({ msg: 'abc', ignore: false }); + input.push({ msg: 'def', ignore: true }); + input.push({ msg: 'ghi', ignore: false }); + input.end(); + + const result1 = await iterNext(output); + assert(result1 && result1.ok); + expect(result1.payload).toStrictEqual({ response: 'test abc' }); + + const result2 = await iterNext(output); + assert(result2 && result2.ok); + expect(result2.payload).toStrictEqual({ response: 'test ghi' }); + + expect(output.readableLength).toBe(0); + }); + test('fallible stream', async () => { const service = FallibleServiceConstructor(); const [input, output] = asClientStream({}, service.procedures.echo); @@ -118,4 +145,31 @@ describe('server-side test', () => { assert(streamResult2 && streamResult1.ok); expect(streamResult2.payload).toStrictEqual({ result: 3 }); }); + + test('uploads', async () => { + const service = UploadableServiceConstructor(); + const [input, result] = asClientUpload({}, service.procedures.addMultiple); + + input.push({ n: 1 }); + input.push({ n: 2 }); + input.end(); + expect(await result).toStrictEqual({ ok: true, payload: { result: 3 } }); + }); + + test('uploads with initialization', async () => { + const service = UploadableServiceConstructor(); + const [input, result] = asClientUploadWithInitialization( + {}, + service.procedures.addMultipleWithPrefix, + { prefix: 'test' }, + ); + + input.push({ n: 1 }); + input.push({ n: 2 }); + input.end(); + expect(await result).toStrictEqual({ + ok: true, + payload: { result: 'test 3' }, + }); + }); }); diff --git a/__tests__/serialize.test.ts b/__tests__/serialize.test.ts index b007437e..89f08521 100644 --- a/__tests__/serialize.test.ts +++ b/__tests__/serialize.test.ts @@ -51,6 +51,45 @@ describe('serialize service to jsonschema', () => { errors: { not: {} }, type: 'stream', }, + echoWithPrefix: { + errors: { + not: {}, + }, + init: { + properties: { + prefix: { + type: 'string', + }, + }, + required: ['prefix'], + type: 'object', + }, + input: { + properties: { + end: { + type: 'boolean', + }, + ignore: { + type: 'boolean', + }, + msg: { + type: 'string', + }, + }, + required: ['msg', 'ignore'], + type: 'object', + }, + output: { + properties: { + response: { + type: 'string', + }, + }, + required: ['response'], + type: 'object', + }, + type: 'stream', + }, }, }); }); diff --git a/package.json b/package.json index 14bce83a..0a5247aa 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "@replit/river", "sideEffects": false, "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.8.1", + "version": "0.9.0", "type": "module", "exports": { ".": "./dist/router/index.js", diff --git a/router/builder.ts b/router/builder.ts index 204d5b51..b15f74bc 100644 --- a/router/builder.ts +++ b/router/builder.ts @@ -5,9 +5,19 @@ import { ServiceContextWithState } from './context'; import { Result, RiverError, RiverUncaughtSchema } from './result'; /** - * The valid {@link Procedure} types. + * The valid {@link Procedure} types. The `stream` and `upload` types can optionally have a + * different type for the very first initialization message. The suffixless types correspond to + * gRPC's four combinations of stream / non-stream in each direction. */ -export type ValidProcType = 'rpc' | 'stream' | 'subscription'; +export type ValidProcType = + // Single message in both directions. + | 'rpc' + // Client-stream (potentially preceded by an initialization message), single message from server. + | 'upload' + // Single message from client, stream from server. + | 'subscription' + // Bidirectional stream (potentially preceded by an initialization message). + | 'stream'; /** * A generic procedure listing where the keys are the names of the procedures @@ -54,6 +64,12 @@ export function serializeService(s: AnyService): object { output: Type.Strict(procDef.output), errors: Type.Strict(procDef.errors), type: procDef.type, + // Only add the `init` field if the type declares it. + ...('init' in procDef + ? { + init: Type.Strict(procDef.init), + } + : {}), }, ]), ), @@ -70,6 +86,26 @@ export type ProcHandler< ProcName extends keyof S['procedures'], > = S['procedures'][ProcName]['handler']; +/** + * Helper to get whether the type definition for the procedure contains an init type. + * @template S - The service. + * @template ProcName - The name of the procedure. + */ +export type ProcHasInit< + S extends AnyService, + ProcName extends keyof S['procedures'], +> = S['procedures'][ProcName] extends { init: any } ? true : false; + +/** + * Helper to get the type definition for the procedure init type of a service. + * @template S - The service. + * @template ProcName - The name of the procedure. + */ +export type ProcInit< + S extends AnyService, + ProcName extends keyof S['procedures'], +> = S['procedures'][ProcName]['init']; + /** * Helper to get the type definition for the procedure input of a service. * @template S - The service. @@ -118,6 +154,7 @@ export type PayloadType = TObject | TUnion; * @template Ty - The type of the procedure. * @template I - The TypeBox schema of the input object. * @template O - The TypeBox schema of the output object. + * @template Init - The TypeBox schema of the input initialization object. */ export type Procedure< State extends object | unknown, @@ -125,48 +162,92 @@ export type Procedure< I extends PayloadType, O extends PayloadType, E extends RiverError, + Init extends PayloadType | null = null, > = Ty extends 'rpc' - ? { - input: I; - output: O; - errors: E; - handler: ( - context: ServiceContextWithState, - input: TransportMessage>, - ) => Promise, Static>>>; - type: Ty; - } - : Ty extends 'stream' - ? { - input: I; - output: O; - errors: E; - handler: ( - context: ServiceContextWithState, - input: AsyncIterable>>, - output: Pushable, Static>>>, - ) => Promise; - type: Ty; - } + ? Init extends null + ? { + input: I; + output: O; + errors: E; + handler: ( + context: ServiceContextWithState, + input: TransportMessage>, + ) => Promise, Static>>>; + type: Ty; + } + : never + : Ty extends 'upload' + ? Init extends PayloadType + ? { + init: Init; + input: I; + output: O; + errors: E; + handler: ( + context: ServiceContextWithState, + init: TransportMessage>, + input: AsyncIterable>>, + ) => Promise, Static>>>; + type: Ty; + } + : { + input: I; + output: O; + errors: E; + handler: ( + context: ServiceContextWithState, + input: AsyncIterable>>, + ) => Promise, Static>>>; + type: Ty; + } : Ty extends 'subscription' - ? { - input: I; - output: O; - errors: E; - handler: ( - context: ServiceContextWithState, - input: TransportMessage>, - output: Pushable, Static>>>, - ) => Promise; - type: Ty; - } + ? Init extends null + ? { + input: I; + output: O; + errors: E; + handler: ( + context: ServiceContextWithState, + input: TransportMessage>, + output: Pushable, Static>>>, + ) => Promise; + type: Ty; + } + : never + : Ty extends 'stream' + ? Init extends PayloadType + ? { + init: Init; + input: I; + output: O; + errors: E; + handler: ( + context: ServiceContextWithState, + init: TransportMessage>, + input: AsyncIterable>>, + output: Pushable, Static>>>, + ) => Promise; + type: Ty; + } + : { + input: I; + output: O; + errors: E; + handler: ( + context: ServiceContextWithState, + input: AsyncIterable>>, + output: Pushable, Static>>>, + ) => Promise; + type: Ty; + } : never; export type AnyProcedure = Procedure< object, ValidProcType, PayloadType, PayloadType, - RiverError + RiverError, + PayloadType | null >; /** @@ -210,8 +291,8 @@ export class ServiceBuilder> { /** * Defines a new procedure for the service. * @param {ProcName} procName The name of the procedure. - * @param {Procedure} procDef The definition of the procedure. - * @returns {ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { [k in ProcName]: Procedure; }; }>} A new ServiceBuilder instance with the updated schema. + * @param {Procedure} procDef The definition of the procedure. + * @returns {ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { [k in ProcName]: Procedure; }; }>} A new ServiceBuilder instance with the updated schema. */ defineProcedure< ProcName extends string, @@ -219,17 +300,20 @@ export class ServiceBuilder> { I extends PayloadType, O extends PayloadType, E extends RiverError, + Init extends PayloadType | null = null, >( procName: ProcName, - procDef: Procedure, + procDef: Procedure, ): ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { - [k in ProcName]: Procedure; + [k in ProcName]: Procedure; }; }> { - type ProcListing = { [k in ProcName]: Procedure }; + type ProcListing = { + [k in ProcName]: Procedure; + }; const newProcedure = { [procName]: procDef } as ProcListing; const procedures = { ...this.schema.procedures, diff --git a/router/client.ts b/router/client.ts index 4d03b52c..14c9c7c4 100644 --- a/router/client.ts +++ b/router/client.ts @@ -2,6 +2,8 @@ import { Connection, Transport } from '../transport/transport'; import { AnyService, ProcErrors, + ProcHasInit, + ProcInit, ProcInput, ProcOutput, ProcType, @@ -45,21 +47,64 @@ type ServiceClient = { > >; } + : ProcType extends 'upload' + ? ProcHasInit extends true + ? { + upload: (init: Static>) => Promise< + [ + Pushable>>, // input + Promise< + Result< + Static>, + Static> + > + >, // output + ] + >; + } + : { + upload: () => Promise< + [ + Pushable>>, // input + Promise< + Result< + Static>, + Static> + > + >, // output + ] + >; + } : ProcType extends 'stream' - ? { - stream: () => Promise< - [ - Pushable>>, // input - AsyncIter< - Result< - Static>, - Static> - > - >, // output - () => void, // close handle - ] - >; - } + ? ProcHasInit extends true + ? { + stream: (init: Static>) => Promise< + [ + Pushable>>, // input + AsyncIter< + Result< + Static>, + Static> + > + >, // output + () => void, // close handle + ] + >; + } + : { + stream: () => Promise< + [ + Pushable>>, // input + AsyncIter< + Result< + Static>, + Static> + > + >, // output + () => void, // close handle + ] + >; + } : ProcType extends 'subscription' ? { subscribe: (input: Static>) => Promise< @@ -156,8 +201,24 @@ export const createClient = >>( const outputStream = pushable({ objectMode: true }); let firstMessage = true; + if (input) { + const m = msg( + transport.clientId, + serverId, + serviceName, + procName, + streamId, + input as object, + ); + + // first message needs the open bit. + m.controlFlags = ControlFlags.StreamOpenBit; + transport.send(m); + firstMessage = false; + } + // input -> transport - // this gets cleaned up on i.end() which is called by closeHandler + // this gets cleaned up on inputStream.end() which is called by closeHandler (async () => { for await (const rawIn of inputStream) { const m = msg( @@ -259,6 +320,59 @@ export const createClient = >>( }; return [outputStream, closeHandler]; + } else if (procType === 'upload') { + const inputStream = pushable({ objectMode: true }); + let firstMessage = true; + + if (input) { + const m = msg( + transport.clientId, + serverId, + serviceName, + procName, + streamId, + input as object, + ); + + // first message needs the open bit. + m.controlFlags = ControlFlags.StreamOpenBit; + transport.send(m); + firstMessage = false; + } + + // input -> transport + // this gets cleaned up on inputStream.end(), which the caller should call. + (async () => { + for await (const rawIn of inputStream) { + const m = msg( + transport.clientId, + serverId, + serviceName, + procName, + streamId, + rawIn as object, + ); + + if (firstMessage) { + m.controlFlags |= ControlFlags.StreamOpenBit; + firstMessage = false; + } + + transport.send(m); + } + + transport.send( + closeStream( + transport.clientId, + serverId, + serviceName, + procName, + streamId, + ), + ); + })(); + + return [inputStream, waitForMessage(transport, belongsToSameStream)]; } else { throw new Error(`invalid river call, unknown procedure type ${procType}`); } diff --git a/router/server.ts b/router/server.ts index daa3aedb..8c12f028 100644 --- a/router/server.ts +++ b/router/server.ts @@ -166,9 +166,22 @@ export async function createServer>( // pump incoming message stream -> handler -> outgoing message stream let inputHandler: Promise; if (procedure.type === 'stream') { - inputHandler = procedure - .handler(serviceContext, incoming, outgoing) - .catch(errorHandler); + if ('init' in procedure) { + inputHandler = (async () => { + const initMessage = await incoming.next(); + if (initMessage.done) { + return; + } + + return procedure + .handler(serviceContext, initMessage.value, incoming, outgoing) + .catch(errorHandler); + })(); + } else { + inputHandler = procedure + .handler(serviceContext, incoming, outgoing) + .catch(errorHandler); + } } else if (procedure.type === 'rpc') { inputHandler = (async () => { const inputMessage = await incoming.next(); @@ -203,6 +216,38 @@ export async function createServer>( errorHandler(err); } })(); + } else if (procedure.type === 'upload') { + if ('init' in procedure) { + inputHandler = (async () => { + const initMessage = await incoming.next(); + if (initMessage.done) { + return; + } + + try { + const outputMessage = await procedure.handler( + serviceContext, + initMessage.value, + incoming, + ); + outgoing.push(outputMessage); + } catch (err) { + errorHandler(err); + } + })(); + } else { + inputHandler = (async () => { + try { + const outputMessage = await procedure.handler( + serviceContext, + incoming, + ); + outgoing.push(outputMessage); + } catch (err) { + errorHandler(err); + } + })(); + } } else { // procedure is inferred to be never here as this is not a valid procedure type // we cast just to log @@ -229,7 +274,10 @@ export async function createServer>( return; } - if (Value.Check(procedure.input, message.payload)) { + if ( + Value.Check(procedure.input, message.payload) || + ('init' in procedure && Value.Check(procedure.init, message.payload)) + ) { procStream.incoming.push(message as TransportMessage); } else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { log?.error( diff --git a/util/testHelpers.ts b/util/testHelpers.ts index 18d06162..754ebe30 100644 --- a/util/testHelpers.ts +++ b/util/testHelpers.ts @@ -95,7 +95,7 @@ export function createWsTransports( * @template I - The type of the input message payload. * @template O - The type of the output message payload. * @param {State} state - The state object. - * @param {Procedure} proc - The RPC procedure to invoke. + * @param {Procedure} proc - The RPC procedure to invoke. * @param {Omit} [extendedContext] - Optional extended context. * @returns A function that can be used to invoke the RPC procedure. */ @@ -106,7 +106,7 @@ export function asClientRpc< E extends RiverError, >( state: State, - proc: Procedure, + proc: Procedure, extendedContext?: Omit, ) { return ( @@ -135,7 +135,7 @@ export function asClientRpc< * @template I - The type of the input object. * @template O - The type of the output object. * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. + * @param {Procedure} proc - The procedure to handle the stream. * @param {Omit} [extendedContext] - The extended context object. * @returns Pair of input and output streams. */ @@ -146,7 +146,7 @@ export function asClientStream< E extends RiverError, >( state: State, - proc: Procedure, + proc: Procedure, extendedContext?: Omit, ): [ Pushable>, @@ -208,6 +208,90 @@ export function asClientStream< return [rawInput, rawOutput]; } +/** + * Transforms a stream procedure definition into a pair of input and output streams. + * Input messages can be pushed into the input stream. + * This should only be used for testing. + * @template State - The type of the state object. + * @template I - The type of the input object. + * @template O - The type of the output object. + * @param {State} state - The state object. + * @param {Procedure} proc - The procedure to handle the stream. + * @param {Omit} [extendedContext] - The extended context object. + * @returns Pair of input and output streams. + */ +export function asClientStreamWithInitialization< + State extends object | unknown, + I extends PayloadType, + O extends PayloadType, + E extends RiverError, + Init extends PayloadType, +>( + state: State, + proc: Procedure, + init: Static, + extendedContext?: Omit, +): [ + Pushable>, + Pushable, Static | Static>>, +] { + const rawInput = pushable>({ objectMode: true }); + const rawOutput = pushable, Static>>({ + objectMode: true, + }); + + const transportInput = pushable>>({ + objectMode: true, + }); + const transportOutput = pushable< + TransportMessage, Static>> + >({ + objectMode: true, + }); + + // wrapping in transport + (async () => { + for await (const rawIn of rawInput) { + transportInput.push(payloadToTransportMessage(rawIn)); + } + transportInput.end(); + })(); + + // unwrap from transport + (async () => { + for await (const transportRes of transportOutput) { + rawOutput.push(transportRes.payload); + } + })(); + + // handle + (async () => { + try { + await proc.handler( + { ...extendedContext, state }, + payloadToTransportMessage(init), + transportInput, + transportOutput, + ); + } catch (err) { + const errorMsg = + err instanceof Error ? err.message : `[coerced to error] ${err}`; + transportOutput.push( + reply( + payloadToTransportMessage({}), + Err({ + code: UNCAUGHT_ERROR, + message: errorMsg, + }), + ), + ); + } + transportOutput.end(); + })(); + + return [rawInput, rawOutput]; +} + /** * Transforms a subscription procedure definition into a procedure that returns an output stream. * Input messages can be pushed into the input stream. @@ -216,7 +300,7 @@ export function asClientStream< * @template I - The type of the input object. * @template O - The type of the output object. * @param {State} state - The state object. - * @param {Procedure} proc - The procedure to handle the stream. + * @param {Procedure} proc - The procedure to handle the stream. * @param {Omit} [extendedContext] - The extended context object. * @returns A function that when passed a message, returns the output stream. */ @@ -227,7 +311,7 @@ export function asClientSubscription< E extends RiverError, >( state: State, - proc: Procedure, + proc: Procedure, extendedContext?: Omit, ) { const rawOutput = pushable, Static>>({ @@ -270,6 +354,122 @@ export function asClientSubscription< }; } +/** + * Transforms an upload procedure definition into a procedure that returns an input stream. + * Input messages can be pushed into the input stream. + * This should only be used for testing. + * @template State - The type of the state object. + * @template I - The type of the input object. + * @template O - The type of the output object. + * @param {State} state - The state object. + * @param {Procedure} proc - The procedure to handle the stream. + * @param {Omit} [extendedContext] - The extended context object. + * @returns A function that when passed a message, returns the output stream. + */ +export function asClientUpload< + State extends object | unknown, + I extends PayloadType, + O extends PayloadType, + E extends RiverError, +>( + state: State, + proc: Procedure, + extendedContext?: Omit, +): [ + Pushable>, + Promise, Static | Static>>, +] { + const rawInput = pushable>({ objectMode: true }); + const transportInput = pushable>>({ + objectMode: true, + }); + + // wrapping in transport + (async () => { + for await (const rawIn of rawInput) { + transportInput.push(payloadToTransportMessage(rawIn)); + } + transportInput.end(); + })(); + + return [ + rawInput, + proc + .handler({ ...extendedContext, state }, transportInput) + .then((res) => res.payload) + .catch((err) => { + const errorMsg = + err instanceof Error ? err.message : `[coerced to error] ${err}`; + return Err({ + code: UNCAUGHT_ERROR, + message: errorMsg, + }); + }), + ]; +} + +/** + * Transforms an upload with initialization procedure definition into a procedure that returns an + * input stream. + * Input messages can be pushed into the input stream. + * This should only be used for testing. + * @template State - The type of the state object. + * @template Init - The type of the init object. + * @template I - The type of the input object. + * @template O - The type of the output object. + * @param {State} state - The state object. + * @param {Procedure} proc - The procedure to handle the stream. + * @param {Omit} [extendedContext] - The extended context object. + * @returns A function that when passed a message, returns the output stream. + */ +export function asClientUploadWithInitialization< + State extends object | unknown, + I extends PayloadType, + O extends PayloadType, + E extends RiverError, + Init extends PayloadType, +>( + state: State, + proc: Procedure, + init: Static, + extendedContext?: Omit, +): [ + Pushable>, + Promise, Static | Static>>, +] { + const rawInput = pushable>({ objectMode: true }); + const transportInput = pushable>>({ + objectMode: true, + }); + + // wrapping in transport + (async () => { + for await (const rawIn of rawInput) { + transportInput.push(payloadToTransportMessage(rawIn)); + } + transportInput.end(); + })(); + + return [ + rawInput, + proc + .handler( + { ...extendedContext, state }, + payloadToTransportMessage(init), + transportInput, + ) + .then((res) => res.payload) + .catch((err) => { + const errorMsg = + err instanceof Error ? err.message : `[coerced to error] ${err}`; + return Err({ + code: UNCAUGHT_ERROR, + message: errorMsg, + }); + }), + ]; +} + /** * Converts a payload object to a transport message with reasonable defaults. * This should only be used for testing.