Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
feat: tsonAsyncGeneratorFunction proposal for 'iterators that can be …
Browse files Browse the repository at this point in the history
…iterated many times'
  • Loading branch information
Sheraff committed Oct 28, 2023
1 parent 7db3458 commit b1733d9
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 8 deletions.
31 changes: 23 additions & 8 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
tsonAsyncIterable,
tsonBigint,
tsonPromise,
tsonAsyncGeneratorFunction,

Check failure on line 11 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected "tsonAsyncGeneratorFunction" to come before "tsonPromise"
} from "../index.js";
import { assert } from "../internals/assert.js";
import {
Expand Down Expand Up @@ -92,17 +93,17 @@ test("deserialize async iterable", async () => {
}
});

test("stringify async iterable + promise", async () => {
test.only("stringify async iterable + promise + async generator function", async () => {

Check failure on line 96 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

test.only not permitted
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterable, tsonPromise, tsonBigint],
types: [tsonAsyncIterable, tsonPromise, tsonBigint, tsonAsyncGeneratorFunction],
});

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

async function* iterable() {
async function* generator() {
await sleep(1);
yield 1n;
await sleep(1);
Expand All @@ -116,8 +117,9 @@ test("stringify async iterable + promise", async () => {

const input = {
foo: "bar",
iterable: iterable(),
iterable: generator(),
promise: Promise.resolve(42),
generator,

Check failure on line 122 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected "generator" to come before "promise"
};

const strIterable = tson.stringifyJsonStream(input);
Expand All @@ -128,13 +130,26 @@ test("stringify async iterable + promise", async () => {

expect(await output.promise).toEqual(42);

const result = [];

const iteratorResult = [];
for await (const value of output.iterable) {
result.push(value);
iteratorResult.push(value);
}
expect(iteratorResult).toEqual([1n, 2n, 3n, 4n, 5n]);

Check failure on line 137 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected blank line before this statement

expect(result).toEqual([1n, 2n, 3n, 4n, 5n]);
const generatorResult1 = [];
const iterator1 = output.generator();
for await (const value of iterator1) {
generatorResult1.push(value);
}
expect(generatorResult1).toEqual([1n, 2n, 3n, 4n, 5n]);

Check failure on line 144 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected blank line before this statement

// generator should be able to be iterated again
const generatorResult2 = [];
const iterator2 = output.generator();
for await (const value of iterator2) {
generatorResult2.push(value);
}
expect(generatorResult2).toEqual([1n, 2n, 3n, 4n, 5n]);

Check failure on line 152 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected blank line before this statement
});

test("e2e: stringify async iterable and promise over the network", async () => {
Expand Down
128 changes: 128 additions & 0 deletions src/async/handlers/tsonAsyncGeneratorFunction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import {
TsonAbortError,
TsonPromiseRejectionError,
TsonStreamInterruptedError,
} from "../asyncErrors.js";
import { TsonAsyncType } from "../asyncTypes.js";

const ITERATOR_VALUE = 0;
const ITERATOR_ERROR = 1;
const ITERATOR_DONE = 2;
type SerializedIterableResult =
| [typeof ITERATOR_DONE]
| [typeof ITERATOR_ERROR, unknown]
| [typeof ITERATOR_VALUE, unknown];

function isAsyncGeneratorFunction(value: unknown): value is () => AsyncGenerator<unknown, void, unknown> {

Check failure on line 16 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View workflow job for this annotation

GitHub Actions / lint

This is the default value for this type parameter, so it can be omitted
return (
!!value &&
typeof value === "function" &&
value.prototype[Symbol.toStringTag] === "AsyncGenerator"
);
}

export const tsonAsyncGeneratorFunction: TsonAsyncType<
() => AsyncGenerator<unknown, void, unknown>,

Check failure on line 25 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View workflow job for this annotation

GitHub Actions / lint

This is the default value for this type parameter, so it can be omitted
SerializedIterableResult
> = {
async: true,
deserialize: (opts) => {
// each value is stored in RAM for generator to be iterated many times
const chunks: Exclude<Awaited<ReturnType<(typeof opts.reader)["read"]>>['value'], undefined>[] = []
// we need to know if stream is done or just waiting, so that generator can stop looping
let collectionDone = false
// if generator is being iterated while data is still being collected, we need to be able to wait on the next chunks
let resolveNext: () => void
let promiseNext = new Promise<void>(resolve => resolveNext = resolve)

/**
* Collects chunks from the stream until it's done
* - handle closing the stream
* - handle generating new promises for generator to wait on
*/
void async function collect() {
let next: Awaited<ReturnType<(typeof opts.reader)["read"]>>;
loop: while (((next = await opts.reader.read()), !next.done)) {
const { value } = next
chunks.push(value)
if (value instanceof TsonStreamInterruptedError) {
if (value.cause instanceof TsonAbortError) {
opts.close()
return
}
throw value // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator?

Check failure on line 53 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View workflow job for this annotation

GitHub Actions / lint

Expected blank line before this statement
}
switch (value[0]) {

Check failure on line 55 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View workflow job for this annotation

GitHub Actions / lint

Expected blank line before this statement
case ITERATOR_DONE: {
opts.close();
break loop;
}
case ITERATOR_ERROR: {
opts.close();
break;
}
}
resolveNext!()
promiseNext = new Promise<void>(resolve => resolveNext = resolve)
}
collectionDone = true
resolveNext!()
}()

/**
* Generator that yields values from the stream
* - handles waiting for chunks if stream is still active
* - handles throwing errors from values
*/
return async function* generator() {
await promiseNext
for (let i = 0; i < chunks.length; i++) {
const value = chunks[i]!
if (value instanceof TsonStreamInterruptedError) {
if (value.cause instanceof TsonAbortError) {
return;
}
throw value;
}
switch (value[0]) {
case ITERATOR_DONE: {
return;
}

case ITERATOR_ERROR: {
throw TsonPromiseRejectionError.from(value[1]);
}

case ITERATOR_VALUE: {
yield value[1];
break; // <-- breaks the switch, not the loop
}
}
if (i === chunks.length - 1) {
if (collectionDone) break
await promiseNext
if (collectionDone) break
}
}
};
},
key: "AsyncGeneratorFunction",
serializeIterator: async function* serialize(opts) {
if (opts.value.length !== 0) {
throw new Error(
`AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}`
);
}
try {
const iterator = opts.value()
for await (const value of iterator) {
yield [ITERATOR_VALUE, value];
}

yield [ITERATOR_DONE];
} catch (err) {
yield [ITERATOR_ERROR, err];
}
},
test: isAsyncGeneratorFunction,
};
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ export * from "./async/asyncErrors.js";
// type handlers
export * from "./async/handlers/tsonPromise.js";
export * from "./async/handlers/tsonAsyncIterable.js";
export * from "./async/handlers/tsonAsyncGeneratorFunction.js";

0 comments on commit b1733d9

Please sign in to comment.