diff --git a/__tests__/invariants.test.ts b/__tests__/invariants.test.ts index 7779990b..5ac39c03 100644 --- a/__tests__/invariants.test.ts +++ b/__tests__/invariants.test.ts @@ -128,20 +128,24 @@ describe('procedures should leave no trace after finishing', async () => { const [input, output, close] = await client.test.echo.stream(); input.push({ msg: '1', ignore: false }); input.push({ msg: '2', ignore: false, end: true }); - input.end(); const result1 = await iterNext(output); assert(result1.ok); expect(result1.payload).toStrictEqual({ response: '1' }); + + // ensure we only have one stream despite pushing multiple messages. + await waitUntil(() => server.streams.size, 1); + input.end(); + // ensure we no longer have any streams since the input was closed. + await waitUntil(() => server.streams.size, 0); + const result2 = await iterNext(output); assert(result2.ok); expect(result2.payload).toStrictEqual({ response: '2' }); - // ensure we exactly have one stream even after we send multiple messages - expect(server.streams.size).toEqual(1); - const result3 = await output.next(); assert(result3.done); + close(); // end procedure diff --git a/router/client.ts b/router/client.ts index 7e46f974..baaad037 100644 --- a/router/client.ts +++ b/router/client.ts @@ -233,6 +233,8 @@ export const createClient = >>( transport.send(m); } + + transport.send(closeStream(transport.clientId, serverId, streamId)); })(); // transport -> output @@ -243,6 +245,7 @@ export const createClient = >>( if (isStreamClose(msg.controlFlags)) { outputStream.end(); + transport.removeEventListener('message', listener); } else { outputStream.push(msg.payload); } @@ -293,6 +296,7 @@ export const createClient = >>( if (isStreamClose(msg.controlFlags)) { outputStream.end(); + transport.removeEventListener('message', listener); } else { outputStream.push(msg.payload); } diff --git a/util/testHelpers.ts b/util/testHelpers.ts index 8ccf0f6e..c60994ab 100644 --- a/util/testHelpers.ts +++ b/util/testHelpers.ts @@ -179,6 +179,7 @@ export function asClientStream< for await (const transportRes of transportOutput) { rawOutput.push(transportRes.payload); } + rawOutput.end(); })(); // handle @@ -262,6 +263,7 @@ export function asClientStreamWithInitialization< for await (const transportRes of transportOutput) { rawOutput.push(transportRes.payload); } + rawOutput.end(); })(); // handle @@ -328,6 +330,7 @@ export function asClientSubscription< for await (const transportRes of transportOutput) { rawOutput.push(transportRes.payload); } + rawOutput.end(); })(); return async (