From 8e81a931bd63d7cf012b25e1397cb2f941000e12 Mon Sep 17 00:00:00 2001 From: Sheraff Date: Fri, 6 Oct 2023 15:17:32 +0200 Subject: [PATCH 1/2] feat: deserializeAsync parse takes a ReadableStream Uint8Array as input --- src/async/asyncTypes.ts | 4 + src/async/deserializeAsync.test.ts | 132 +++++++++++++++++++++++------ src/async/deserializeAsync.ts | 21 +++-- 3 files changed, 125 insertions(+), 32 deletions(-) diff --git a/src/async/asyncTypes.ts b/src/async/asyncTypes.ts index 6e215671..d890941f 100644 --- a/src/async/asyncTypes.ts +++ b/src/async/asyncTypes.ts @@ -3,6 +3,10 @@ import { TsonType } from "../types.js"; import { TsonBranded, TsonTypeTesterCustom } from "../types.js"; import { serialized } from "../types.js"; +export type TsonAsyncStringifiedStream = ReadableStream & { + [serialized]: TValue; +}; + export type TsonAsyncStringifierIterable = AsyncIterable & { [serialized]: TValue; }; diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index acd6d6af..eebe7884 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -7,47 +7,55 @@ import { tsonPromise, } from "../index.js"; import { assert } from "../internals/assert.js"; -import { - mapIterable, - readableStreamToAsyncIterable, -} from "../internals/iterableUtils.js"; import { createTestServer } from "../internals/testUtils.js"; import { TsonAsyncOptions } from "./asyncTypes.js"; +function createBodyStream(iterator: AsyncIterable) { + return new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + for await (const chunk of iterator) { + controller.enqueue(encoder.encode(chunk)); + } + controller.close(); + } + }) +} + test("deserialize variable chunk length", async () => { const tson = createTsonAsync({ nonce: () => "__tson", types: [tsonAsyncIterator, tsonPromise, tsonBigint], }); { - const iterable = (async function* () { + const body = createBodyStream((async function* () { await new Promise((resolve) => setTimeout(resolve, 1)); yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}'; yield "\n,\n[\n]\n]"; - })(); - const result = await tson.parse(iterable); + })()); + const result = await tson.parse(body); expect(result).toEqual({ foo: "bar" }); } { - const iterable = (async function* () { + const body = createBodyStream((async function* () { await new Promise((resolve) => setTimeout(resolve, 1)); yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]'; - })(); - const result = await tson.parse(iterable); + })()); + const result = await tson.parse(body); expect(result).toEqual({ foo: "bar" }); } { - const iterable = (async function* () { + const body = createBodyStream((async function* () { await new Promise((resolve) => setTimeout(resolve, 1)); yield '[\n{"json"'; yield ':{"foo":"b'; yield 'ar"},"nonce":"__tson"}\n,\n'; yield "[\n]\n"; yield "]"; - })(); - const result = await tson.parse(iterable); + })()); + const result = await tson.parse(body); expect(result).toEqual({ foo: "bar" }); } }); @@ -65,8 +73,8 @@ test("deserialize async iterable", async () => { }; const strIterable = tson.stringify(obj); - - const result = await tson.parse(strIterable); + const body = createBodyStream(strIterable); + const result = await tson.parse(body); expect(result).toEqual(obj); } @@ -78,8 +86,9 @@ test("deserialize async iterable", async () => { }; const strIterable = tson.stringify(obj); - - const result = await tson.parse(strIterable); + const body = createBodyStream(strIterable); + const resultRaw = await tson.parse(body); + const result = resultRaw as typeof obj; expect(await result.foo).toEqual("bar"); } @@ -112,8 +121,9 @@ test("stringify async iterable + promise", async () => { }; const strIterable = tson.stringify(input); - - const output = await tson.parse(strIterable); + const body = createBodyStream(strIterable); + const outputRaw = await tson.parse(body); + const output = outputRaw as typeof input; expect(output.foo).toEqual("bar"); @@ -175,15 +185,85 @@ test("e2e: stringify async iterable and promise over the network", async () => { assert(response.body); - const textDecoder = new TextDecoder(); + const parsedRaw = await tson.parse(response.body); + const parsed = parsedRaw as MockObj; + + expect(parsed.foo).toEqual("bar"); + + const results = []; + + for await (const value of parsed.iterable) { + results.push(value); + } + + expect(results).toEqual([1n, 2n, 3n, 4n, 5n]); + + expect(await parsed.promise).toEqual(42); + + await expect( + parsed.rejectedPromise, + ).rejects.toThrowErrorMatchingInlineSnapshot('"Promise rejected"'); + + server.close(); +}); + +test("e2e: server crash", async () => { + function createMockObj() { + async function* generator() { + for (const number of [1n, 2n, 3n, 4n, 5n]) { + await new Promise((resolve) => setTimeout(resolve, 1)); + yield number; + } + } + + return { + foo: "bar", + iterable: generator(), + promise: Promise.resolve(42), + rejectedPromise: Promise.reject(new Error("rejected promise")), + }; + } + + type MockObj = ReturnType; - // convert the response body to an async iterable - const stringIterator = mapIterable( - readableStreamToAsyncIterable(response.body), - (v) => textDecoder.decode(v), - ); + // ------------- server ------------------- + const opts: TsonAsyncOptions = { + types: [tsonPromise, tsonAsyncIterator, tsonBigint], + }; + + const server = await createTestServer({ + handleRequest: async (_req, res) => { + const tson = createTsonAsync(opts); + + const obj = createMockObj(); + const strIterarable = tson.stringify(obj, 4); + + let closed = false + setTimeout(() => { + closed = true + server.close(); + }, 2) + + // res.write(strIterarable); + + for await (const value of strIterarable) { + if (closed) continue + res.write(value); + } + + res.end(); + }, + }); + + // ------------- client ------------------- + const tson = createTsonAsync(opts); + + // do a streamed fetch request + const response = await fetch(server.url); + + assert(response.body); - const parsedRaw = await tson.parse(stringIterator); + const parsedRaw = await tson.parse(response.body); const parsed = parsedRaw as MockObj; expect(parsed.foo).toEqual("bar"); diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 4eeae8bd..6fe1c05d 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -12,7 +12,7 @@ import { import { TsonAsyncIndex, TsonAsyncOptions, - TsonAsyncStringifierIterable, + TsonAsyncStringifiedStream, TsonAsyncType, } from "./asyncTypes.js"; import { TsonAsyncValueTuple } from "./serializeAsync.js"; @@ -25,7 +25,7 @@ type AnyTsonTransformerSerializeDeserialize = | TsonTransformerSerializeDeserialize; type TsonParseAsync = ( - string: AsyncIterable | TsonAsyncStringifierIterable, + string: ReadableStream | TsonAsyncStringifiedStream, ) => Promise; export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { @@ -42,13 +42,22 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { } } - return async (iterable: AsyncIterable) => { + return async (stream: ReadableStream) => { // this is an awful hack to get around making a some sort of pipeline const cache = new Map< TsonAsyncIndex, ReadableStreamDefaultController >(); - const iterator = iterable[Symbol.asyncIterator](); + + const decoder = new TextDecoder(); + const textStream = stream.pipeThrough( + new TransformStream({ + async transform(chunk, controller) { + controller.enqueue(decoder.decode(chunk)); + }, + }), + ); + const reader = textStream.getReader(); const walker: WalkerFactory = (nonce) => { const walk: WalkFn = (value) => { @@ -123,7 +132,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { do { lines.forEach(readLine); lines.length = 0; - const nextValue = await iterator.next(); + const nextValue = await reader.read(); if (!nextValue.done) { accumulator += nextValue.value; const parts = accumulator.split("\n"); @@ -144,7 +153,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const lines: string[] = []; do { - const nextValue = await iterator.next(); + const nextValue = await reader.read(); if (nextValue.done) { throw new TsonError("Unexpected end of stream before head"); } From 42a3c7d7f80f6d91e50d39c28fffb9a3aede67da Mon Sep 17 00:00:00 2001 From: Sheraff Date: Fri, 6 Oct 2023 15:30:47 +0200 Subject: [PATCH 2/2] fix other tests --- src/async/deserializeAsync.test.ts | 13 +------------ src/handlers/tsonPromise.test.ts | 14 +++++++++----- src/index.test.ts | 8 ++++---- src/internals/testUtils.ts | 12 ++++++++++++ 4 files changed, 26 insertions(+), 21 deletions(-) diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index eebe7884..5e87e735 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -7,20 +7,9 @@ import { tsonPromise, } from "../index.js"; import { assert } from "../internals/assert.js"; -import { createTestServer } from "../internals/testUtils.js"; +import { createTestServer, createBodyStream } from "../internals/testUtils.js"; import { TsonAsyncOptions } from "./asyncTypes.js"; -function createBodyStream(iterator: AsyncIterable) { - return new ReadableStream({ - async start(controller) { - const encoder = new TextEncoder(); - for await (const chunk of iterator) { - controller.enqueue(encoder.encode(chunk)); - } - controller.close(); - } - }) -} test("deserialize variable chunk length", async () => { const tson = createTsonAsync({ diff --git a/src/handlers/tsonPromise.test.ts b/src/handlers/tsonPromise.test.ts index d7ae1db6..25460eb0 100644 --- a/src/handlers/tsonPromise.test.ts +++ b/src/handlers/tsonPromise.test.ts @@ -19,6 +19,7 @@ import { createTestServer, waitError, waitFor, + createBodyStream, } from "../internals/testUtils.js"; import { TsonSerialized, TsonType } from "../types.js"; @@ -354,7 +355,8 @@ test("pipe stringifier to parser", async () => { const strIterarable = tson.stringify(obj, 4); - const value = await tson.parse(strIterarable); + const valueRaw = await tson.parse(createBodyStream(strIterarable)); + const value = valueRaw as typeof obj; expect(value).toHaveProperty("foo"); expect(await value.foo).toBe("bar"); @@ -377,7 +379,8 @@ test("stringify and parse promise with a promise", async () => { const strIterarable = tson.stringify(obj, 4); - const value = await tson.parse(strIterarable); + const valueRaw = await tson.parse(createBodyStream(strIterarable)); + const value = valueRaw as typeof obj; const firstPromise = await value.promise; @@ -442,7 +445,7 @@ test("stringify and parse promise with a promise over a network connection", asy (v) => textDecoder.decode(v), ); - const value = await tson.parse(stringIterator); + const value = await tson.parse(createBodyStream(stringIterator)); const asObj = value as Obj; const firstPromise = await asObj.promise; @@ -481,7 +484,7 @@ test("does not crash node when it receives a promise rejection", async () => { }; const iterator = stringify(original); - const [_result, deferreds] = await parse(iterator); + const [_result, deferreds] = await parse(createBodyStream(iterator)); const result = _result as typeof original; await waitFor(() => { @@ -549,7 +552,8 @@ test("stringify promise rejection", async () => { // parse const iterator = stringify(original, 2); - const result = await parse(iterator); + const resultRaw = await parse(createBodyStream(iterator)); + const result = resultRaw as typeof original; expect(result).toMatchInlineSnapshot(` { diff --git a/src/index.test.ts b/src/index.test.ts index 3e0b1a49..2eb6e856 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -7,7 +7,7 @@ import { createTsonAsync, tsonPromise, } from "./index.js"; -import { expectError, waitError, waitFor } from "./internals/testUtils.js"; +import { expectError, waitError, waitFor, createBodyStream } from "./internals/testUtils.js"; import { TsonSerialized } from "./types.js"; test("multiple handlers for primitive string found", () => { @@ -95,7 +95,7 @@ test("async: duplicate keys", async () => { const gen = generator(); await createTsonAsync({ types: [stringHandler, stringHandler], - }).parse(gen); + }).parse(createBodyStream(gen)); }); expect(err).toMatchInlineSnapshot( @@ -136,7 +136,7 @@ test("async: bad init", async () => { const gen = generator(); await createTsonAsync({ types: [], - }).parse(gen); + }).parse(createBodyStream(gen)); }); expect(err).toMatchInlineSnapshot( @@ -171,7 +171,7 @@ test("async: bad values", async () => { await createTsonAsync({ onStreamError: onErrorSpy, types: [tsonPromise], - }).parse(generator()); + }).parse(createBodyStream(generator())); await waitFor(() => { expect(onErrorSpy).toHaveBeenCalledTimes(1); diff --git a/src/internals/testUtils.ts b/src/internals/testUtils.ts index 6b85b820..e3150251 100644 --- a/src/internals/testUtils.ts +++ b/src/internals/testUtils.ts @@ -72,3 +72,15 @@ export async function createTestServer(opts: { url: `http://localhost:${port}`, }; } + +export function createBodyStream(iterator: AsyncIterable) { + return new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + for await (const chunk of iterator) { + controller.enqueue(encoder.encode(chunk)); + } + controller.close(); + } + }) +}