Skip to content

Commit

Permalink
fix: Send a close stream message when the stream input ends
Browse files Browse the repository at this point in the history
We currently don't send one of these messages when the `stream` input
ends, so the server can't know that the client is done!

This change now correctly sends the end-of-stream message.
  • Loading branch information
lhchavez committed Dec 13, 2023
1 parent d71f9da commit eafd4d1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
12 changes: 8 additions & 4 deletions __tests__/invariants.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,24 @@ describe('procedures should leave no trace after finishing', async () => {
const [input, output, close] = await client.test.echo.stream();
input.push({ msg: '1', ignore: false });
input.push({ msg: '2', ignore: false, end: true });
input.end();

const result1 = await iterNext(output);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: '1' });

// ensure we only have one stream despite pushing multiple messages.
await waitUntil(() => server.streams.size, 1);
input.end();
// ensure we no longer have any streams since the input was closed.
await waitUntil(() => server.streams.size, 0);

const result2 = await iterNext(output);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: '2' });

// ensure we exactly have one stream even after we send multiple messages
expect(server.streams.size).toEqual(1);

const result3 = await output.next();
assert(result3.done);

close();
// end procedure

Expand Down
4 changes: 4 additions & 0 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(

transport.send(m);
}

transport.send(closeStream(transport.clientId, serverId, streamId));
})();

// transport -> output
Expand All @@ -243,6 +245,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(

if (isStreamClose(msg.controlFlags)) {
outputStream.end();
transport.removeEventListener('message', listener);
} else {
outputStream.push(msg.payload);
}
Expand Down Expand Up @@ -293,6 +296,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(

if (isStreamClose(msg.controlFlags)) {
outputStream.end();
transport.removeEventListener('message', listener);
} else {
outputStream.push(msg.payload);
}
Expand Down
3 changes: 3 additions & 0 deletions util/testHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ export function asClientStream<
for await (const transportRes of transportOutput) {
rawOutput.push(transportRes.payload);
}
rawOutput.end();
})();

// handle
Expand Down Expand Up @@ -262,6 +263,7 @@ export function asClientStreamWithInitialization<
for await (const transportRes of transportOutput) {
rawOutput.push(transportRes.payload);
}
rawOutput.end();
})();

// handle
Expand Down Expand Up @@ -328,6 +330,7 @@ export function asClientSubscription<
for await (const transportRes of transportOutput) {
rawOutput.push(transportRes.payload);
}
rawOutput.end();
})();

return async (
Expand Down

0 comments on commit eafd4d1

Please sign in to comment.