Skip to content

Commit

Permalink
[protocolv2] Init always exists (#159)
Browse files Browse the repository at this point in the history
* I -> Input, O -> Output, E -> Err

* Update types

* Update implementation

* Update tests

* Misc changes

* fix checking init instead of input
  • Loading branch information
masad-frost committed May 31, 2024
1 parent 553cdd5 commit f20170d
Show file tree
Hide file tree
Showing 15 changed files with 613 additions and 725 deletions.
6 changes: 3 additions & 3 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ interface BaseError {
The `Result` type MUST conform to:

```ts
type Result<T, E extends BaseError> =
| { ok: true; payload: T }
| { ok: false; payload: E };
type Result<SuccessPayload, ErrorPayload extends BaseError> =
| { ok: true; payload: SuccessPayload }
| { ok: false; payload: ErrorPayload };
```

The messages in either direction must also contain additional information so that the receiving party knows where to route the message payload. This wrapper message is referred to as a `TransportMessage` and its payload can be a `Control`, a `Result`, an `Init`, an `Input`, or an `Output`. The schema for the transport message is as follows:
Expand Down
2 changes: 1 addition & 1 deletion __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('bandwidth', async () => {
{ time: BENCH_DURATION },
);

const [inputWriter, outputReader] = await client.test.echo.stream();
const [inputWriter, outputReader] = await client.test.echo.stream({});
bench(
`${name} -- stream`,
async () => {
Expand Down
9 changes: 5 additions & 4 deletions __tests__/cleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ describe.each(testMatrix())(
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [inputWriter, outputReader, close] =
await client.test.echo.stream();
const [inputWriter, outputReader, close] = await client.test.echo.stream(
{},
);
inputWriter.write({ msg: '1', ignore: false, end: undefined });
inputWriter.write({ msg: '2', ignore: false, end: true });

Expand Down Expand Up @@ -318,7 +319,7 @@ describe.each(testMatrix())(

// start procedure
const [inputWriter, addResult] =
await client.uploadable.addMultiple.upload();
await client.uploadable.addMultiple.upload({});
inputWriter.write({ n: 1 });
inputWriter.write({ n: 2 });
inputWriter.close();
Expand Down Expand Up @@ -368,7 +369,7 @@ describe.each(testMatrix())(
});

// start a stream
const [inputWriter, outputReader] = await client.test.echo.stream();
const [inputWriter, outputReader] = await client.test.echo.stream({});
inputWriter.write({ msg: '1', ignore: false });

const outputIterator = getIteratorFromStream(outputReader);
Expand Down
4 changes: 2 additions & 2 deletions __tests__/disconnects.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe.each(testMatrix())(
});

// start procedure
const [inputWriter, outputReader] = await client.test.echo.stream();
const [inputWriter, outputReader] = await client.test.echo.stream({});
const outputIterator = getIteratorFromStream(outputReader);

inputWriter.write({ msg: 'abc', ignore: false });
Expand Down Expand Up @@ -236,7 +236,7 @@ describe.each(testMatrix())(

// start procedure
const [inputWriter, addResult] =
await client.uploadable.addMultiple.upload();
await client.uploadable.addMultiple.upload({});
inputWriter.write({ n: 1 });
inputWriter.write({ n: 2 });
// end procedure
Expand Down
13 changes: 7 additions & 6 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ describe.each(testMatrix())(
});

// test
const [inputWriter, outputReader, close] =
await client.test.echo.stream();
const [inputWriter, outputReader, close] = await client.test.echo.stream(
{},
);
const outputIterator = getIteratorFromStream(outputReader);

inputWriter.write({ msg: 'abc', ignore: false });
Expand Down Expand Up @@ -241,7 +242,7 @@ describe.each(testMatrix())(

// test
const [inputWriter, outputReader, close] =
await client.fallible.echo.stream();
await client.fallible.echo.stream({});
const outputIterator = getIteratorFromStream(outputReader);
inputWriter.write({ msg: 'abc', throwResult: false, throwError: false });
const result1 = await iterNext(outputIterator);
Expand Down Expand Up @@ -332,7 +333,7 @@ describe.each(testMatrix())(

// test
const [inputWriter, addResult] =
await client.uploadable.addMultiple.upload();
await client.uploadable.addMultiple.upload({});
inputWriter.write({ n: 1 });
inputWriter.write({ n: 2 });
inputWriter.close();
Expand Down Expand Up @@ -478,7 +479,7 @@ describe.each(testMatrix())(
// test
const openStreams = [];
for (let i = 0; i < CONCURRENCY; i++) {
const streamHandle = await client.test.echo.stream();
const streamHandle = await client.test.echo.stream({});
const inputWriter = streamHandle[0];
inputWriter.write({ msg: `${i}-1`, ignore: false });
inputWriter.write({ msg: `${i}-2`, ignore: false });
Expand Down Expand Up @@ -690,7 +691,7 @@ describe.each(testMatrix())(
const services = {
test: ServiceSchema.define({
getData: Procedure.rpc({
input: Type.Object({}),
init: Type.Object({}),
output: Type.Object({
data: Type.String(),
extra: Type.Number(),
Expand Down
53 changes: 37 additions & 16 deletions __tests__/fixtures/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const TestServiceScaffold = ServiceSchema.scaffold({

const testServiceProcedures = TestServiceScaffold.procedures({
add: Procedure.rpc({
input: Type.Object({ n: Type.Number() }),
init: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
async handler(ctx, { n }) {
ctx.state.count += n;
Expand All @@ -26,7 +26,7 @@ const testServiceProcedures = TestServiceScaffold.procedures({
}),

array: Procedure.rpc({
input: Type.Object({ n: Type.Number() }),
init: Type.Object({ n: Type.Number() }),
output: Type.Array(Type.Number()),
async handler(ctx, { n }) {
ctx.state.count += n;
Expand All @@ -35,19 +35,21 @@ const testServiceProcedures = TestServiceScaffold.procedures({
}),

arrayStream: Procedure.stream({
init: Type.Object({}),
input: Type.Object({ n: Type.Number() }),
output: Type.Array(Type.Number()),
async handler(_, msgStream, returnStream) {
async handler(_, _init, msgStream, returnStream) {
for await (const msg of msgStream) {
returnStream.write(Ok([msg.n]));
}
},
}),

echo: Procedure.stream({
init: Type.Object({}),
input: EchoRequest,
output: EchoResponse,
async handler(_ctx, msgStream, returnStream) {
async handler(_ctx, _init, msgStream, returnStream) {
for await (const { ignore, msg, end } of msgStream) {
if (!ignore) {
returnStream.write(Ok({ response: msg }));
Expand Down Expand Up @@ -75,7 +77,7 @@ const testServiceProcedures = TestServiceScaffold.procedures({

echoUnion: Procedure.rpc({
description: 'Echos back whatever we sent',
input: Type.Union([
init: Type.Union([
Type.Object(
{ a: Type.Number({ description: 'A number' }) },
{ description: 'A' },
Expand All @@ -99,6 +101,23 @@ const testServiceProcedures = TestServiceScaffold.procedures({
return Ok(input);
},
}),

unimplementedUpload: Procedure.upload({
init: Type.Object({}),
input: Type.Object({}),
output: Type.Object({}),
async handler() {
throw new Error('Not implemented');
},
}),

unimplementedSubscription: Procedure.subscription({
init: Type.Object({}),
output: Type.Object({}),
async handler() {
throw new Error('Not implemented');
},
}),
});

export const TestServiceSchema = TestServiceScaffold.finalize({
Expand All @@ -109,7 +128,7 @@ export const OrderingServiceSchema = ServiceSchema.define(
{ initializeState: () => ({ msgs: [] as Array<number> }) },
{
add: Procedure.rpc({
input: Type.Object({ n: Type.Number() }),
init: Type.Object({ n: Type.Number() }),
output: Type.Object({ n: Type.Number() }),
async handler(ctx, { n }) {
ctx.state.msgs.push(n);
Expand All @@ -118,7 +137,7 @@ export const OrderingServiceSchema = ServiceSchema.define(
}),

getAll: Procedure.rpc({
input: Type.Object({}),
init: Type.Object({}),
output: Type.Object({ msgs: Type.Array(Type.Number()) }),
async handler(ctx, _msg) {
return Ok({ msgs: ctx.state.msgs });
Expand All @@ -129,7 +148,7 @@ export const OrderingServiceSchema = ServiceSchema.define(

export const BinaryFileServiceSchema = ServiceSchema.define({
getFile: Procedure.rpc({
input: Type.Object({ file: Type.String() }),
init: Type.Object({ file: Type.String() }),
output: Type.Object({ contents: Type.Uint8Array() }),
async handler(_ctx, { file }) {
const bytes: Uint8Array = Buffer.from(`contents for file ${file}`);
Expand All @@ -143,7 +162,7 @@ export const STREAM_ERROR = 'STREAM_ERROR';

export const FallibleServiceSchema = ServiceSchema.define({
divide: Procedure.rpc({
input: Type.Object({ a: Type.Number(), b: Type.Number() }),
init: Type.Object({ a: Type.Number(), b: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Union([
Type.Object({
Expand All @@ -166,6 +185,7 @@ export const FallibleServiceSchema = ServiceSchema.define({
}),

echo: Procedure.stream({
init: Type.Object({}),
input: Type.Object({
msg: Type.String(),
throwResult: Type.Boolean(),
Expand All @@ -176,7 +196,7 @@ export const FallibleServiceSchema = ServiceSchema.define({
code: Type.Literal(STREAM_ERROR),
message: Type.String(),
}),
async handler(_ctx, msgStream, returnStream) {
async handler(_ctx, _init, msgStream, returnStream) {
for await (const { msg, throwError, throwResult } of msgStream) {
if (throwError) {
throw new Error('some message');
Expand All @@ -199,7 +219,7 @@ export const SubscribableServiceSchema = ServiceSchema.define(
{ initializeState: () => ({ count: new Observable(0) }) },
{
add: Procedure.rpc({
input: Type.Object({ n: Type.Number() }),
init: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
async handler(ctx, { n }) {
ctx.state.count.set((prev) => prev + n);
Expand All @@ -208,7 +228,7 @@ export const SubscribableServiceSchema = ServiceSchema.define(
}),

value: Procedure.subscription({
input: Type.Object({}),
init: Type.Object({}),
output: Type.Object({ result: Type.Number() }),
async handler(ctx, _msg, returnStream) {
return ctx.state.count.observe((count) => {
Expand All @@ -221,9 +241,10 @@ export const SubscribableServiceSchema = ServiceSchema.define(

export const UploadableServiceSchema = ServiceSchema.define({
addMultiple: Procedure.upload({
init: Type.Object({}),
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
async handler(_ctx, msgStream) {
async handler(_ctx, _init, msgStream) {
let result = 0;
for await (const { n } of msgStream) {
result += n;
Expand Down Expand Up @@ -256,15 +277,15 @@ const RecursivePayload = Type.Recursive((This) =>

export const NonObjectSchemas = ServiceSchema.define({
add: Procedure.rpc({
input: Type.Number(),
init: Type.Number(),
output: Type.Number(),
async handler(_ctx, n) {
return Ok(n + 1);
},
}),

echoRecursive: Procedure.rpc({
input: RecursivePayload,
init: RecursivePayload,
output: RecursivePayload,
async handler(_ctx, msg) {
return Ok(msg);
Expand All @@ -277,7 +298,7 @@ export function SchemaWithDisposableState(dispose: () => void) {
{ initializeState: () => ({ [Symbol.dispose]: dispose }) },
{
add: Procedure.rpc({
input: Type.Number(),
init: Type.Number(),
output: Type.Number(),
async handler(_ctx, n) {
return Ok(n + 1);
Expand Down
15 changes: 12 additions & 3 deletions __tests__/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
import { UNCAUGHT_ERROR } from '../router/result';
import { Observable } from './fixtures/observable';

describe.skip('server-side test', () => {
describe('server-side test', () => {
const service = TestServiceSchema.instantiate();

test('rpc basic', async () => {
Expand Down Expand Up @@ -73,7 +73,12 @@ describe.skip('server-side test', () => {
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: 'ghi' });

expect(outputIterator.next()).toEqual({ done: true, value: undefined });
await outputReader.requestClose();

expect(await outputIterator.next()).toEqual({
done: true,
value: undefined,
});
});

test('stream with initialization', async () => {
Expand All @@ -96,8 +101,12 @@ describe.skip('server-side test', () => {
const result2 = await iterNext(outputIterator);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: 'test ghi' });
await outputReader.requestClose();

expect(outputIterator.next()).toEqual({ done: true, value: undefined });
expect(await outputIterator.next()).toEqual({
done: true,
value: undefined,
});
});

test('fallible stream', async () => {
Expand Down
Loading

0 comments on commit f20170d

Please sign in to comment.