Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
KATT committed Oct 13, 2023
1 parent d7c501a commit 19e6dcd
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
42 changes: 42 additions & 0 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ import {
} from "../sync/syncTypes.js";
import { TsonStreamInterruptedError } from "./asyncErrors.js";
import {
BrandSerialized,
TsonAsyncIndex,
TsonAsyncOptions,
TsonAsyncStringifierIterable,
TsonAsyncType,
} from "./asyncTypes.js";
import {
createReadableStream,
mapIterable,
readableStreamToAsyncIterable,
} from "./iterableUtils.js";
import { TsonAsyncValueTuple } from "./serializeAsync.js";

type WalkFn = (value: unknown) => unknown;
Expand Down Expand Up @@ -259,3 +265,39 @@ export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync {
return await instance(tsonIterable, opts ?? {});
}) as TsonParseAsync;
}

// export function createTsonParseEventSource(opts: TsonAsyncOptions) {
// const instance = createTsonDeserializer(opts);

// return async <TValue = unknown>(
// url: string,
// parseOpts?: TsonParseAsyncOptions & {
// abortSignal: AbortSignal;
// },
// ) => {
// const [stream, controller] = createReadableStream<string>();
// const eventSource = new EventSource(url);

// const onAbort = () => {
// eventSource.close();
// controller.close();
// parseOpts?.abortSignal.removeEventListener("abort", onAbort);
// };

// parseOpts?.abortSignal.addEventListener("abort", onAbort);

// eventSource.onmessage = (msg) => {
// // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
// controller.enqueue(msg.data);
// };

// const iterable = mapIterable(
// readableStreamToAsyncIterable(stream),
// (msg) => {
// return JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized;
// },
// );

// return (await instance(iterable, parseOpts ?? {})) as TValue;
// };
// }
16 changes: 16 additions & 0 deletions src/async/iterableUtils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { assert } from "../internals/assert";

export async function* readableStreamToAsyncIterable<T>(
stream: ReadableStream<T>,
): AsyncIterable<T> {
Expand Down Expand Up @@ -31,3 +33,17 @@ export async function* mapIterable<T, TValue>(
yield fn(value);
}
}

export function createReadableStream<TValue>() {
let controller: ReadableStreamDefaultController<TValue> =
null as unknown as ReadableStreamDefaultController<TValue>;
const stream = new ReadableStream<TValue>({
start(c) {
controller = c;
},
});

assert(controller, `Could not find controller - this is a bug`);

return [stream, controller] as const;
}
6 changes: 4 additions & 2 deletions src/async/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ test("SSE response test", async () => {
};
}

type MockObj = ReturnType<typeof createMockObj>;
// type MockObj = ReturnType<typeof createMockObj>;

// ------------- server -------------------
const opts = {
Expand Down Expand Up @@ -53,7 +53,7 @@ test("SSE response test", async () => {
});

// ------------- client -------------------
const tson = createTsonAsync(opts);
// const tson = createTsonAsync(opts);

// do a streamed fetch request
const sse = new EventSource(server.url);
Expand Down Expand Up @@ -82,3 +82,5 @@ test("SSE response test", async () => {
]
`);
});

test.todo("parse SSE response");

0 comments on commit 19e6dcd

Please sign in to comment.