diff --git a/src/async/asyncTypes.ts b/src/async/asyncTypes.ts index 4f28b5f2..1a3c57d9 100644 --- a/src/async/asyncTypes.ts +++ b/src/async/asyncTypes.ts @@ -1,4 +1,3 @@ -import { TsonError } from "../errors.js"; import { TsonBranded, TsonType, @@ -66,7 +65,7 @@ export interface TsonAsyncOptions { /** * On stream error */ - onStreamError?: (err: TsonError) => void; + onStreamError?: (err: TsonStreamInterruptedError) => void; /** * The list of types to use diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index afd6f33b..27895524 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -527,3 +527,108 @@ test("1 iterator completed but another never finishes", async () => { ] `); }); + +test("e2e: simulated server crash", async () => { + const crashedDeferred = createDeferred(); + function createMockObj() { + async function* generator() { + for (let i = 0; i < 10; i++) { + yield i; + await sleep(1); + if (i === 5) { + // crash the server after 5 iterations + crashedDeferred.resolve(null); + } + + await sleep(1); + } + } + + return { + foo: "bar", + iterable: generator(), + promise: Promise.resolve(42), + rejectedPromise: Promise.reject(new Error("rejected promise")), + }; + } + + type MockObj = ReturnType; + + // ------------- server ------------------- + const opts = { + onStreamError: vi.fn(), + types: [tsonPromise, tsonAsyncIterator], + } satisfies TsonAsyncOptions; + + const server = await createTestServer({ + handleRequest: async (_req, res) => { + const tson = createTsonAsync(opts); + + const obj = createMockObj(); + const strIterarable = tson.stringify(obj, 4); + + void crashedDeferred.promise.then(() => { + // destroy the response stream + res.destroy(); + }); + + for await (const value of strIterarable) { + res.write(value); + } + + res.end(); + }, + }); + + // ------------- client ------------------- + const tson = createTsonAsync(opts); + + // do a streamed fetch request + const response = await fetch(server.url); + + assert(response.body); + + const textDecoder = new TextDecoder(); + const stringIterator = mapIterable( + readableStreamToAsyncIterable(response.body), + (v) => textDecoder.decode(v), + ); + + const parsed = await tson.parse(stringIterator); + { + // check the iterator + const results = []; + let iteratorError: Error | null = null; + try { + for await (const value of parsed.iterable) { + results.push(value); + } + } catch (err) { + iteratorError = err as Error; + } finally { + server.close(); + } + + assert(iteratorError); + expect(iteratorError.message).toMatchInlineSnapshot( + '"Stream interrupted: terminated"', + ); + expect(results).toEqual([0, 1, 2, 3, 4, 5]); + } + + expect(parsed.foo).toEqual("bar"); + expect(await parsed.promise).toEqual(42); + await expect( + parsed.rejectedPromise, + ).rejects.toThrowErrorMatchingInlineSnapshot('"Promise rejected"'); + + expect(opts.onStreamError).toHaveBeenCalledTimes(1); + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const streamError = opts.onStreamError.mock.calls[0]![0]!; + expect(streamError).toMatchInlineSnapshot( + "[TsonStreamInterruptedError: Stream interrupted: terminated]", + ); + + expect(streamError.cause).toMatchInlineSnapshot("[TypeError: terminated]"); +});