Skip to content

Commit

Permalink
Client uses ReadStream and WriteStream (#136)
Browse files Browse the repository at this point in the history
* Client uses ReadStream

* Client uses WriteStream

* delete port config lol

* Disable everything for handler.test.ts

* no void 0
  • Loading branch information
masad-frost committed Jun 11, 2024
1 parent 2a5b17d commit a4d0d0a
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 165 deletions.
8 changes: 4 additions & 4 deletions __tests__/bandwidth.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ describe('bandwidth', async () => {
{ time: BENCH_DURATION },
);

const [input, output] = await client.test.echo.stream();
const [inputWriter, outputReader] = await client.test.echo.stream();
bench(
`${name} -- stream`,
async () => {
input.push({ msg: nanoid(), ignore: false });
const result = await output.next();
assert(result.value.ok);
inputWriter.write({ msg: nanoid(), ignore: false });
const result = await outputReader[Symbol.asyncIterator]().next();
assert(result.value?.ok);
},
{ time: BENCH_DURATION },
);
Expand Down
45 changes: 25 additions & 20 deletions __tests__/cleanup.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { afterAll, assert, describe, expect, test, vi } from 'vitest';
import { iterNext } from '../util/testHelpers';
import { getIteratorFromStream, iterNext } from '../util/testHelpers';
import {
SubscribableServiceSchema,
TestServiceSchema,
Expand Down Expand Up @@ -178,25 +178,27 @@ describe.each(testMatrix())(
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [input, output, close] = await client.test.echo.stream();
input.push({ msg: '1', ignore: false, end: undefined });
input.push({ msg: '2', ignore: false, end: true });
const [inputWriter, outputReader, close] =
await client.test.echo.stream();
inputWriter.write({ msg: '1', ignore: false, end: undefined });
inputWriter.write({ msg: '2', ignore: false, end: true });

const result1 = await iterNext(output);
const outputIterator = getIteratorFromStream(outputReader);
const result1 = await iterNext(outputIterator);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: '1' });

// ensure we only have one stream despite pushing multiple messages.
await waitFor(() => expect(server.streams.size).toEqual(1));
input.end();
inputWriter.close();
// ensure we no longer have any streams since the input was closed.
await waitFor(() => expect(server.streams.size).toEqual(0));

const result2 = await iterNext(output);
const result2 = await iterNext(outputIterator);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: '2' });

const result3 = await output.next();
const result3 = await outputIterator.next();
assert(result3.done);

close();
Expand Down Expand Up @@ -248,18 +250,20 @@ describe.each(testMatrix())(
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [subscription, close] = await client.subscribable.value.subscribe(
const [outputReader, close] = await client.subscribable.value.subscribe(
{},
);
let result = await iterNext(subscription);
const outputIterator = getIteratorFromStream(outputReader);
let result = await iterNext(outputIterator);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });
const add1 = await client.subscribable.add.rpc({ n: 1 });
assert(add1.ok);
result = await iterNext(subscription);
result = await iterNext(outputIterator);
assert(result.ok);

close();
server;
// end procedure

// number of message handlers shouldn't increase after subscription ends
Expand Down Expand Up @@ -309,11 +313,11 @@ describe.each(testMatrix())(
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [addStream, addResult] =
const [inputWriter, addResult] =
await client.uploadable.addMultiple.upload();
addStream.push({ n: 1 });
addStream.push({ n: 2 });
addStream.end();
inputWriter.write({ n: 1 });
inputWriter.write({ n: 2 });
inputWriter.close();

const result = await addResult;
assert(result.ok);
Expand Down Expand Up @@ -362,10 +366,11 @@ describe.each(testMatrix())(
});

// start a stream
const [input, output] = await client.test.echo.stream();
input.push({ msg: '1', ignore: false });
const [inputWriter, outputReader] = await client.test.echo.stream();
inputWriter.write({ msg: '1', ignore: false });

const result1 = await iterNext(output);
const outputIterator = getIteratorFromStream(outputReader);
const result1 = await iterNext(outputIterator);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: '1' });

Expand All @@ -384,8 +389,8 @@ describe.each(testMatrix())(
await waitFor(() => expect(serverTransport.connections.size).toEqual(1));

// push on the old stream and make sure its not sent
input.push({ msg: '2', ignore: false });
const result2 = await iterNext(output);
expect(() => inputWriter.write({ msg: '2', ignore: false })).toThrow();
const result2 = await iterNext(outputIterator);
assert(!result2.ok);
});
},
Expand Down
36 changes: 20 additions & 16 deletions __tests__/disconnects.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { afterAll, assert, describe, expect, test, vi } from 'vitest';
import { iterNext } from '../util/testHelpers';
import { getIteratorFromStream, iterNext } from '../util/testHelpers';
import {
SubscribableServiceSchema,
TestServiceSchema,
Expand Down Expand Up @@ -83,9 +83,11 @@ describe.each(testMatrix())(
});

// start procedure
const [input, output] = await client.test.echo.stream();
input.push({ msg: 'abc', ignore: false });
const result = await iterNext(output);
const [inputWriter, outputReader] = await client.test.echo.stream();
const outputIterator = getIteratorFromStream(outputReader);

inputWriter.write({ msg: 'abc', ignore: false });
const result = await iterNext(outputIterator);
assert(result.ok);

expect(clientTransport.connections.size).toEqual(1);
Expand All @@ -95,7 +97,7 @@ describe.each(testMatrix())(
clientTransport.reconnectOnConnectionDrop = false;
clientTransport.connections.forEach((conn) => conn.close());

const nextResPromise = iterNext(output);
const nextResPromise = iterNext(outputIterator);
// end procedure

// after we've disconnected, hit end of grace period
Expand Down Expand Up @@ -139,15 +141,17 @@ describe.each(testMatrix())(

// start procedure
// client1 and client2 both subscribe
const [subscription1, close1] =
const [outputReader1, close1] =
await client1.subscribable.value.subscribe({});
let result = await iterNext(subscription1);
const outputIterator1 = getIteratorFromStream(outputReader1);
let result = await iterNext(outputIterator1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

const [subscription2, close2] =
const [outputReader2, close2] =
await client2.subscribable.value.subscribe({});
result = await iterNext(subscription2);
const outputIterator2 = getIteratorFromStream(outputReader2);
result = await iterNext(outputIterator2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });

Expand All @@ -156,10 +160,10 @@ describe.each(testMatrix())(
assert(add1.ok);

// both clients should receive the updated value
result = await iterNext(subscription1);
result = await iterNext(outputIterator1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });
result = await iterNext(subscription2);
result = await iterNext(outputIterator2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });

Expand All @@ -175,7 +179,7 @@ describe.each(testMatrix())(

// client1 who is still connected can still add values and receive updates
const add2Promise = client1.subscribable.add.rpc({ n: 2 });
const nextResPromise = iterNext(subscription2);
const nextResPromise = iterNext(outputIterator2);

// after we've disconnected, hit end of grace period
await advanceFakeTimersBySessionGrace();
Expand All @@ -189,7 +193,7 @@ describe.each(testMatrix())(

// client1 who is still connected can still add values and receive updates
assert((await add2Promise).ok);
result = await iterNext(subscription1);
result = await iterNext(outputIterator1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });

Expand Down Expand Up @@ -223,10 +227,10 @@ describe.each(testMatrix())(
});

// start procedure
const [addStream, addResult] =
const [inputWriter, addResult] =
await client.uploadable.addMultiple.upload();
addStream.push({ n: 1 });
addStream.push({ n: 2 });
inputWriter.write({ n: 1 });
inputWriter.write({ n: 2 });
// end procedure

// need to wait for connection to be established
Expand Down
Loading

0 comments on commit a4d0d0a

Please sign in to comment.