Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
feat: serializing async iterable (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
KATT authored Oct 5, 2023
1 parent d82a911 commit 69ff286
Show file tree
Hide file tree
Showing 12 changed files with 933 additions and 235 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

## Introduction

> This package is still experimental (although it's pretty well tested) & is subject to big changes
> ⚠️ This package is still experimental (although it's pretty well tested) & is subject to big changes
A hackable JSON serializer/deserializer that allows you to serialize/deserialize almost[^1] anything.

Expand Down Expand Up @@ -106,6 +106,10 @@ type Obj = typeof obj;
// -> type Obj = { foo: string; set: Set<number>; }
```

### Streaming `Promise`s and `AsyncIterable`s

> See test files called `deserializeAsync.test.ts`
## Extend with a custom serializer

### [Temporal](https://www.npmjs.com/package/@js-temporal/polyfill)
Expand Down
48 changes: 36 additions & 12 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,29 @@ export type TsonAsyncStringifier = <TValue>(
space?: number,
) => TsonAsyncStringifierIterator<TValue>;
export type TsonAsyncIndex = TsonBranded<number, "AsyncRegistered">;
export interface TsonTransformerSerializeDeserializeAsync<TValue> {

export interface TsonTransformerSerializeDeserializeAsync<
TValue,
TSerializedValue,
> {
async: true;
/**
* From JSON-serializable value
*/
deserialize: (
v: TsonAsyncIndex,
register: (index: TsonAsyncIndex) => Promise<TValue>,
) => TValue;
deserialize: (opts: {
/**
* Abort signal from of the full stream
*/
// abortSignal: Promise<never>;
/**
* Notify that we don't expect more values
*/
onDone: () => void;
/**
* Stream of values
*/
stream: AsyncIterable<TSerializedValue>;
}) => TValue;

/**
* The key to use when serialized
Expand All @@ -29,14 +43,20 @@ export interface TsonTransformerSerializeDeserializeAsync<TValue> {
/**
* JSON-serializable value
*/
serialize: (
v: TValue,
register: (thing: TValue) => TsonAsyncIndex,
) => TsonAsyncIndex;
serializeIterator: (opts: {
/**
* Abort signal from of the full stream
*/
// abortSignal?: AbortSignal;
/**
* The value we're serializing
*/
value: TValue;
}) => AsyncIterable<TSerializedValue>;
}

export interface TsonAsyncType<TValue>
extends TsonTransformerSerializeDeserializeAsync<TValue>,
export interface TsonAsyncType<TValue, TSerializedValue>
extends TsonTransformerSerializeDeserializeAsync<TValue, TSerializedValue>,
TsonTypeTesterCustom {}
export interface TsonAsyncOptions {
/**
Expand All @@ -53,5 +73,9 @@ export interface TsonAsyncOptions {
/**
* The list of types to use
*/
types: (TsonAsyncType<any> | TsonType<any, any> | TsonType<any, never>)[];
types: (
| TsonAsyncType<any, any>
| TsonType<any, any>
| TsonType<any, never>
)[];
}
170 changes: 170 additions & 0 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { expect, test, vi } from "vitest";

import {
createTsonAsync,
tsonAsyncIterator,
tsonBigint,
tsonPromise,
} from "../index.js";
import { assert } from "../internals/assert.js";
import {
mapIterable,
readableStreamToAsyncIterable,
} from "../internals/iterableUtils.js";
import { createTestServer } from "../internals/testUtils.js";
import { TsonAsyncOptions } from "./asyncTypes.js";

test("deserialize async iterable", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});

{
// plain obj
const obj = {
foo: "bar",
};

const strIterable = tson.stringify(obj);

const result = await tson.parse(strIterable);

expect(result).toEqual(obj);
}

{
// promise
const obj = {
foo: Promise.resolve("bar"),
};

const strIterable = tson.stringify(obj);

const result = await tson.parse(strIterable);

expect(await result.foo).toEqual("bar");
}
});

test("stringify async iterable + promise", async () => {
const onErr = vi.fn();
const tson = createTsonAsync({
nonce: () => "__tson",
onStreamError: onErr,
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});

async function* iterable() {
await new Promise((resolve) => setTimeout(resolve, 1));
yield 1n;
await new Promise((resolve) => setTimeout(resolve, 1));
yield 2n;
yield 3n;

await new Promise((resolve) => setTimeout(resolve, 2));
yield 4n;
yield 5n;
}

const input = {
foo: "bar",
iterable: iterable(),
promise: Promise.resolve(42),
};

const strIterable = tson.stringify(input);

const output = await tson.parse(strIterable);

expect(output.foo).toEqual("bar");

expect(await output.promise).toEqual(42);

const result = [];

for await (const value of output.iterable) {
result.push(value);
}

expect(result).toEqual([1n, 2n, 3n, 4n, 5n]);
});

test("e2e: stringify and parse promise with a promise over a network connection", async () => {
function createMockObj() {
async function* generator() {
for (const number of [1, 2, 3, 4, 5]) {
await new Promise((resolve) => setTimeout(resolve, 1));
yield BigInt(number);
}
}

return {
foo: "bar",
iterable: generator(),
// promise: Promise.resolve(42),
};
}

type MockObj = ReturnType<typeof createMockObj>;

const opts: TsonAsyncOptions = {
nonce: () => "__tson",
types: [tsonPromise, tsonAsyncIterator, tsonBigint],
};

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);

const obj = createMockObj();
const strIterarable = tson.stringify(obj, 4);

// set proper header for chunked responses
// res.setHeader("Transfer-Encoding", "chunked");

for await (const value of strIterarable) {
res.write(value);
}

res.end();
},
});

// ------------- client -------------------
const tson = createTsonAsync(opts);

// do a streamed fetch request
const response = await fetch(server.url);

assert(response.body);

const textDecoder = new TextDecoder();

const spy = vi.fn();
const stringIterator = mapIterable(
mapIterable(readableStreamToAsyncIterable(response.body), (v) =>
textDecoder.decode(v),
),
(val) => {
spy(val.trimEnd());
return val;
},
);

const parsedRaw = await tson.parse(stringIterator);
const parsed = parsedRaw as MockObj;

expect(parsed.foo).toEqual("bar");
// expect(await parsed.promise).toEqual(42);

const results = [];

for await (const value of parsed.iterable) {
results.push(value);
}

expect(results).toEqual([1n, 2n, 3n, 4n, 5n]);

server.close();
});
Loading

0 comments on commit 69ff286

Please sign in to comment.