Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor so handler signatures have been changed from Transport<T> -> T #40

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions __tests__/disconnects.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ describe('procedures should handle unexpected disconnects', async () => {

// start procedure
await client.test.add.rpc({ n: 3 });

expect(clientTransport.connections.size).toEqual(1);
expect(serverTransport.connections.size).toEqual(1);

Expand All @@ -76,8 +75,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});

Expand Down Expand Up @@ -112,8 +111,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});

Expand Down Expand Up @@ -238,8 +237,8 @@ describe('procedures should handle unexpected disconnects', async () => {
}),
);

expect(clientTransport.connections.size).toEqual(0);
expect(serverTransport.connections.size).toEqual(0);
waitFor(() => expect(clientTransport.connections.size).toEqual(0));
waitFor(() => expect(serverTransport.connections.size).toEqual(0));
await ensureServerIsClean(server);
});
});
102 changes: 39 additions & 63 deletions __tests__/fixtures/services.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Type } from '@sinclair/typebox';
import { ServiceBuilder } from '../../router/builder';
import { reply } from '../../transport/message';
import { Err, Ok } from '../../router/result';
import { Observable } from './observable';

Expand All @@ -21,10 +20,9 @@ export const TestServiceConstructor = () =>
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg) {
const { n } = msg.payload;
async handler(ctx, { n }) {
ctx.state.count += n;
return reply(msg, Ok({ result: ctx.state.count }));
return Ok({ result: ctx.state.count });
},
})
.defineProcedure('echo', {
Expand All @@ -33,13 +31,12 @@ export const TestServiceConstructor = () =>
output: EchoResponse,
errors: Type.Never(),
async handler(_ctx, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (!req.ignore) {
returnStream.push(reply(msg, Ok({ response: req.msg })));
for await (const { ignore, msg, end } of msgStream) {
if (!ignore) {
returnStream.push(Ok({ response: msg }));
}

if (req.end) {
if (end) {
returnStream.end();
}
}
Expand All @@ -52,12 +49,9 @@ export const TestServiceConstructor = () =>
output: EchoResponse,
errors: Type.Never(),
async handler(_ctx, init, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (!req.ignore) {
returnStream.push(
reply(msg, Ok({ response: `${init.payload.prefix} ${req.msg}` })),
);
for await (const { ignore, msg } of msgStream) {
if (!ignore) {
returnStream.push(Ok({ response: `${init.prefix} ${msg}` }));
}
}
},
Expand All @@ -74,19 +68,18 @@ export const OrderingServiceConstructor = () =>
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ n: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg) {
const { n } = msg.payload;
async handler(ctx, { n }) {
ctx.state.msgs.push(n);
return reply(msg, Ok({ n }));
return Ok({ n });
},
})
.defineProcedure('getAll', {
type: 'rpc',
input: Type.Object({}),
output: Type.Object({ msgs: Type.Array(Type.Number()) }),
errors: Type.Never(),
async handler(ctx, msg) {
return reply(msg, Ok({ msgs: ctx.state.msgs }));
async handler(ctx, _msg) {
return Ok({ msgs: ctx.state.msgs });
},
})
.finalize();
Expand All @@ -98,11 +91,11 @@ export const BinaryFileServiceConstructor = () =>
input: Type.Object({ file: Type.String() }),
output: Type.Object({ contents: Type.Uint8Array() }),
errors: Type.Never(),
async handler(_ctx, msg) {
async handler(_ctx, { file }) {
const bytes: Uint8Array = new TextEncoder().encode(
`contents for file ${msg.payload.file}`,
`contents for file ${file}`,
);
return reply(msg, Ok({ contents: bytes }));
return Ok({ contents: bytes });
},
})
.finalize();
Expand All @@ -123,19 +116,15 @@ export const FallibleServiceConstructor = () =>
extras: Type.Object({ test: Type.String() }),
}),
]),
async handler(_ctx, msg) {
const { a, b } = msg.payload;
async handler(_ctx, { a, b }) {
if (b === 0) {
return reply(msg, {
ok: false,
payload: {
code: DIV_BY_ZERO,
message: 'Cannot divide by zero',
extras: { test: 'abc' },
},
return Err({
code: DIV_BY_ZERO,
message: 'Cannot divide by zero',
extras: { test: 'abc' },
});
} else {
return reply(msg, Ok({ result: a / b }));
return Ok({ result: a / b });
}
},
})
Expand All @@ -154,22 +143,18 @@ export const FallibleServiceConstructor = () =>
}),
]),
async handler(_ctx, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (req.throwError) {
for await (const { msg, throwError, throwResult } of msgStream) {
if (throwError) {
throw new Error('some message');
} else if (req.throwResult) {
} else if (throwResult) {
returnStream.push(
reply(
msg,
Err({
code: STREAM_ERROR,
message: 'field throwResult was set to true',
}),
),
Err({
code: STREAM_ERROR,
message: 'field throwResult was set to true',
}),
);
} else {
returnStream.push(reply(msg, Ok({ response: req.msg })));
returnStream.push(Ok({ response: msg }));
}
}
},
Expand All @@ -186,20 +171,19 @@ export const SubscribableServiceConstructor = () =>
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg) {
const { n } = msg.payload;
async handler(ctx, { n }) {
ctx.state.count.set((prev) => prev + n);
return reply(msg, Ok({ result: ctx.state.count.get() }));
return Ok({ result: ctx.state.count.get() });
},
})
.defineProcedure('value', {
type: 'subscription',
input: Type.Object({}),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(ctx, msg, returnStream) {
async handler(ctx, _msg, returnStream) {
ctx.state.count.observe((count) => {
returnStream.push(reply(msg, Ok({ result: count })));
returnStream.push(Ok({ result: count }));
});
},
})
Expand All @@ -215,13 +199,11 @@ export const UploadableServiceConstructor = () =>
errors: Type.Never(),
async handler(_ctx, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
for await (const { n } of msgStream) {
result += n;
lastMsg = msg;
}
return reply(lastMsg!, Ok({ result: result }));

return Ok({ result: result });
},
})
.defineProcedure('addMultipleWithPrefix', {
Expand All @@ -232,16 +214,10 @@ export const UploadableServiceConstructor = () =>
errors: Type.Never(),
async handler(_ctx, init, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
for await (const { n } of msgStream) {
result += n;
lastMsg = msg;
}
return reply(
lastMsg!,
Ok({ result: init.payload.prefix + ' ' + result }),
);
return Ok({ result: init.prefix + ' ' + result });
},
})
.finalize();
18 changes: 9 additions & 9 deletions __tests__/handler.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import {
asClientRpc,
asClientStream,
asClientStreamWithInitialization,
asClientSubscription,
asClientUpload,
asClientUploadWithInitialization,
iterNext,
} from '../util/testHelpers';
import { assert, describe, expect, test } from 'vitest';
Expand All @@ -21,10 +19,9 @@ import { Observable } from './fixtures/observable';

describe('server-side test', () => {
const service = TestServiceConstructor();
const initialState = { count: 0 };

test('rpc basic', async () => {
const add = asClientRpc(initialState, service.procedures.add);
const add = asClientRpc({ count: 0 }, service.procedures.add);
const result = await add({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
Expand Down Expand Up @@ -57,7 +54,7 @@ describe('server-side test', () => {

test('stream basic', async () => {
const [input, output] = asClientStream(
initialState,
{ count: 0 },
service.procedures.echo,
);

Expand All @@ -78,8 +75,8 @@ describe('server-side test', () => {
});

test('stream with initialization', async () => {
const [input, output] = asClientStreamWithInitialization(
initialState,
const [input, output] = asClientStream(
{ count: 0 },
service.procedures.echoWithPrefix,
{ prefix: 'test' },
);
Expand Down Expand Up @@ -148,7 +145,10 @@ describe('server-side test', () => {

test('uploads', async () => {
const service = UploadableServiceConstructor();
const [input, result] = asClientUpload({}, service.procedures.addMultiple);
const [input, result] = await asClientUpload(
{},
service.procedures.addMultiple,
);

input.push({ n: 1 });
input.push({ n: 2 });
Expand All @@ -158,7 +158,7 @@ describe('server-side test', () => {

test('uploads with initialization', async () => {
const service = UploadableServiceConstructor();
const [input, result] = asClientUploadWithInitialization(
const [input, result] = await asClientUpload(
{},
service.procedures.addMultipleWithPrefix,
{ prefix: 'test' },
Expand Down
8 changes: 4 additions & 4 deletions __tests__/typescript-stress.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, expect, test } from 'vitest';
import { Procedure, ServiceBuilder, serializeService } from '../router/builder';
import { Type } from '@sinclair/typebox';
import { MessageId, OpaqueTransportMessage, reply } from '../transport/message';
import { MessageId, OpaqueTransportMessage } from '../transport/message';
import { createServer } from '../router/server';
import { Connection, Transport } from '../transport/transport';
import { NaiveJsonCodec } from '../codec/json';
Expand Down Expand Up @@ -32,10 +32,10 @@ const fnBody: Procedure<{}, 'rpc', typeof input, typeof output, typeof errors> =
output,
errors,
async handler(_state, msg) {
if ('c' in msg.payload) {
return reply(msg, Ok({ b: msg.payload.c }));
if ('c' in msg) {
return Ok({ b: msg.c });
} else {
return reply(msg, Ok({ b: msg.payload.a }));
return Ok({ b: msg.a });
}
},
};
Expand Down
Loading