Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

feat: split up deserialization and start on support for SSE #67

Merged
merged 18 commits into from
Oct 13, 2023
12 changes: 6 additions & 6 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ test("values missing when stream ends", async () => {
assert(err);

expect(err.message).toMatchInlineSnapshot(
'"Stream interrupted: Stream ended unexpectedly"',
'"Stream interrupted: Stream ended unexpectedly (state 1)"',
);
}

Expand All @@ -390,15 +390,15 @@ test("values missing when stream ends", async () => {
const err = await waitError(result.promise);

expect(err).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]",
'[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)]',
);
}

expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(`
[
[
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly],
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)],
],
]
`);
Expand Down Expand Up @@ -439,7 +439,7 @@ test("async: missing values of promise", async () => {
});

expect(parseOptions.onStreamError.mock.calls[0]![0]!).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]",
'[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)]',
);
});

Expand Down Expand Up @@ -523,7 +523,7 @@ test("1 iterator completed but another never finishes", async () => {
`);

expect(err.message).toMatchInlineSnapshot(
'"Stream interrupted: Stream ended unexpectedly"',
'"Stream interrupted: Stream ended unexpectedly (state 1)"',
);
}

Expand All @@ -532,7 +532,7 @@ test("1 iterator completed but another never finishes", async () => {
expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(`
[
[
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly],
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)],
],
]
`);
Expand Down
199 changes: 110 additions & 89 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable @typescript-eslint/no-non-null-assertion */

import { TsonError } from "../errors.js";
import { assert } from "../internals/assert.js";
Expand Down Expand Up @@ -37,7 +37,10 @@
opts?: TsonParseAsyncOptions,
) => Promise<TValue>;

export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
type TsonDeserializeIterable = AsyncIterable<
TsonAsyncValueTuple | TsonSerialized
>;
export function createTsonDeserializer(opts: TsonAsyncOptions) {
const typeByKey: Record<string, AnyTsonTransformerSerializeDeserialize> = {};

for (const handler of opts.types) {
Expand All @@ -52,10 +55,9 @@
}

return async (
iterable: AsyncIterable<string>,
iterable: TsonDeserializeIterable,
parseOptions: TsonParseAsyncOptions,
) => {
// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
TsonAsyncIndex,
ReadableStreamDefaultController<unknown>
Expand Down Expand Up @@ -106,36 +108,15 @@
return walk;
};

async function getStreamedValues(
lines: string[],
accumulator: string,
walk: WalkFn,
) {
// <stream state>
let streamEnded = false;
// </stream state>

function readLine(str: string) {
// console.log("got str", str);
str = str.trimStart();

if (str.startsWith(",")) {
// ignore leading comma
str = str.slice(1);
}

if (str === "" || str === "[" || str === ",") {
// beginning of values array or empty string
return;
}

if (str === "]]") {
// end of values and stream
streamEnded = true;
return;
async function getStreamedValues(walk: WalkFn) {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
const nextValue = await iterator.next();
if (nextValue.done) {
break;
}

const [index, result] = JSON.parse(str) as TsonAsyncValueTuple;
const [index, result] = nextValue.value as TsonAsyncValueTuple;

const controller = cache.get(index);

Expand All @@ -146,68 +127,25 @@
// resolving deferred
controller.enqueue(walkedResult);
}

do {
lines.forEach(readLine);
lines.length = 0;

const nextValue = await iterator.next();
if (!nextValue.done) {
accumulator += nextValue.value;
const parts = accumulator.split("\n");
accumulator = parts.pop() ?? "";
lines.push(...parts);
} else if (accumulator) {
readLine(accumulator);
}
} while (lines.length);

assert(streamEnded, "Stream ended unexpectedly");
}

async function init() {
let accumulator = "";

// get the head of the JSON
const nextValue = await iterator.next();
if (nextValue.done) {
throw new TsonError("Unexpected end of stream before head");
}

Check warning on line 137 in src/async/deserializeAsync.ts

View check run for this annotation

Codecov / codecov/patch

src/async/deserializeAsync.ts#L136-L137

Added lines #L136 - L137 were not covered by tests

const lines: string[] = [];
do {
const nextValue = await iterator.next();
if (nextValue.done) {
throw new TsonError("Unexpected end of stream before head");
}

accumulator += nextValue.value;

const parts = accumulator.split("\n");
accumulator = parts.pop() ?? "";
lines.push(...parts);
} while (lines.length < 2);

const [
/**
* First line is just a `[`
*/
_firstLine,
/**
* Second line is the shape of the JSON
*/
headLine,
// .. third line is a `,`
// .. fourth line is the start of the values array
...buffer
] = lines;

assert(headLine, "No head line found");

const head = JSON.parse(headLine) as TsonSerialized<any>;
const head = nextValue.value as TsonSerialized<any>;

const walk = walker(head.nonce);

try {
return walk(head.json);
const walked = walk(head.json);

return walked;
} finally {
getStreamedValues(buffer, accumulator, walk).catch((cause) => {
getStreamedValues(walk).catch((cause) => {
// Something went wrong while getting the streamed values

const err = new TsonStreamInterruptedError(cause);
Expand All @@ -222,19 +160,102 @@
}
}

const result = await init().catch((cause: unknown) => {
return await init().catch((cause: unknown) => {
throw new TsonError("Failed to initialize TSON stream", { cause });
});
return [result, cache] as const;
};
}

function lineAccumulator() {
let accumulator = "";
const lines: string[] = [];

return {
lines,
push(chunk: string) {
accumulator += chunk;

const parts = accumulator.split("\n");
accumulator = parts.pop() ?? "";
lines.push(...parts);
},
};
}

async function* stringIterableToTsonIterable(
iterable: AsyncIterable<string>,
): TsonDeserializeIterable {
// get the head of the JSON
const acc = lineAccumulator();

// state of stream
const AWAITING_HEAD = 0;
const STREAMING_VALUES = 1;
const ENDED = 2;

let state: typeof AWAITING_HEAD | typeof ENDED | typeof STREAMING_VALUES =
AWAITING_HEAD;

// iterate values & yield them

for await (const str of iterable) {
acc.push(str);

if (state === AWAITING_HEAD && acc.lines.length >= 2) {
/**
* First line is just a `[`
*/
acc.lines.shift();

// Second line is the head
const headLine = acc.lines.shift();

assert(headLine, "No head line found");

const head = JSON.parse(headLine) as TsonSerialized<any>;

yield head;

state = STREAMING_VALUES;
}

if (state === STREAMING_VALUES) {
while (acc.lines.length) {
let str = acc.lines.shift()!;

// console.log("got str", str);
str = str.trimStart();

if (str.startsWith(",")) {
// ignore leading comma
str = str.slice(1);
}

if (str === "" || str === "[" || str === ",") {
// beginning of values array or empty string
continue;
}

if (str === "]]") {
// end of values and stream
state = ENDED;
continue;
}

yield JSON.parse(str) as TsonAsyncValueTuple;
}
}
}

assert(state === ENDED, `Stream ended unexpectedly (state ${state})`);
}

export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync {
const instance = createTsonParseAsyncInner(opts);
const instance = createTsonDeserializer(opts);

return (async (iterable, opts) => {
const [result] = await instance(iterable, opts ?? {});
const tsonIterable = stringIterableToTsonIterable(iterable);

return result;
return await instance(tsonIterable, opts ?? {});
}) as TsonParseAsync;
}
3 changes: 1 addition & 2 deletions src/async/handlers/tsonPromise.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
waitError,
} from "../../internals/testUtils.js";
import { createPromise } from "../../internals/testUtils.js";
import { createTsonParseAsyncInner } from "../deserializeAsync.js";
import {
mapIterable,
readableStreamToAsyncIterable,
Expand Down Expand Up @@ -453,7 +452,7 @@ test("does not crash node when it receives a promise rejection", async () => {
};
const stringify = createTsonStringifyAsync(opts);

const parse = createTsonParseAsyncInner(opts);
const parse = createTsonParseAsync(opts);

const original = {
foo: createPromise(() => {
Expand Down
Loading