From 1f8041cf23f10cc58625d84f028137d65e469367 Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Tue, 26 Sep 2023 12:55:36 -0600 Subject: [PATCH 1/9] isomorphic environment --- environment/types.ts | 4 ++++ index.ts | 2 ++ router/builder.ts | 10 ++++++++-- router/server.ts | 22 +++++++++++++++++++--- 4 files changed, 33 insertions(+), 5 deletions(-) create mode 100644 environment/types.ts diff --git a/environment/types.ts b/environment/types.ts new file mode 100644 index 00000000..a5d03aa1 --- /dev/null +++ b/environment/types.ts @@ -0,0 +1,4 @@ +export interface IsomorphicEnvironment { + log: (...args: unknown[]) => void; + error: (...args: unknown[]) => void; +} diff --git a/index.ts b/index.ts index 168879b6..005dd36c 100644 --- a/index.ts +++ b/index.ts @@ -45,3 +45,5 @@ export { waitForSocketReady, createWebSocketClient, } from './transport/util'; + +export type { IsomorphicEnvironment } from './environment/types'; diff --git a/router/builder.ts b/router/builder.ts index baf8c1dd..f7c9fc05 100644 --- a/router/builder.ts +++ b/router/builder.ts @@ -1,6 +1,7 @@ import { TObject, Static, Type } from '@sinclair/typebox'; import type { Pushable } from 'it-pushable'; import { TransportMessage } from '../transport/message'; +import { IsomorphicEnvironment } from '../environment/types'; export type ValidProcType = 'stream' | 'rpc'; export type ProcListing = Record< @@ -58,6 +59,11 @@ export type ProcType< ProcName extends keyof S['procedures'], > = S['procedures'][ProcName]['type']; +export interface ProcedureContext { + environment: IsomorphicEnvironment; + state: State; +} + export type Procedure< State extends object | unknown, Ty extends ValidProcType, @@ -68,7 +74,7 @@ export type Procedure< input: I; output: O; handler: ( - state: State, + context: ProcedureContext, input: TransportMessage>, ) => Promise>>; type: Ty; @@ -77,7 +83,7 @@ export type Procedure< input: I; output: O; handler: ( - state: State, + context: ProcedureContext, input: AsyncIterable>>, output: Pushable>>, ) => Promise; diff --git a/router/server.ts b/router/server.ts index c4ee127e..64f83d50 100644 --- a/router/server.ts +++ b/router/server.ts @@ -1,10 +1,16 @@ import { TObject } from '@sinclair/typebox'; import { Transport } from '../transport/types'; -import { AnyService, Procedure, ValidProcType } from './builder'; +import { + AnyService, + Procedure, + ProcedureContext, + ValidProcType, +} from './builder'; import { Value } from '@sinclair/typebox/value'; import { pushable } from 'it-pushable'; import type { Pushable } from 'it-pushable'; import { OpaqueTransportMessage, TransportMessage } from '../transport/message'; +import { IsomorphicEnvironment } from '../environment/types'; export interface Server { services: Services; @@ -18,12 +24,18 @@ interface ProcStream { } export async function createServer>( + environment: IsomorphicEnvironment, transport: Transport, services: Services, ): Promise> { // create streams for every stream procedure const streamMap: Map = new Map(); for (const [serviceName, service] of Object.entries(services)) { + const context: ProcedureContext = { + environment, + state: service.state, + }; + for (const [procedureName, proc] of Object.entries(service.procedures)) { const procedure = proc as Procedure< object, @@ -39,7 +51,7 @@ export async function createServer>( outgoing, doneCtx: Promise.all([ // processing the actual procedure - procedure.handler(service.state, incoming, outgoing), + procedure.handler(context, incoming, outgoing), // sending outgoing messages back to client (async () => { for await (const response of outgoing) { @@ -78,7 +90,11 @@ export async function createServer>( Value.Check(procedure.input, inputMessage.payload) ) { // synchronous rpc - const response = await procedure.handler(service.state, inputMessage); + const context: ProcedureContext = { + environment, + state: service.state, + }; + const response = await procedure.handler(context, inputMessage); transport.send(response); return; } else if ( From a63cf42c3a5b4dc633144c400a89e9ac62a02016 Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Tue, 26 Sep 2023 13:06:41 -0600 Subject: [PATCH 2/9] add test environment stuff --- __tests__/integration.test.ts | 24 +++++++++++++++--------- __tests__/typescript-stress.test.ts | 7 ++++++- environment/util.ts | 12 ++++++++++++ index.ts | 1 + router/server.util.ts | 12 +++++++++--- 5 files changed, 43 insertions(+), 13 deletions(-) create mode 100644 environment/util.ts diff --git a/__tests__/integration.test.ts b/__tests__/integration.test.ts index 5f90a7a9..9c6bb8dd 100644 --- a/__tests__/integration.test.ts +++ b/__tests__/integration.test.ts @@ -12,6 +12,7 @@ import { import { createServer } from '../router/server'; import { createClient } from '../router/client'; import { asClientRpc, asClientStream } from '../router/server.util'; +import { createTestEnvironment } from '../environment/util'; export const EchoRequest = Type.Object({ msg: Type.String(), @@ -28,17 +29,17 @@ export const TestServiceConstructor = () => type: 'rpc', input: Type.Object({ n: Type.Number() }), output: Type.Object({ result: Type.Number() }), - async handler(state, msg) { + async handler(ctx, msg) { const { n } = msg.payload; - state.count += n; - return reply(msg, { result: state.count }); + ctx.state.count += n; + return reply(msg, { result: ctx.state.count }); }, }) .defineProcedure('echo', { type: 'stream', input: EchoRequest, output: EchoResponse, - async handler(_state, msgStream, returnStream) { + async handler(_ctx, msgStream, returnStream) { for await (const msg of msgStream) { const req = msg.payload; if (!req.ignore) { @@ -99,17 +100,20 @@ describe('server-side test', () => { const initialState = { count: 0 }; test('rpc basic', async () => { - const add = asClientRpc(initialState, service.procedures.add); + const env = createTestEnvironment(); + const add = asClientRpc(env, initialState, service.procedures.add); await expect(add({ n: 3 })).resolves.toStrictEqual({ result: 3 }); }); test('rpc initial state', async () => { - const add = asClientRpc({ count: 5 }, service.procedures.add); + const env = createTestEnvironment(); + const add = asClientRpc(env, { count: 5 }, service.procedures.add); await expect(add({ n: 6 })).resolves.toStrictEqual({ result: 11 }); }); test('stream basic', async () => { - const [i, o] = asClientStream(initialState, service.procedures.echo); + const env = createTestEnvironment(); + const [i, o] = asClientStream(env, initialState, service.procedures.echo); i.push({ msg: 'abc', ignore: false }); i.push({ msg: 'def', ignore: true }); @@ -144,9 +148,10 @@ describe('client <-> server integration test', () => { }); test('rpc', async () => { + const env = createTestEnvironment(); const [ct, st] = await createWsTransports(port, wss); const serviceDefs = { test: TestServiceConstructor() }; - const server = await createServer(st, serviceDefs); + const server = await createServer(env, st, serviceDefs); const client = createClient(ct); await expect(client.test.add({ n: 3 })).resolves.toStrictEqual({ result: 3, @@ -154,9 +159,10 @@ describe('client <-> server integration test', () => { }); test('stream', async () => { + const env = createTestEnvironment(); const [ct, st] = await createWsTransports(port, wss); const serviceDefs = { test: TestServiceConstructor() }; - const server = await createServer(st, serviceDefs); + const server = await createServer(env, st, serviceDefs); const client = createClient(ct); const [i, o, close] = await client.test.echo(); diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 7ae62469..3d2c25bb 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -6,6 +6,7 @@ import { createServer } from '../router/server'; import { Transport } from '../transport/types'; import { NaiveJsonCodec } from '../codec/json'; import { createClient } from '../router/client'; +import { createTestEnvironment } from '../environment/util'; const fnBody: Procedure<{}, 'rpc', TObject, TObject> = { type: 'rpc', @@ -97,7 +98,11 @@ describe("ensure typescript doesn't give up trying to infer the types for large c: svc(), d: svc(), }; - const server = await createServer(new MockTransport('SERVER'), listing); + const server = await createServer( + createTestEnvironment(), + new MockTransport('SERVER'), + listing, + ); const client = createClient(new MockTransport('client')); expect(server).toBeTruthy(); expect(client).toBeTruthy(); diff --git a/environment/util.ts b/environment/util.ts new file mode 100644 index 00000000..289cd712 --- /dev/null +++ b/environment/util.ts @@ -0,0 +1,12 @@ +import { IsomorphicEnvironment } from './types'; + +export function createTestEnvironment(): IsomorphicEnvironment { + return { + log: (...args: unknown[]) => { + console.log(...args); + }, + error: (...args: unknown[]) => { + console.error(...args); + }, + }; +} diff --git a/index.ts b/index.ts index 005dd36c..f40ff4d2 100644 --- a/index.ts +++ b/index.ts @@ -46,4 +46,5 @@ export { createWebSocketClient, } from './transport/util'; +export { createTestEnvironment } from './environment/util'; export type { IsomorphicEnvironment } from './environment/types'; diff --git a/router/server.util.ts b/router/server.util.ts index c121fbf5..b114a182 100644 --- a/router/server.util.ts +++ b/router/server.util.ts @@ -6,15 +6,20 @@ import { } from '../transport/message'; import { pushable } from 'it-pushable'; import type { Pushable } from 'it-pushable'; +import { IsomorphicEnvironment } from '../environment/types'; export function asClientRpc< State extends object | unknown, I extends TObject, O extends TObject, ->(state: State, proc: Procedure) { +>( + environment: IsomorphicEnvironment, + state: State, + proc: Procedure, +) { return (msg: Static) => proc - .handler(state, payloadToTransportMessage(msg)) + .handler({ environment, state }, payloadToTransportMessage(msg)) .then((res) => res.payload); } @@ -23,6 +28,7 @@ export function asClientStream< I extends TObject, O extends TObject, >( + environment: IsomorphicEnvironment, state: State, proc: Procedure, ): [Pushable>, Pushable>] { @@ -49,7 +55,7 @@ export function asClientStream< // handle (async () => { - await proc.handler(state, ri, ro); + await proc.handler({ environment, state }, ri, ro); ro.end(); })(); From f92fa3d2f5e7fc218a6cc90cb66508428685440b Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Tue, 26 Sep 2023 13:14:42 -0600 Subject: [PATCH 3/9] add declaration merging comment --- environment/types.ts | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/environment/types.ts b/environment/types.ts index a5d03aa1..5e65f4b9 100644 --- a/environment/types.ts +++ b/environment/types.ts @@ -1,4 +1,16 @@ -export interface IsomorphicEnvironment { - log: (...args: unknown[]) => void; - error: (...args: unknown[]) => void; -} +/** + * The environment for services/procedures. This is used only on + * the server. + * + * You should use declaration merging to extend this interface + * with whatever you need. For example, if you need to access + * a database, you can do: + * ```ts + * declare module '@replit/river' { + * interface IsomorphicEnvironment { + * db: Database; + * } + * } + * ``` + */ +export interface IsomorphicEnvironment {} From 4b71bfc76d481decdfa7bd901dbe4fe4a4d2504d Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Tue, 26 Sep 2023 13:16:54 -0600 Subject: [PATCH 4/9] remove unneeded helper --- __tests__/integration.test.ts | 16 +++++----------- __tests__/typescript-stress.test.ts | 7 +------ environment/util.ts | 12 ------------ index.ts | 1 - 4 files changed, 6 insertions(+), 30 deletions(-) delete mode 100644 environment/util.ts diff --git a/__tests__/integration.test.ts b/__tests__/integration.test.ts index 9c6bb8dd..d1430ad2 100644 --- a/__tests__/integration.test.ts +++ b/__tests__/integration.test.ts @@ -12,7 +12,6 @@ import { import { createServer } from '../router/server'; import { createClient } from '../router/client'; import { asClientRpc, asClientStream } from '../router/server.util'; -import { createTestEnvironment } from '../environment/util'; export const EchoRequest = Type.Object({ msg: Type.String(), @@ -100,20 +99,17 @@ describe('server-side test', () => { const initialState = { count: 0 }; test('rpc basic', async () => { - const env = createTestEnvironment(); - const add = asClientRpc(env, initialState, service.procedures.add); + const add = asClientRpc({}, initialState, service.procedures.add); await expect(add({ n: 3 })).resolves.toStrictEqual({ result: 3 }); }); test('rpc initial state', async () => { - const env = createTestEnvironment(); - const add = asClientRpc(env, { count: 5 }, service.procedures.add); + const add = asClientRpc({}, { count: 5 }, service.procedures.add); await expect(add({ n: 6 })).resolves.toStrictEqual({ result: 11 }); }); test('stream basic', async () => { - const env = createTestEnvironment(); - const [i, o] = asClientStream(env, initialState, service.procedures.echo); + const [i, o] = asClientStream({}, initialState, service.procedures.echo); i.push({ msg: 'abc', ignore: false }); i.push({ msg: 'def', ignore: true }); @@ -148,10 +144,9 @@ describe('client <-> server integration test', () => { }); test('rpc', async () => { - const env = createTestEnvironment(); const [ct, st] = await createWsTransports(port, wss); const serviceDefs = { test: TestServiceConstructor() }; - const server = await createServer(env, st, serviceDefs); + const server = await createServer({}, st, serviceDefs); const client = createClient(ct); await expect(client.test.add({ n: 3 })).resolves.toStrictEqual({ result: 3, @@ -159,10 +154,9 @@ describe('client <-> server integration test', () => { }); test('stream', async () => { - const env = createTestEnvironment(); const [ct, st] = await createWsTransports(port, wss); const serviceDefs = { test: TestServiceConstructor() }; - const server = await createServer(env, st, serviceDefs); + const server = await createServer({}, st, serviceDefs); const client = createClient(ct); const [i, o, close] = await client.test.echo(); diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index 3d2c25bb..c58a8a4b 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -6,7 +6,6 @@ import { createServer } from '../router/server'; import { Transport } from '../transport/types'; import { NaiveJsonCodec } from '../codec/json'; import { createClient } from '../router/client'; -import { createTestEnvironment } from '../environment/util'; const fnBody: Procedure<{}, 'rpc', TObject, TObject> = { type: 'rpc', @@ -98,11 +97,7 @@ describe("ensure typescript doesn't give up trying to infer the types for large c: svc(), d: svc(), }; - const server = await createServer( - createTestEnvironment(), - new MockTransport('SERVER'), - listing, - ); + const server = await createServer({}, new MockTransport('SERVER'), listing); const client = createClient(new MockTransport('client')); expect(server).toBeTruthy(); expect(client).toBeTruthy(); diff --git a/environment/util.ts b/environment/util.ts deleted file mode 100644 index 289cd712..00000000 --- a/environment/util.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { IsomorphicEnvironment } from './types'; - -export function createTestEnvironment(): IsomorphicEnvironment { - return { - log: (...args: unknown[]) => { - console.log(...args); - }, - error: (...args: unknown[]) => { - console.error(...args); - }, - }; -} diff --git a/index.ts b/index.ts index f40ff4d2..005dd36c 100644 --- a/index.ts +++ b/index.ts @@ -46,5 +46,4 @@ export { createWebSocketClient, } from './transport/util'; -export { createTestEnvironment } from './environment/util'; export type { IsomorphicEnvironment } from './environment/types'; From 91557b8debd515e436aabbce92d31615107c4a36 Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Tue, 26 Sep 2023 13:17:16 -0600 Subject: [PATCH 5/9] typo --- environment/types.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment/types.ts b/environment/types.ts index 5e65f4b9..d12fb0c2 100644 --- a/environment/types.ts +++ b/environment/types.ts @@ -4,7 +4,7 @@ * * You should use declaration merging to extend this interface * with whatever you need. For example, if you need to access - * a database, you can do: + * a database, you could do: * ```ts * declare module '@replit/river' { * interface IsomorphicEnvironment { From 1c6c2bf9fa1389be04fc6ec64f82e8f579ca1f79 Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Tue, 26 Sep 2023 13:23:29 -0600 Subject: [PATCH 6/9] use a context map --- router/server.ts | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/router/server.ts b/router/server.ts index 64f83d50..ee9242e9 100644 --- a/router/server.ts +++ b/router/server.ts @@ -28,14 +28,24 @@ export async function createServer>( transport: Transport, services: Services, ): Promise> { - // create streams for every stream procedure + const contextMap: Map> = new Map(); const streamMap: Map = new Map(); + + function getContext(service: AnyService) { + const context = contextMap.get(service); + + if (!context) { + throw new Error(`No context found for ${service.name}`); + } + + return context; + } + for (const [serviceName, service] of Object.entries(services)) { - const context: ProcedureContext = { - environment, - state: service.state, - }; + // populate the context map + contextMap.set(service, { environment, state: service.state }); + // create streams for every stream procedure for (const [procedureName, proc] of Object.entries(service.procedures)) { const procedure = proc as Procedure< object, @@ -51,7 +61,7 @@ export async function createServer>( outgoing, doneCtx: Promise.all([ // processing the actual procedure - procedure.handler(context, incoming, outgoing), + procedure.handler(getContext(service), incoming, outgoing), // sending outgoing messages back to client (async () => { for await (const response of outgoing) { @@ -89,12 +99,10 @@ export async function createServer>( procedure.type === 'rpc' && Value.Check(procedure.input, inputMessage.payload) ) { - // synchronous rpc - const context: ProcedureContext = { - environment, - state: service.state, - }; - const response = await procedure.handler(context, inputMessage); + const response = await procedure.handler( + getContext(service), + inputMessage, + ); transport.send(response); return; } else if ( From d847417d7d2a75380d0469b9de86dc4efb23065e Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Wed, 27 Sep 2023 10:48:09 -0600 Subject: [PATCH 7/9] use extended context concept instead --- __tests__/integration.test.ts | 10 +++++----- __tests__/typescript-stress.test.ts | 2 +- environment/types.ts | 16 ---------------- index.ts | 3 +-- router/builder.ts | 11 +++-------- router/context.ts | 28 ++++++++++++++++++++++++++++ router/server.ts | 18 ++++++++---------- router/server.util.ts | 10 +++++----- 8 files changed, 51 insertions(+), 47 deletions(-) delete mode 100644 environment/types.ts create mode 100644 router/context.ts diff --git a/__tests__/integration.test.ts b/__tests__/integration.test.ts index d1430ad2..065dcc2c 100644 --- a/__tests__/integration.test.ts +++ b/__tests__/integration.test.ts @@ -99,17 +99,17 @@ describe('server-side test', () => { const initialState = { count: 0 }; test('rpc basic', async () => { - const add = asClientRpc({}, initialState, service.procedures.add); + const add = asClientRpc(initialState, service.procedures.add); await expect(add({ n: 3 })).resolves.toStrictEqual({ result: 3 }); }); test('rpc initial state', async () => { - const add = asClientRpc({}, { count: 5 }, service.procedures.add); + const add = asClientRpc({ count: 5 }, service.procedures.add); await expect(add({ n: 6 })).resolves.toStrictEqual({ result: 11 }); }); test('stream basic', async () => { - const [i, o] = asClientStream({}, initialState, service.procedures.echo); + const [i, o] = asClientStream(initialState, service.procedures.echo); i.push({ msg: 'abc', ignore: false }); i.push({ msg: 'def', ignore: true }); @@ -146,7 +146,7 @@ describe('client <-> server integration test', () => { test('rpc', async () => { const [ct, st] = await createWsTransports(port, wss); const serviceDefs = { test: TestServiceConstructor() }; - const server = await createServer({}, st, serviceDefs); + const server = await createServer(st, serviceDefs); const client = createClient(ct); await expect(client.test.add({ n: 3 })).resolves.toStrictEqual({ result: 3, @@ -156,7 +156,7 @@ describe('client <-> server integration test', () => { test('stream', async () => { const [ct, st] = await createWsTransports(port, wss); const serviceDefs = { test: TestServiceConstructor() }; - const server = await createServer({}, st, serviceDefs); + const server = await createServer(st, serviceDefs); const client = createClient(ct); const [i, o, close] = await client.test.echo(); diff --git a/__tests__/typescript-stress.test.ts b/__tests__/typescript-stress.test.ts index c58a8a4b..7ae62469 100644 --- a/__tests__/typescript-stress.test.ts +++ b/__tests__/typescript-stress.test.ts @@ -97,7 +97,7 @@ describe("ensure typescript doesn't give up trying to infer the types for large c: svc(), d: svc(), }; - const server = await createServer({}, new MockTransport('SERVER'), listing); + const server = await createServer(new MockTransport('SERVER'), listing); const client = createClient(new MockTransport('client')); expect(server).toBeTruthy(); expect(client).toBeTruthy(); diff --git a/environment/types.ts b/environment/types.ts deleted file mode 100644 index d12fb0c2..00000000 --- a/environment/types.ts +++ /dev/null @@ -1,16 +0,0 @@ -/** - * The environment for services/procedures. This is used only on - * the server. - * - * You should use declaration merging to extend this interface - * with whatever you need. For example, if you need to access - * a database, you could do: - * ```ts - * declare module '@replit/river' { - * interface IsomorphicEnvironment { - * db: Database; - * } - * } - * ``` - */ -export interface IsomorphicEnvironment {} diff --git a/index.ts b/index.ts index 005dd36c..8f5563db 100644 --- a/index.ts +++ b/index.ts @@ -16,6 +16,7 @@ export type { ServerClient } from './router/client'; export { createServer } from './router/server'; export { asClientRpc, asClientStream } from './router/server.util'; export type { Server } from './router/server'; +export type { ServiceContext, ServiceContextWithState } from './router/context'; export { Transport } from './transport/types'; export { @@ -45,5 +46,3 @@ export { waitForSocketReady, createWebSocketClient, } from './transport/util'; - -export type { IsomorphicEnvironment } from './environment/types'; diff --git a/router/builder.ts b/router/builder.ts index f7c9fc05..3911eec5 100644 --- a/router/builder.ts +++ b/router/builder.ts @@ -1,7 +1,7 @@ import { TObject, Static, Type } from '@sinclair/typebox'; import type { Pushable } from 'it-pushable'; import { TransportMessage } from '../transport/message'; -import { IsomorphicEnvironment } from '../environment/types'; +import { ServiceContextWithState } from './context'; export type ValidProcType = 'stream' | 'rpc'; export type ProcListing = Record< @@ -59,11 +59,6 @@ export type ProcType< ProcName extends keyof S['procedures'], > = S['procedures'][ProcName]['type']; -export interface ProcedureContext { - environment: IsomorphicEnvironment; - state: State; -} - export type Procedure< State extends object | unknown, Ty extends ValidProcType, @@ -74,7 +69,7 @@ export type Procedure< input: I; output: O; handler: ( - context: ProcedureContext, + context: ServiceContextWithState, input: TransportMessage>, ) => Promise>>; type: Ty; @@ -83,7 +78,7 @@ export type Procedure< input: I; output: O; handler: ( - context: ProcedureContext, + context: ServiceContextWithState, input: AsyncIterable>>, output: Pushable>>, ) => Promise; diff --git a/router/context.ts b/router/context.ts new file mode 100644 index 00000000..281a9733 --- /dev/null +++ b/router/context.ts @@ -0,0 +1,28 @@ +/** + * The context for services/procedures. This is used only on + * the server. + * + * An important detail is that the state prop is always on + * this interface and it shouldn't be changed, removed, or + * extended. This prop is for the state of a service. + * + * You should use declaration merging to extend this interface + * with whatever you need. For example, if you need to access + * a database, you could do: + * ```ts + * declare module '@replit/river' { + * interface ServiceContext { + * db: Database; + * } + * } + * ``` + */ +export interface ServiceContext { + state: object | unknown; +} + +/** + * The {@link ServiceContext} with state. This is what is passed to procedures. + */ +export type ServiceContextWithState = + ServiceContext & { state: State }; diff --git a/router/server.ts b/router/server.ts index ee9242e9..53827aa9 100644 --- a/router/server.ts +++ b/router/server.ts @@ -1,16 +1,11 @@ import { TObject } from '@sinclair/typebox'; import { Transport } from '../transport/types'; -import { - AnyService, - Procedure, - ProcedureContext, - ValidProcType, -} from './builder'; +import { AnyService, Procedure, ValidProcType } from './builder'; import { Value } from '@sinclair/typebox/value'; import { pushable } from 'it-pushable'; import type { Pushable } from 'it-pushable'; import { OpaqueTransportMessage, TransportMessage } from '../transport/message'; -import { IsomorphicEnvironment } from '../environment/types'; +import { ServiceContext, ServiceContextWithState } from './context'; export interface Server { services: Services; @@ -24,11 +19,14 @@ interface ProcStream { } export async function createServer>( - environment: IsomorphicEnvironment, transport: Transport, services: Services, + extendedContext?: ServiceContext, ): Promise> { - const contextMap: Map> = new Map(); + const contextMap: Map< + AnyService, + ServiceContextWithState + > = new Map(); const streamMap: Map = new Map(); function getContext(service: AnyService) { @@ -43,7 +41,7 @@ export async function createServer>( for (const [serviceName, service] of Object.entries(services)) { // populate the context map - contextMap.set(service, { environment, state: service.state }); + contextMap.set(service, { ...extendedContext, state: service.state }); // create streams for every stream procedure for (const [procedureName, proc] of Object.entries(service.procedures)) { diff --git a/router/server.util.ts b/router/server.util.ts index b114a182..95f1ab39 100644 --- a/router/server.util.ts +++ b/router/server.util.ts @@ -6,20 +6,20 @@ import { } from '../transport/message'; import { pushable } from 'it-pushable'; import type { Pushable } from 'it-pushable'; -import { IsomorphicEnvironment } from '../environment/types'; +import { ServiceContext } from './context'; export function asClientRpc< State extends object | unknown, I extends TObject, O extends TObject, >( - environment: IsomorphicEnvironment, state: State, proc: Procedure, + extendedContext?: ServiceContext, ) { return (msg: Static) => proc - .handler({ environment, state }, payloadToTransportMessage(msg)) + .handler({ ...extendedContext, state }, payloadToTransportMessage(msg)) .then((res) => res.payload); } @@ -28,9 +28,9 @@ export function asClientStream< I extends TObject, O extends TObject, >( - environment: IsomorphicEnvironment, state: State, proc: Procedure, + extendedContext?: ServiceContext, ): [Pushable>, Pushable>] { const i = pushable>({ objectMode: true }); const o = pushable>({ objectMode: true }); @@ -55,7 +55,7 @@ export function asClientStream< // handle (async () => { - await proc.handler({ environment, state }, ri, ro); + await proc.handler({ ...extendedContext, state }, ri, ro); ro.end(); })(); From 50ac678f9c0eb6588e588fbb11685dcf99d1e0e6 Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Wed, 27 Sep 2023 10:50:10 -0600 Subject: [PATCH 8/9] whoops omit state from being required --- router/server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/server.ts b/router/server.ts index 53827aa9..85512649 100644 --- a/router/server.ts +++ b/router/server.ts @@ -21,7 +21,7 @@ interface ProcStream { export async function createServer>( transport: Transport, services: Services, - extendedContext?: ServiceContext, + extendedContext?: Omit, ): Promise> { const contextMap: Map< AnyService, From 5770520a2541f60f6f38e7b6e1bb3aae8c9518c4 Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Wed, 27 Sep 2023 10:52:45 -0600 Subject: [PATCH 9/9] whoops again --- router/server.util.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/router/server.util.ts b/router/server.util.ts index 95f1ab39..3bcfdc89 100644 --- a/router/server.util.ts +++ b/router/server.util.ts @@ -15,7 +15,7 @@ export function asClientRpc< >( state: State, proc: Procedure, - extendedContext?: ServiceContext, + extendedContext?: Omit, ) { return (msg: Static) => proc @@ -30,7 +30,7 @@ export function asClientStream< >( state: State, proc: Procedure, - extendedContext?: ServiceContext, + extendedContext?: Omit, ): [Pushable>, Pushable>] { const i = pushable>({ objectMode: true }); const o = pushable>({ objectMode: true });