Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

feat: deserializeAsync parse takes a ReadableStream Uint8Array as input #39

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { TsonType } from "../types.js";
import { TsonBranded, TsonTypeTesterCustom } from "../types.js";
import { serialized } from "../types.js";

export type TsonAsyncStringifiedStream<TValue> = ReadableStream<Uint8Array> & {
[serialized]: TValue;
};

export type TsonAsyncStringifierIterable<TValue> = AsyncIterable<string> & {
[serialized]: TValue;
};
Expand Down
124 changes: 97 additions & 27 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,44 @@ import {
tsonPromise,
} from "../index.js";
import { assert } from "../internals/assert.js";
import {
mapIterable,
readableStreamToAsyncIterable,
} from "../internals/iterableUtils.js";
import { createTestServer } from "../internals/testUtils.js";
import { createTestServer, createBodyStream } from "../internals/testUtils.js";
import { TsonAsyncOptions } from "./asyncTypes.js";


test("deserialize variable chunk length", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});
{
const iterable = (async function* () {
const body = createBodyStream((async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}';
yield "\n,\n[\n]\n]";
})();
const result = await tson.parse(iterable);
})());
const result = await tson.parse(body);
expect(result).toEqual({ foo: "bar" });
}

{
const iterable = (async function* () {
const body = createBodyStream((async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]';
})();
const result = await tson.parse(iterable);
})());
const result = await tson.parse(body);
expect(result).toEqual({ foo: "bar" });
}

{
const iterable = (async function* () {
const body = createBodyStream((async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json"';
yield ':{"foo":"b';
yield 'ar"},"nonce":"__tson"}\n,\n';
yield "[\n]\n";
yield "]";
})();
const result = await tson.parse(iterable);
})());
const result = await tson.parse(body);
expect(result).toEqual({ foo: "bar" });
}
});
Expand All @@ -65,8 +62,8 @@ test("deserialize async iterable", async () => {
};

const strIterable = tson.stringify(obj);

const result = await tson.parse(strIterable);
const body = createBodyStream(strIterable);
const result = await tson.parse(body);

expect(result).toEqual(obj);
}
Expand All @@ -78,8 +75,9 @@ test("deserialize async iterable", async () => {
};

const strIterable = tson.stringify(obj);

const result = await tson.parse(strIterable);
const body = createBodyStream(strIterable);
const resultRaw = await tson.parse(body);
const result = resultRaw as typeof obj;

expect(await result.foo).toEqual("bar");
}
Expand Down Expand Up @@ -112,8 +110,9 @@ test("stringify async iterable + promise", async () => {
};

const strIterable = tson.stringify(input);

const output = await tson.parse(strIterable);
const body = createBodyStream(strIterable);
const outputRaw = await tson.parse(body);
const output = outputRaw as typeof input;

expect(output.foo).toEqual("bar");

Expand Down Expand Up @@ -175,15 +174,86 @@ test("e2e: stringify async iterable and promise over the network", async () => {

assert(response.body);

const textDecoder = new TextDecoder();
const parsedRaw = await tson.parse(response.body);
const parsed = parsedRaw as MockObj;

expect(parsed.foo).toEqual("bar");

const results = [];

for await (const value of parsed.iterable) {
results.push(value);
}

expect(results).toEqual([1n, 2n, 3n, 4n, 5n]);

expect(await parsed.promise).toEqual(42);

await expect(
parsed.rejectedPromise,
).rejects.toThrowErrorMatchingInlineSnapshot('"Promise rejected"');

server.close();
});

test("e2e: server crash", async () => {
function createMockObj() {
async function* generator() {
for (const number of [1n, 2n, 3n, 4n, 5n]) {
await new Promise((resolve) => setTimeout(resolve, 1));
yield number;
}
}

// convert the response body to an async iterable
const stringIterator = mapIterable(
readableStreamToAsyncIterable(response.body),
(v) => textDecoder.decode(v),
);
return {
foo: "bar",
iterable: generator(),
promise: Promise.resolve(42),
rejectedPromise: Promise.reject(new Error("rejected promise")),
};
}

type MockObj = ReturnType<typeof createMockObj>;

// ------------- server -------------------
const opts: TsonAsyncOptions = {
types: [tsonPromise, tsonAsyncIterator, tsonBigint],
};

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);

const obj = createMockObj();
const strIterarable = tson.stringify(obj, 4);

let closed = false
setTimeout(() => {
closed = true
server.close();
}, 2)

// res.write(strIterarable);

for await (const value of strIterarable) {
if (closed) continue
res.write(value);
}

res.end();
},
});

// ------------- client -------------------
const tson = createTsonAsync(opts);

// do a streamed fetch request
const response = await fetch(server.url);

assert(response.body);

const parsed = await tson.parse<MockObj>(stringIterator);
const parsedRaw = await tson.parse(response.body);
const parsed = parsedRaw as MockObj;

expect(parsed.foo).toEqual("bar");

Expand Down
21 changes: 15 additions & 6 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import {
TsonAsyncIndex,
TsonAsyncOptions,
TsonAsyncStringifierIterable,
TsonAsyncStringifiedStream,
TsonAsyncType,
} from "./asyncTypes.js";
import { TsonAsyncValueTuple } from "./serializeAsync.js";
Expand All @@ -25,7 +25,7 @@ type AnyTsonTransformerSerializeDeserialize =
| TsonTransformerSerializeDeserialize<any, any>;

type TsonParseAsync = <TValue>(
string: AsyncIterable<string> | TsonAsyncStringifierIterable<TValue>,
string: ReadableStream<Uint8Array> | TsonAsyncStringifiedStream<TValue>,
) => Promise<TValue>;

export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
Expand All @@ -42,13 +42,22 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
}
}

return async (iterable: AsyncIterable<string>) => {
return async (stream: ReadableStream<Uint8Array>) => {
Copy link
Member

@KATT KATT Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe

	return async (stream: ReadableStream<Uint8Array> | AsyncIterable<string> | ReadableStream<Uint8Array>) => {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • we don't care that much about bundle size for this to matter
  • it'd be nicer dx?
  • they can be converted from/to each other seamlessly AFAIK

// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
TsonAsyncIndex,
ReadableStreamDefaultController<unknown>
>();
const iterator = iterable[Symbol.asyncIterator]();

const decoder = new TextDecoder();
const textStream = stream.pipeThrough(
new TransformStream<Uint8Array, string>({
async transform(chunk, controller) {
controller.enqueue(decoder.decode(chunk));
},
}),
);
const reader = textStream.getReader();

const walker: WalkerFactory = (nonce) => {
const walk: WalkFn = (value) => {
Expand Down Expand Up @@ -123,7 +132,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
do {
lines.forEach(readLine);
lines.length = 0;
const nextValue = await iterator.next();
const nextValue = await reader.read();
if (!nextValue.done) {
accumulator += nextValue.value;
const parts = accumulator.split("\n");
Expand All @@ -144,7 +153,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

const lines: string[] = [];
do {
const nextValue = await iterator.next();
const nextValue = await reader.read();
if (nextValue.done) {
throw new TsonError("Unexpected end of stream before head");
}
Expand Down
14 changes: 9 additions & 5 deletions src/handlers/tsonPromise.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
createTestServer,
waitError,
waitFor,
createBodyStream,
} from "../internals/testUtils.js";
import { TsonSerialized, TsonType } from "../types.js";

Expand Down Expand Up @@ -354,7 +355,8 @@ test("pipe stringifier to parser", async () => {

const strIterarable = tson.stringify(obj, 4);

const value = await tson.parse(strIterarable);
const valueRaw = await tson.parse(createBodyStream(strIterarable));
const value = valueRaw as typeof obj;

expect(value).toHaveProperty("foo");
expect(await value.foo).toBe("bar");
Expand All @@ -377,7 +379,8 @@ test("stringify and parse promise with a promise", async () => {

const strIterarable = tson.stringify(obj, 4);

const value = await tson.parse(strIterarable);
const valueRaw = await tson.parse(createBodyStream(strIterarable));
const value = valueRaw as typeof obj;

const firstPromise = await value.promise;

Expand Down Expand Up @@ -442,7 +445,7 @@ test("stringify and parse promise with a promise over a network connection", asy
(v) => textDecoder.decode(v),
);

const value = await tson.parse(stringIterator);
const value = await tson.parse(createBodyStream(stringIterator));
const asObj = value as Obj;

const firstPromise = await asObj.promise;
Expand Down Expand Up @@ -481,7 +484,7 @@ test("does not crash node when it receives a promise rejection", async () => {
};
const iterator = stringify(original);

const [_result, deferreds] = await parse(iterator);
const [_result, deferreds] = await parse(createBodyStream(iterator));

const result = _result as typeof original;
await waitFor(() => {
Expand Down Expand Up @@ -549,7 +552,8 @@ test("stringify promise rejection", async () => {
// parse
const iterator = stringify(original, 2);

const result = await parse(iterator);
const resultRaw = await parse(createBodyStream(iterator));
const result = resultRaw as typeof original;

expect(result).toMatchInlineSnapshot(`
{
Expand Down
8 changes: 4 additions & 4 deletions src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
createTsonAsync,
tsonPromise,
} from "./index.js";
import { expectError, waitError, waitFor } from "./internals/testUtils.js";
import { expectError, waitError, waitFor, createBodyStream } from "./internals/testUtils.js";
import { TsonSerialized } from "./types.js";

test("multiple handlers for primitive string found", () => {
Expand Down Expand Up @@ -95,7 +95,7 @@ test("async: duplicate keys", async () => {
const gen = generator();
await createTsonAsync({
types: [stringHandler, stringHandler],
}).parse(gen);
}).parse(createBodyStream(gen));
});

expect(err).toMatchInlineSnapshot(
Expand Down Expand Up @@ -136,7 +136,7 @@ test("async: bad init", async () => {
const gen = generator();
await createTsonAsync({
types: [],
}).parse(gen);
}).parse(createBodyStream(gen));
});

expect(err).toMatchInlineSnapshot(
Expand Down Expand Up @@ -171,7 +171,7 @@ test("async: bad values", async () => {
await createTsonAsync({
onStreamError: onErrorSpy,
types: [tsonPromise],
}).parse(generator());
}).parse(createBodyStream(generator()));

await waitFor(() => {
expect(onErrorSpy).toHaveBeenCalledTimes(1);
Expand Down
12 changes: 12 additions & 0 deletions src/internals/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,15 @@ export async function createTestServer(opts: {
url: `http://localhost:${port}`,
};
}

export function createBodyStream(iterator: AsyncIterable<string>) {
return new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
for await (const chunk of iterator) {
controller.enqueue(encoder.encode(chunk));
}
controller.close();
}
})
}