Skip to content

Commit

Permalink
refactor so handler signatures have been changed from Transport<T>
Browse files Browse the repository at this point in the history
…-> `T` (#40)
  • Loading branch information
jackyzha0 authored Dec 15, 2023
1 parent 93a6d47 commit e27e73b
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 506 deletions.
13 changes: 6 additions & 7 deletions __tests__/disconnects.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ describe('procedures should handle unexpected disconnects', async () => {

// start procedure
await client.test.add.rpc({ n: 3 });

expect(clientTransport.connections.size).toEqual(1);
expect(serverTransport.connections.size).toEqual(1);

Expand All @@ -76,8 +75,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});

Expand Down Expand Up @@ -112,8 +111,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});

Expand Down Expand Up @@ -238,8 +237,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});
});
102 changes: 39 additions & 63 deletions __tests__/fixtures/services.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Type } from '@sinclair/typebox';
import { ServiceBuilder } from '../../router/builder';
import { reply } from '../../transport/message';
import { Err, Ok } from '../../router/result';
import { Observable } from './observable';

Expand All @@ -21,10 +20,9 @@ export const TestServiceConstructor = () =>
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg) {
const { n } = msg.payload;
async handler(ctx, { n }) {
ctx.state.count += n;
return reply(msg, Ok({ result: ctx.state.count }));
return Ok({ result: ctx.state.count });
},
})
.defineProcedure('echo', {
Expand All @@ -33,13 +31,12 @@ export const TestServiceConstructor = () =>
output: EchoResponse,
errors: Type.Never(),
async handler(_ctx, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (!req.ignore) {
returnStream.push(reply(msg, Ok({ response: req.msg })));
for await (const { ignore, msg, end } of msgStream) {
if (!ignore) {
returnStream.push(Ok({ response: msg }));
}

if (req.end) {
if (end) {
returnStream.end();
}
}
Expand All @@ -52,12 +49,9 @@ export const TestServiceConstructor = () =>
output: EchoResponse,
errors: Type.Never(),
async handler(_ctx, init, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (!req.ignore) {
returnStream.push(
reply(msg, Ok({ response: `${init.payload.prefix} ${req.msg}` })),
);
for await (const { ignore, msg } of msgStream) {
if (!ignore) {
returnStream.push(Ok({ response: `${init.prefix} ${msg}` }));
}
}
},
Expand All @@ -74,19 +68,18 @@ export const OrderingServiceConstructor = () =>
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ n: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg) {
const { n } = msg.payload;
async handler(ctx, { n }) {
ctx.state.msgs.push(n);
return reply(msg, Ok({ n }));
return Ok({ n });
},
})
.defineProcedure('getAll', {
type: 'rpc',
input: Type.Object({}),
output: Type.Object({ msgs: Type.Array(Type.Number()) }),
errors: Type.Never(),
async handler(ctx, msg) {
return reply(msg, Ok({ msgs: ctx.state.msgs }));
async handler(ctx, _msg) {
return Ok({ msgs: ctx.state.msgs });
},
})
.finalize();
Expand All @@ -98,11 +91,11 @@ export const BinaryFileServiceConstructor = () =>
input: Type.Object({ file: Type.String() }),
output: Type.Object({ contents: Type.Uint8Array() }),
errors: Type.Never(),
async handler(_ctx, msg) {
async handler(_ctx, { file }) {
const bytes: Uint8Array = new TextEncoder().encode(
`contents for file ${msg.payload.file}`,
`contents for file ${file}`,
);
return reply(msg, Ok({ contents: bytes }));
return Ok({ contents: bytes });
},
})
.finalize();
Expand All @@ -123,19 +116,15 @@ export const FallibleServiceConstructor = () =>
extras: Type.Object({ test: Type.String() }),
}),
]),
async handler(_ctx, msg) {
const { a, b } = msg.payload;
async handler(_ctx, { a, b }) {
if (b === 0) {
return reply(msg, {
ok: false,
payload: {
code: DIV_BY_ZERO,
message: 'Cannot divide by zero',
extras: { test: 'abc' },
},
return Err({
code: DIV_BY_ZERO,
message: 'Cannot divide by zero',
extras: { test: 'abc' },
});
} else {
return reply(msg, Ok({ result: a / b }));
return Ok({ result: a / b });
}
},
})
Expand All @@ -154,22 +143,18 @@ export const FallibleServiceConstructor = () =>
}),
]),
async handler(_ctx, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (req.throwError) {
for await (const { msg, throwError, throwResult } of msgStream) {
if (throwError) {
throw new Error('some message');
} else if (req.throwResult) {
} else if (throwResult) {
returnStream.push(
reply(
msg,
Err({
code: STREAM_ERROR,
message: 'field throwResult was set to true',
}),
),
Err({
code: STREAM_ERROR,
message: 'field throwResult was set to true',
}),
);
} else {
returnStream.push(reply(msg, Ok({ response: req.msg })));
returnStream.push(Ok({ response: msg }));
}
}
},
Expand All @@ -186,20 +171,19 @@ export const SubscribableServiceConstructor = () =>
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg) {
const { n } = msg.payload;
async handler(ctx, { n }) {
ctx.state.count.set((prev) => prev + n);
return reply(msg, Ok({ result: ctx.state.count.get() }));
return Ok({ result: ctx.state.count.get() });
},
})
.defineProcedure('value', {
type: 'subscription',
input: Type.Object({}),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg, returnStream) {
async handler(ctx, _msg, returnStream) {
ctx.state.count.observe((count) => {
returnStream.push(reply(msg, Ok({ result: count })));
returnStream.push(Ok({ result: count }));
});
},
})
Expand All @@ -215,13 +199,11 @@ export const UploadableServiceConstructor = () =>
errors: Type.Never(),
async handler(_ctx, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
for await (const { n } of msgStream) {
result += n;
lastMsg = msg;
}
return reply(lastMsg!, Ok({ result: result }));

return Ok({ result: result });
},
})
.defineProcedure('addMultipleWithPrefix', {
Expand All @@ -232,16 +214,10 @@ export const UploadableServiceConstructor = () =>
errors: Type.Never(),
async handler(_ctx, init, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
for await (const { n } of msgStream) {
result += n;
lastMsg = msg;
}
return reply(
lastMsg!,
Ok({ result: init.payload.prefix + ' ' + result }),
);
return Ok({ result: init.prefix + ' ' + result });
},
})
.finalize();
18 changes: 9 additions & 9 deletions __tests__/handler.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import {
asClientRpc,
asClientStream,
asClientStreamWithInitialization,
asClientSubscription,
asClientUpload,
asClientUploadWithInitialization,
iterNext,
} from '../util/testHelpers';
import { assert, describe, expect, test } from 'vitest';
Expand All @@ -21,10 +19,9 @@ import { Observable } from './fixtures/observable';

describe('server-side test', () => {
const service = TestServiceConstructor();
const initialState = { count: 0 };

test('rpc basic', async () => {
const add = asClientRpc(initialState, service.procedures.add);
const add = asClientRpc({ count: 0 }, service.procedures.add);
const result = await add({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
Expand Down Expand Up @@ -57,7 +54,7 @@ describe('server-side test', () => {

test('stream basic', async () => {
const [input, output] = asClientStream(
initialState,
{ count: 0 },
service.procedures.echo,
);

Expand All @@ -78,8 +75,8 @@ describe('server-side test', () => {
});

test('stream with initialization', async () => {
const [input, output] = asClientStreamWithInitialization(
initialState,
const [input, output] = asClientStream(
{ count: 0 },
service.procedures.echoWithPrefix,
{ prefix: 'test' },
);
Expand Down Expand Up @@ -148,7 +145,10 @@ describe('server-side test', () => {

test('uploads', async () => {
const service = UploadableServiceConstructor();
const [input, result] = asClientUpload({}, service.procedures.addMultiple);
const [input, result] = await asClientUpload(
{},
service.procedures.addMultiple,
);

input.push({ n: 1 });
input.push({ n: 2 });
Expand All @@ -158,7 +158,7 @@ describe('server-side test', () => {

test('uploads with initialization', async () => {
const service = UploadableServiceConstructor();
const [input, result] = asClientUploadWithInitialization(
const [input, result] = await asClientUpload(
{},
service.procedures.addMultipleWithPrefix,
{ prefix: 'test' },
Expand Down
8 changes: 4 additions & 4 deletions __tests__/typescript-stress.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, expect, test } from 'vitest';
import { Procedure, ServiceBuilder, serializeService } from '../router/builder';
import { Type } from '@sinclair/typebox';
import { MessageId, OpaqueTransportMessage, reply } from '../transport/message';
import { MessageId, OpaqueTransportMessage } from '../transport/message';
import { createServer } from '../router/server';
import { Connection, Transport } from '../transport/transport';
import { NaiveJsonCodec } from '../codec/json';
Expand Down Expand Up @@ -32,10 +32,10 @@ const fnBody: Procedure<{}, 'rpc', typeof input, typeof output, typeof errors> =
output,
errors,
async handler(_state, msg) {
if ('c' in msg.payload) {
return reply(msg, Ok({ b: msg.payload.c }));
if ('c' in msg) {
return Ok({ b: msg.c });
} else {
return reply(msg, Ok({ b: msg.payload.a }));
return Ok({ b: msg.a });
}
},
};
Expand Down
Loading

0 comments on commit e27e73b

Please sign in to comment.