diff --git a/apps/deployment/src/index.ts b/apps/deployment/src/index.ts index c7a6512..3e1097e 100644 --- a/apps/deployment/src/index.ts +++ b/apps/deployment/src/index.ts @@ -95,22 +95,26 @@ export class $DurableObject extends DurableObject { }); const peerIdSet = new Set(); dbPeers.forEach((p) => peerIdSet.add(p.id)); - return unblockedResponse(reader, (message) => { - peers.forEach((peer) => { - if (peerIdSet.has(peer.id)) { - sendToPeer(peer, { - source: "message", - data: { - event: data.event, - from: { - source: "broadcast", + return unblockedResponse( + reader, + (message) => { + peers.forEach((peer) => { + if (peerIdSet.has(peer.id)) { + sendToPeer(peer, { + source: "message", + data: { + event: data.event, + from: { + source: "broadcast", + }, + message, }, - message, - }, - }); - } - }); - }); + }); + } + }); + }, + this.ctx, + ); } case "channel": { const subscriptions = @@ -122,24 +126,28 @@ export class $DurableObject extends DurableObject { ), }); const peers = getPeerMap(); - return unblockedResponse(reader, (message) => { - subscriptions.forEach((sub) => { - const peer = peers.get(sub.peerId); - if (peer) { - sendToPeer(peer, { - source: "message", - data: { - event: data.event, - from: { - source: "channel", - channel: data.channel, + return unblockedResponse( + reader, + (message) => { + subscriptions.forEach((sub) => { + const peer = peers.get(sub.peerId); + if (peer) { + sendToPeer(peer, { + source: "message", + data: { + event: data.event, + from: { + source: "channel", + channel: data.channel, + }, + message, }, - message, - }, - }); - } - }); - }); + }); + } + }); + }, + this.ctx, + ); } case "direct": { const dbPeer = await db.query.peers.findFirst({ @@ -159,18 +167,22 @@ export class $DurableObject extends DurableObject { if (!peer) { return new Response("Not found", { status: 404 }); } - return unblockedResponse(reader, (message) => { - sendToPeer(peer, { - source: "message", - data: { - event: data.event, - from: { - source: "direct", + return unblockedResponse( + reader, + (message) => { + sendToPeer(peer, { + source: "message", + data: { + event: data.event, + from: { + source: "direct", + }, + message, }, - message, - }, - }); - }); + }); + }, + this.ctx, + ); } } } @@ -520,21 +532,37 @@ function transformRequestStream(body: ReadableStream) { async function unblockedResponse( body: ReadableStreamDefaultReader, onMessageChunk: (chunk: unknown) => void | Promise, + ctx: DurableObjectState, ) { - while (true) { - const { done, value } = await body.read(); - if (done) { - break; - } - await onMessageChunk(value); - } - return new Response("OK", { - status: 200, + return new Promise((resolve) => { + const daemonPromise = new Promise((closeDaemon) => { + const rs = new ReadableStream({ + async start(controller) { + while (true) { + const { done, value } = await body.read(); + if (done) { + controller.close(); + closeDaemon(); + break; + } + await onMessageChunk(value); + controller.enqueue("OK"); + } + }, + }); + resolve( + new Response(rs, { + status: 200, + }), + ); + }); + ctx.waitUntil(daemonPromise); }); } export default { async fetch(request, env, ctx): Promise { - return ws.handleUpgrade(request, env, ctx); + const res = await ws.handleUpgrade(request, env, ctx); + return new Response(res.body, res); }, } satisfies ExportedHandler; diff --git a/packages/core/jsr.json b/packages/core/jsr.json index e6a3107..cfb0067 100644 --- a/packages/core/jsr.json +++ b/packages/core/jsr.json @@ -1,7 +1,7 @@ { "name": "@sinkr/core", "license": "MIT", - "version": "0.3.11", + "version": "0.3.12", "exports": { ".": "./src/index.ts", "./client": "./src/index.browser.ts", diff --git a/packages/core/package.json b/packages/core/package.json index c4a924c..be98796 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@sinkr/core", - "version": "0.3.11", + "version": "0.3.12", "type": "module", "main": "src/index.ts", "exports": { diff --git a/packages/core/src/server.ts b/packages/core/src/server.ts index 1ccc14b..92d5435 100644 --- a/packages/core/src/server.ts +++ b/packages/core/src/server.ts @@ -23,12 +23,14 @@ function preludeAndEncodeStream( async start(controller) { controller.enqueue(prelude); try { - const { done, value } = await reader.read(); - if (done) { - controller.close(); - return; + while (true) { + const { done, value } = await reader.read(); + if (done) { + controller.close(); + return; + } + controller.enqueue(value); } - controller.enqueue(value); } catch (e) { controller.error(e); }