Skip to content

Commit

Permalink
feat: Complete all RPC kinds
Browse files Browse the repository at this point in the history
We have three of the four gRPC RPC kinds (rpc, stream, subscription).

This change adds the final RPC kind: upload (client-stream, single
message response). It also adds the `*WithInitialization` RPCs, where we
can send a message of another schema before the main stream, which is a
very common pattern used in filesystem (i.e. open file for writing,
where the first message is the path and the other messages are the
chunks) and collaboration (i.e. first message is the channel name and
the rest of the bidirectional messages are the actual stream).
  • Loading branch information
lhchavez committed Dec 11, 2023
1 parent bf068db commit b46933a
Show file tree
Hide file tree
Showing 7 changed files with 731 additions and 56 deletions.
85 changes: 85 additions & 0 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
OrderingServiceConstructor,
STREAM_ERROR,
SubscribableServiceConstructor,
UploadableServiceConstructor,
TestServiceConstructor,
} from './fixtures/services';
import { UNCAUGHT_ERROR } from '../router/result';
Expand Down Expand Up @@ -135,6 +136,32 @@ describe.each(codecs)(
});
});

test('streamWithInitialization', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [input, output, close] =
await client.test.echoWithPrefix.streamWithInitialization({

Check failure on line 146 in __tests__/e2e.test.ts

View workflow job for this annotation

GitHub Actions / build-and-test (macos-latest)

Argument of type '{ prefix: string; }' is not assignable to parameter of type 'TObject<{ prefix: TString; }>'.

Check failure on line 146 in __tests__/e2e.test.ts

View workflow job for this annotation

GitHub Actions / build-and-test (ubuntu-latest)

Argument of type '{ prefix: string; }' is not assignable to parameter of type 'TObject<{ prefix: TString; }>'.
prefix: 'test',
});
input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();

const result1 = await iterNext(output);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: 'test abc' });

const result2 = await iterNext(output);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: 'test ghi' });

close();
});

test('fallible stream', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: FallibleServiceConstructor() };
Expand Down Expand Up @@ -232,6 +259,64 @@ describe.each(codecs)(
});
});

test('upload', async () => {
const options = { codec };
const serverTransport = new WebSocketServerTransport(
webSocketServer,
'SERVER',
options,
);
const clientTransport = new WebSocketClientTransport(
() => createLocalWebSocketClient(port),
'client',
'SERVER',
options,
);

const serviceDefs = { uploadable: UploadableServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [addStream, addResult] =
await client.uploadable.addMultiple.upload();
addStream.push({ n: 1 });
addStream.push({ n: 2 });
addStream.end();
const result = await addResult;
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
});

test('uploadWithInitialization', async () => {
const options = { codec };
const serverTransport = new WebSocketServerTransport(
webSocketServer,
'SERVER',
options,
);
const clientTransport = new WebSocketClientTransport(
() => createLocalWebSocketClient(port),
'client',
'SERVER',
options,
);

const serviceDefs = { uploadable: UploadableServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

const [addStream, addResult] =
await client.uploadable.addMultipleWithPrefix.uploadWithInitialization({
prefix: 'test',
});
addStream.push({ n: 1 });
addStream.push({ n: 2 });
addStream.end();
const result = await addResult;
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 'test 3' });
});

test('message order is preserved in the face of disconnects', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: OrderingServiceConstructor() };
Expand Down
58 changes: 58 additions & 0 deletions __tests__/fixtures/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ export const TestServiceConstructor = () =>
}
},
})
.defineProcedure('echoWithPrefix', {
type: 'streamWithInitialization',
init: Type.Object({ prefix: Type.String() }),
input: EchoRequest,
output: EchoResponse,
errors: Type.Never(),
async handler(_ctx, init, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (!req.ignore) {
returnStream.push(
reply(msg, Ok({ response: `${init.payload.prefix} ${req.msg}` })),
);
}
}
},
})
.finalize();

export const OrderingServiceConstructor = () =>
Expand Down Expand Up @@ -187,3 +204,44 @@ export const SubscribableServiceConstructor = () =>
},
})
.finalize();

export const UploadableServiceConstructor = () =>
ServiceBuilder.create('uploadable')
.initialState({})
.defineProcedure('addMultiple', {
type: 'upload',
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(_ctx, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
result += n;
lastMsg = msg;
}
return reply(lastMsg!, Ok({ result: result }));
},
})
.defineProcedure('addMultipleWithPrefix', {
type: 'uploadWithInitialization',
init: Type.Object({ prefix: Type.String() }),
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.String() }),
errors: Type.Never(),
async handler(_ctx, init, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
result += n;
lastMsg = msg;
}
return reply(
lastMsg!,
Ok({ result: init.payload.prefix + ' ' + result }),
);
},
})
.finalize();
28 changes: 17 additions & 11 deletions __tests__/typescript-stress.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, expect, test } from 'vitest';
import { Procedure, ServiceBuilder, serializeService } from '../router/builder';
import { Type } from '@sinclair/typebox';
import { Type, TNull, Static } from '@sinclair/typebox';
import { MessageId, OpaqueTransportMessage, reply } from '../transport/message';
import { createServer } from '../router/server';
import { Connection, Transport } from '../transport/transport';
Expand All @@ -20,16 +20,22 @@ const errors = Type.Union([
message: Type.String(),
}),
]);
const fnBody: Procedure<{}, 'rpc', typeof input, typeof output, typeof errors> =
{
type: 'rpc',
input,
output,
errors,
async handler(_state, msg) {
return reply(msg, Ok({ b: msg.payload.a }));
},
};
const fnBody: Procedure<
{},
'rpc',
Static<TNull>,
typeof input,
typeof output,
typeof errors
> = {
type: 'rpc',
input,
output,
errors,
async handler(_state, msg) {
return reply(msg, Ok({ b: msg.payload.a }));
},
};

// typescript is limited to max 50 constraints
// see: https://github.com/microsoft/TypeScript/issues/33541
Expand Down
Loading

0 comments on commit b46933a

Please sign in to comment.