Skip to content

Commit

Permalink
Server uses ReadStream and WriteStream #136 (#137)
Browse files Browse the repository at this point in the history
* Server uses streams

* Bruh
  • Loading branch information
masad-frost committed Jun 6, 2024
1 parent 2d88c4f commit 87b15ec
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 164 deletions.
12 changes: 6 additions & 6 deletions __tests__/fixtures/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ const testServiceProcedures = TestServiceScaffold.procedures({
async handler(_ctx, msgStream, returnStream) {
for await (const { ignore, msg, end } of msgStream) {
if (!ignore) {
returnStream.push(Ok({ response: msg }));
returnStream.write(Ok({ response: msg }));
}

if (end) {
returnStream.end();
returnStream.close();
}
}
},
Expand All @@ -67,7 +67,7 @@ const testServiceProcedures = TestServiceScaffold.procedures({
async handler(_ctx, init, msgStream, returnStream) {
for await (const { ignore, msg } of msgStream) {
if (!ignore) {
returnStream.push(Ok({ response: `${init.prefix} ${msg}` }));
returnStream.write(Ok({ response: `${init.prefix} ${msg}` }));
}
}
},
Expand Down Expand Up @@ -181,14 +181,14 @@ export const FallibleServiceSchema = ServiceSchema.define({
if (throwError) {
throw new Error('some message');
} else if (throwResult) {
returnStream.push(
returnStream.write(
Err({
code: STREAM_ERROR,
message: 'field throwResult was set to true',
}),
);
} else {
returnStream.push(Ok({ response: msg }));
returnStream.write(Ok({ response: msg }));
}
}
},
Expand All @@ -212,7 +212,7 @@ export const SubscribableServiceSchema = ServiceSchema.define(
output: Type.Object({ result: Type.Number() }),
async handler(ctx, _msg, returnStream) {
return ctx.state.count.observe((count) => {
returnStream.push(Ok({ result: count }));
returnStream.write(Ok({ result: count }));
});
},
}),
Expand Down
85 changes: 46 additions & 39 deletions __tests__/handler.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
/* eslint-disable */
// @ts-nocheck
// will add back when we do server stuff
import {
asClientRpc,
asClientStream,
asClientSubscription,
asClientUpload,
getIteratorFromStream,
iterNext,
} from '../util/testHelpers';
import { assert, describe, expect, test } from 'vitest';
Expand Down Expand Up @@ -56,74 +54,79 @@ describe.skip('server-side test', () => {
});

test('stream basic', async () => {
const [input, output] = asClientStream(
const [inputWriter, outputReader] = asClientStream(
{ count: 0 },
service.procedures.echo,
);

input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();
inputWriter.write({ msg: 'abc', ignore: false });
inputWriter.write({ msg: 'def', ignore: true });
inputWriter.write({ msg: 'ghi', ignore: false });
inputWriter.close();

const result1 = await iterNext(output);
const outputIterator = getIteratorFromStream(outputReader);
const result1 = await iterNext(outputIterator);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });

const result2 = await iterNext(output);
const result2 = await iterNext(outputIterator);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: 'ghi' });

expect(output.readableLength).toBe(0);
expect(outputIterator.next()).toEqual({ done: true, value: undefined });
});

test('stream with initialization', async () => {
const [input, output] = asClientStream(
const [inputWriter, outputReader] = asClientStream(
{ count: 0 },
service.procedures.echoWithPrefix,
{ prefix: 'test' },
);

input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();
inputWriter.write({ msg: 'abc', ignore: false });
inputWriter.write({ msg: 'def', ignore: true });
inputWriter.write({ msg: 'ghi', ignore: false });
inputWriter.close();

const result1 = await iterNext(output);
const outputIterator = getIteratorFromStream(outputReader);
const result1 = await iterNext(outputIterator);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: 'test abc' });

const result2 = await iterNext(output);
const result2 = await iterNext(outputIterator);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: 'test ghi' });

expect(output.readableLength).toBe(0);
expect(outputIterator.next()).toEqual({ done: true, value: undefined });
});

test('fallible stream', async () => {
const service = FallibleServiceSchema.instantiate();
const [input, output] = asClientStream({}, service.procedures.echo);
const [inputWriter, outputReader] = asClientStream(
{},
service.procedures.echo,
);

input.push({ msg: 'abc', throwResult: false, throwError: false });
const result1 = await iterNext(output);
inputWriter.write({ msg: 'abc', throwResult: false, throwError: false });
const outputIterator = getIteratorFromStream(outputReader);
const result1 = await iterNext(outputIterator);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });

input.push({ msg: 'def', throwResult: true, throwError: false });
const result2 = await iterNext(output);
inputWriter.write({ msg: 'def', throwResult: true, throwError: false });
const result2 = await iterNext(outputIterator);
assert(!result2.ok);
expect(result2.payload.code).toStrictEqual(STREAM_ERROR);

input.push({ msg: 'ghi', throwResult: false, throwError: true });
const result3 = await iterNext(output);
inputWriter.write({ msg: 'ghi', throwResult: false, throwError: true });
const result3 = await iterNext(outputIterator);
assert(!result3.ok);
expect(result3.payload).toStrictEqual({
code: UNCAUGHT_ERROR,
message: 'some message',
});

input.end();
expect(output.readableLength).toBe(0);
inputWriter.close();
});

test('subscriptions', async () => {
Expand All @@ -132,41 +135,45 @@ describe.skip('server-side test', () => {
const add = asClientRpc(state, service.procedures.add);
const subscribe = asClientSubscription(state, service.procedures.value);

const stream = subscribe({});
const streamResult1 = await iterNext(stream);
const outputReader = subscribe({});
const outputIterator = getIteratorFromStream(outputReader);
const streamResult1 = await iterNext(outputIterator);
assert(streamResult1.ok);
expect(streamResult1.payload).toStrictEqual({ result: 0 });

const result = await add({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });

const streamResult2 = await iterNext(stream);
const streamResult2 = await iterNext(outputIterator);
assert(streamResult1.ok);
expect(streamResult2.payload).toStrictEqual({ result: 3 });
});

test('uploads', async () => {
const service = UploadableServiceSchema.instantiate();
const [input, result] = asClientUpload({}, service.procedures.addMultiple);
const [inputWriter, result] = asClientUpload(
{},
service.procedures.addMultiple,
);

input.push({ n: 1 });
input.push({ n: 2 });
input.end();
inputWriter.write({ n: 1 });
inputWriter.write({ n: 2 });
inputWriter.close();
expect(await result).toStrictEqual({ ok: true, payload: { result: 3 } });
});

test('uploads with initialization', async () => {
const service = UploadableServiceSchema.instantiate();
const [input, result] = asClientUpload(
const [inputWriter, result] = asClientUpload(
{},
service.procedures.addMultipleWithPrefix,
{ prefix: 'test' },
);

input.push({ n: 1 });
input.push({ n: 2 });
input.end();
inputWriter.write({ n: 1 });
inputWriter.write({ n: 2 });
inputWriter.close();
expect(await result).toStrictEqual({
ok: true,
payload: { result: 'test 3' },
Expand Down
16 changes: 8 additions & 8 deletions router/procedures.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Static, TNever, Type, TSchema } from '@sinclair/typebox';
import type { Pushable } from 'it-pushable';
import { ServiceContextWithTransportInfo } from './context';
import { Result, RiverError, RiverUncaughtSchema } from './result';
import { ReadStream, WriteStream } from './streams';

/**
* Brands a type to prevent it from being directly constructed.
Expand Down Expand Up @@ -94,7 +94,7 @@ export type UploadProcedure<
handler(
context: ServiceContextWithTransportInfo<State>,
init: Static<Init>,
input: AsyncIterableIterator<Static<I>>,
input: ReadStream<Static<I>>,
): Promise<ProcedureResult<O, E>>;
}
: {
Expand All @@ -105,7 +105,7 @@ export type UploadProcedure<
description?: string;
handler(
context: ServiceContextWithTransportInfo<State>,
input: AsyncIterableIterator<Static<I>>,
input: ReadStream<Static<I>>,
): Promise<ProcedureResult<O, E>>;
};

Expand All @@ -131,7 +131,7 @@ export interface SubscriptionProcedure<
handler(
context: ServiceContextWithTransportInfo<State>,
input: Static<I>,
output: Pushable<ProcedureResult<O, E>>,
output: WriteStream<ProcedureResult<O, E>>,
): Promise<(() => void) | void>;
}

Expand Down Expand Up @@ -162,8 +162,8 @@ export type StreamProcedure<
handler(
context: ServiceContextWithTransportInfo<State>,
init: Static<Init>,
input: AsyncIterableIterator<Static<I>>,
output: Pushable<ProcedureResult<O, E>>,
input: ReadStream<Static<I>>,
output: WriteStream<ProcedureResult<O, E>>,
): Promise<(() => void) | void>;
}
: {
Expand All @@ -174,8 +174,8 @@ export type StreamProcedure<
description?: string;
handler(
context: ServiceContextWithTransportInfo<State>,
input: AsyncIterableIterator<Static<I>>,
output: Pushable<ProcedureResult<O, E>>,
input: ReadStream<Static<I>>,
output: WriteStream<ProcedureResult<O, E>>,
): Promise<(() => void) | void>;
};

Expand Down
Loading

0 comments on commit 87b15ec

Please sign in to comment.