Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

fix: move onStreamError to parse options + add test for request abortion #55

Merged
merged 14 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ export interface TsonAsyncOptions {
* @default `${crypto.randomUUID} if available, otherwise a random string generated by Math.random`
*/
nonce?: () => number | string;
/**
* On stream error
*/
onStreamError?: (err: TsonStreamInterruptedError) => void;

/**
* The list of types to use
Expand Down
185 changes: 162 additions & 23 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { expect, test, vi, vitest } from "vitest";
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { expect, test, vitest } from "vitest";

import {
TsonAsyncOptions,
TsonParseAsyncOptions,
TsonType,
createTsonAsync,
createTsonParseAsync,
Expand All @@ -17,7 +20,6 @@ import {
waitFor,
} from "../internals/testUtils.js";
import { TsonSerialized } from "../sync/syncTypes.js";
import { TsonAsyncOptions } from "./asyncTypes.js";
import { mapIterable, readableStreamToAsyncIterable } from "./iterableUtils.js";

test("deserialize variable chunk length", async () => {
Expand Down Expand Up @@ -92,13 +94,15 @@ test("deserialize async iterable", async () => {
});

test("stringify async iterable + promise", async () => {
const onErr = vi.fn();
const tson = createTsonAsync({
nonce: () => "__tson",
onStreamError: onErr,
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

async function* iterable() {
await sleep(1);
yield 1n;
Expand All @@ -119,7 +123,7 @@ test("stringify async iterable + promise", async () => {

const strIterable = tson.stringify(input);

const output = await tson.parse(strIterable);
const output = await tson.parse(strIterable, parseOptions);

expect(output.foo).toEqual("bar");

Expand Down Expand Up @@ -348,16 +352,19 @@ test("values missing when stream ends", async () => {
}

const opts = {
onStreamError: vitest.fn(),
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

const parse = createTsonParseAsync(opts);

const result = await parse<{
iterable: AsyncIterable<string>;
promise: Promise<unknown>;
}>(generator());
}>(generator(), parseOptions);

{
// iterator should error
Expand Down Expand Up @@ -388,8 +395,8 @@ test("values missing when stream ends", async () => {
);
}

expect(opts.onStreamError).toHaveBeenCalledTimes(1);
expect(opts.onStreamError.mock.calls).toMatchInlineSnapshot(`
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(`
[
[
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly],
Expand Down Expand Up @@ -420,17 +427,19 @@ test("async: missing values of promise", async () => {
// yield "]]\n"; // <-- stream and values ended symbol
}

const onErrorSpy = vitest.fn();
const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

await createTsonAsync({
onStreamError: onErrorSpy,
types: [tsonPromise],
}).parse(generator());
}).parse(generator(), parseOptions);

await waitFor(() => {
expect(onErrorSpy).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);
});

expect(onErrorSpy.mock.calls[0][0]).toMatchInlineSnapshot(
expect(parseOptions.onStreamError.mock.calls[0]![0]!).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]",
);
});
Expand Down Expand Up @@ -469,16 +478,18 @@ test("1 iterator completed but another never finishes", async () => {
}

const opts = {
onStreamError: vitest.fn(),
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;
const parse = createTsonParseAsync(opts);

const result = await parse<{
iterable1: AsyncIterable<string>;
iterable2: AsyncIterable<string>;
}>(generator());
}>(generator(), parseOptions);

{
// iterator 1 should complete
Expand Down Expand Up @@ -517,9 +528,9 @@ test("1 iterator completed but another never finishes", async () => {
);
}

expect(opts.onStreamError).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);

expect(opts.onStreamError.mock.calls).toMatchInlineSnapshot(`
expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(`
[
[
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly],
Expand Down Expand Up @@ -556,10 +567,13 @@ test("e2e: simulated server crash", async () => {

// ------------- server -------------------
const opts = {
onStreamError: vi.fn(),
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);
Expand Down Expand Up @@ -594,7 +608,7 @@ test("e2e: simulated server crash", async () => {
(v) => textDecoder.decode(v),
);

const parsed = await tson.parse<MockObj>(stringIterator);
const parsed = await tson.parse<MockObj>(stringIterator, parseOptions);
{
// check the iterator
const results = [];
Expand Down Expand Up @@ -622,13 +636,138 @@ test("e2e: simulated server crash", async () => {
parsed.rejectedPromise,
).rejects.toThrowErrorMatchingInlineSnapshot('"Promise rejected"');

expect(opts.onStreamError).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const streamError = opts.onStreamError.mock.calls[0]![0]!;
const streamError = parseOptions.onStreamError.mock.calls[0]![0]!;
expect(streamError).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: terminated]",
);

expect(streamError.cause).toMatchInlineSnapshot("[TypeError: terminated]");
});

test("e2e: client aborted request", async () => {
// ------------- server -------------------
const serverSentChunks: string[] = [];
const iteratorChunks: number[] = [];
function createMockObj() {
async function* generator() {
for (let i = 0; i < 10; i++) {
yield i;
iteratorChunks.push(i);
await sleep(5);
}
}

return {
iterable: generator(),
};
}

type MockObj = ReturnType<typeof createMockObj>;
const opts = {
nonce: () => "__tson",
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);

const obj = createMockObj();
const strIterarable = tson.stringify(obj, 4);

for await (const value of strIterarable) {
serverSentChunks.push(value.trimEnd());
res.write(value);
}

res.end();
},
});

// ------------- client -------------------
const abortController = new AbortController();

const tson = createTsonAsync(opts);

// do a streamed fetch request
const response = await fetch(server.url, {
signal: abortController.signal,
});

assert(response.body);

const textDecoder = new TextDecoder();
const stringIterator = mapIterable(
readableStreamToAsyncIterable(response.body),
(v) => textDecoder.decode(v),
);

const parsed = await tson.parse<MockObj>(stringIterator, parseOptions);
{
// check the iterator
const results = [];
let iteratorError: Error | null = null;
try {
for await (const value of parsed.iterable) {
results.push(value);

if (value === 5) {
// abort the request after when receiving 5
abortController.abort();
}
}
} catch (err) {
iteratorError = err as Error;
} finally {
server.close();
}

expect(results).toEqual([0, 1, 2, 3, 4, 5]);
expect(iteratorError).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: The operation was aborted.]",
);
}

expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);

const streamError = parseOptions.onStreamError.mock.calls[0]![0]!;
expect(streamError).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: The operation was aborted.]",
);

expect(streamError.cause).toMatchInlineSnapshot(
"[AbortError: The operation was aborted.]",
);

expect(iteratorChunks.length).toBeLessThan(10);
expect(iteratorChunks).toMatchInlineSnapshot(`
[
0,
1,
2,
3,
4,
5,
]
`);
expect(serverSentChunks).toMatchInlineSnapshot(`
[
"[",
" {\\"json\\":{\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}",
" ,",
" [",
" [0,[0,0]]",
" ,[0,[0,1]]",
" ,[0,[0,2]]",
" ,[0,[0,3]]",
" ,[0,[0,4]]",
" ,[0,[0,5]]",
]
`);
});
20 changes: 16 additions & 4 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ type AnyTsonTransformerSerializeDeserialize =
| TsonAsyncType<any, any>
| TsonTransformerSerializeDeserialize<any, any>;

export interface TsonParseAsyncOptions {
/**
* On stream error
*/
onStreamError?: (err: TsonStreamInterruptedError) => void;
}

type TsonParseAsync = <TValue>(
string: AsyncIterable<string> | TsonAsyncStringifierIterable<TValue>,
opts?: TsonParseAsyncOptions,
) => Promise<TValue>;

export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
Expand All @@ -43,7 +51,10 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
}
}

return async (iterable: AsyncIterable<string>) => {
return async (
iterable: AsyncIterable<string>,
parseOptions: TsonParseAsyncOptions,
) => {
// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
TsonAsyncIndex,
Expand Down Expand Up @@ -139,6 +150,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
do {
lines.forEach(readLine);
lines.length = 0;

const nextValue = await iterator.next();
if (!nextValue.done) {
accumulator += nextValue.value;
Expand Down Expand Up @@ -205,7 +217,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
controller.enqueue(err);
}

opts.onStreamError?.(err);
parseOptions.onStreamError?.(err);
});
}
}
Expand All @@ -220,8 +232,8 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync {
const instance = createTsonParseAsyncInner(opts);

return (async (iterable) => {
const [result] = await instance(iterable);
return (async (iterable, opts) => {
const [result] = await instance(iterable, opts ?? {});

return result;
}) as TsonParseAsync;
Expand Down
2 changes: 1 addition & 1 deletion src/async/handlers/tsonPromise.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ test("does not crash node when it receives a promise rejection", async () => {
};
const iterator = stringify(original);

await parse(iterator);
await parse(iterator, {});

await sleep(10);
});
Expand Down
Loading
Loading