diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 37da08a..15365c7 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -8,6 +8,7 @@ import { tsonAsyncIterable, tsonBigint, tsonPromise, + tsonAsyncGeneratorFunction, } from "../index.js"; import { assert } from "../internals/assert.js"; import { @@ -92,17 +93,17 @@ test("deserialize async iterable", async () => { } }); -test("stringify async iterable + promise", async () => { +test.only("stringify async iterable + promise + async generator function", async () => { const tson = createTsonAsync({ nonce: () => "__tson", - types: [tsonAsyncIterable, tsonPromise, tsonBigint], + types: [tsonAsyncIterable, tsonPromise, tsonBigint, tsonAsyncGeneratorFunction], }); const parseOptions = { onStreamError: vitest.fn(), } satisfies TsonParseAsyncOptions; - async function* iterable() { + async function* generator() { await sleep(1); yield 1n; await sleep(1); @@ -116,8 +117,9 @@ test("stringify async iterable + promise", async () => { const input = { foo: "bar", - iterable: iterable(), + iterable: generator(), promise: Promise.resolve(42), + generator, }; const strIterable = tson.stringifyJsonStream(input); @@ -128,13 +130,26 @@ test("stringify async iterable + promise", async () => { expect(await output.promise).toEqual(42); - const result = []; - + const iteratorResult = []; for await (const value of output.iterable) { - result.push(value); + iteratorResult.push(value); } + expect(iteratorResult).toEqual([1n, 2n, 3n, 4n, 5n]); - expect(result).toEqual([1n, 2n, 3n, 4n, 5n]); + const generatorResult1 = []; + const iterator1 = output.generator(); + for await (const value of iterator1) { + generatorResult1.push(value); + } + expect(generatorResult1).toEqual([1n, 2n, 3n, 4n, 5n]); + + // generator should be able to be iterated again + const generatorResult2 = []; + const iterator2 = output.generator(); + for await (const value of iterator2) { + generatorResult2.push(value); + } + expect(generatorResult2).toEqual([1n, 2n, 3n, 4n, 5n]); }); test("e2e: stringify async iterable and promise over the network", async () => { diff --git a/src/async/handlers/tsonAsyncGeneratorFunction.ts b/src/async/handlers/tsonAsyncGeneratorFunction.ts new file mode 100644 index 0000000..bb2dd01 --- /dev/null +++ b/src/async/handlers/tsonAsyncGeneratorFunction.ts @@ -0,0 +1,128 @@ +import { + TsonAbortError, + TsonPromiseRejectionError, + TsonStreamInterruptedError, +} from "../asyncErrors.js"; +import { TsonAsyncType } from "../asyncTypes.js"; + +const ITERATOR_VALUE = 0; +const ITERATOR_ERROR = 1; +const ITERATOR_DONE = 2; +type SerializedIterableResult = + | [typeof ITERATOR_DONE] + | [typeof ITERATOR_ERROR, unknown] + | [typeof ITERATOR_VALUE, unknown]; + +function isAsyncGeneratorFunction(value: unknown): value is () => AsyncGenerator { + return ( + !!value && + typeof value === "function" && + value.prototype[Symbol.toStringTag] === "AsyncGenerator" + ); +} + +export const tsonAsyncGeneratorFunction: TsonAsyncType< + () => AsyncGenerator, + SerializedIterableResult +> = { + async: true, + deserialize: (opts) => { + // each value is stored in RAM for generator to be iterated many times + const chunks: Exclude>['value'], undefined>[] = [] + // we need to know if stream is done or just waiting, so that generator can stop looping + let collectionDone = false + // if generator is being iterated while data is still being collected, we need to be able to wait on the next chunks + let resolveNext: () => void + let promiseNext = new Promise(resolve => resolveNext = resolve) + + /** + * Collects chunks from the stream until it's done + * - handle closing the stream + * - handle generating new promises for generator to wait on + */ + void async function collect() { + let next: Awaited>; + loop: while (((next = await opts.reader.read()), !next.done)) { + const { value } = next + chunks.push(value) + if (value instanceof TsonStreamInterruptedError) { + if (value.cause instanceof TsonAbortError) { + opts.close() + return + } + throw value // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator? + } + switch (value[0]) { + case ITERATOR_DONE: { + opts.close(); + break loop; + } + case ITERATOR_ERROR: { + opts.close(); + break; + } + } + resolveNext!() + promiseNext = new Promise(resolve => resolveNext = resolve) + } + collectionDone = true + resolveNext!() + }() + + /** + * Generator that yields values from the stream + * - handles waiting for chunks if stream is still active + * - handles throwing errors from values + */ + return async function* generator() { + await promiseNext + for (let i = 0; i < chunks.length; i++) { + const value = chunks[i]! + if (value instanceof TsonStreamInterruptedError) { + if (value.cause instanceof TsonAbortError) { + return; + } + throw value; + } + switch (value[0]) { + case ITERATOR_DONE: { + return; + } + + case ITERATOR_ERROR: { + throw TsonPromiseRejectionError.from(value[1]); + } + + case ITERATOR_VALUE: { + yield value[1]; + break; // <-- breaks the switch, not the loop + } + } + if (i === chunks.length - 1) { + if (collectionDone) break + await promiseNext + if (collectionDone) break + } + } + }; + }, + key: "AsyncGeneratorFunction", + serializeIterator: async function* serialize(opts) { + if (opts.value.length !== 0) { + throw new Error( + `AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}` + ); + } + try { + const iterator = opts.value() + for await (const value of iterator) { + yield [ITERATOR_VALUE, value]; + } + + yield [ITERATOR_DONE]; + } catch (err) { + yield [ITERATOR_ERROR, err]; + } + }, + test: isAsyncGeneratorFunction, +}; diff --git a/src/index.ts b/src/index.ts index 8c92aca..faef4e9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,3 +38,4 @@ export * from "./async/asyncErrors.js"; // type handlers export * from "./async/handlers/tsonPromise.js"; export * from "./async/handlers/tsonAsyncIterable.js"; +export * from "./async/handlers/tsonAsyncGeneratorFunction.js";