Skip to content

Commit

Permalink
Fix stream iteration not being present
Browse files Browse the repository at this point in the history
  • Loading branch information
Wundero committed Dec 7, 2024
1 parent a71ef8b commit 266c074
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 60 deletions.
134 changes: 81 additions & 53 deletions apps/deployment/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,26 @@ export class $DurableObject extends DurableObject {
});
const peerIdSet = new Set();
dbPeers.forEach((p) => peerIdSet.add(p.id));
return unblockedResponse(reader, (message) => {
peers.forEach((peer) => {
if (peerIdSet.has(peer.id)) {
sendToPeer(peer, {
source: "message",
data: {
event: data.event,
from: {
source: "broadcast",
return unblockedResponse(
reader,
(message) => {
peers.forEach((peer) => {
if (peerIdSet.has(peer.id)) {
sendToPeer(peer, {
source: "message",
data: {
event: data.event,
from: {
source: "broadcast",
},
message,
},
message,
},
});
}
});
});
});
}
});
},
this.ctx,
);
}
case "channel": {
const subscriptions =
Expand All @@ -122,24 +126,28 @@ export class $DurableObject extends DurableObject {
),
});
const peers = getPeerMap();
return unblockedResponse(reader, (message) => {
subscriptions.forEach((sub) => {
const peer = peers.get(sub.peerId);
if (peer) {
sendToPeer(peer, {
source: "message",
data: {
event: data.event,
from: {
source: "channel",
channel: data.channel,
return unblockedResponse(
reader,
(message) => {
subscriptions.forEach((sub) => {
const peer = peers.get(sub.peerId);
if (peer) {
sendToPeer(peer, {
source: "message",
data: {
event: data.event,
from: {
source: "channel",
channel: data.channel,
},
message,
},
message,
},
});
}
});
});
});
}
});
},
this.ctx,
);
}
case "direct": {
const dbPeer = await db.query.peers.findFirst({
Expand All @@ -159,18 +167,22 @@ export class $DurableObject extends DurableObject {
if (!peer) {
return new Response("Not found", { status: 404 });
}
return unblockedResponse(reader, (message) => {
sendToPeer(peer, {
source: "message",
data: {
event: data.event,
from: {
source: "direct",
return unblockedResponse(
reader,
(message) => {
sendToPeer(peer, {
source: "message",
data: {
event: data.event,
from: {
source: "direct",
},
message,
},
message,
},
});
});
});
},
this.ctx,
);
}
}
}
Expand Down Expand Up @@ -520,21 +532,37 @@ function transformRequestStream(body: ReadableStream) {
async function unblockedResponse(
body: ReadableStreamDefaultReader<unknown>,
onMessageChunk: (chunk: unknown) => void | Promise<void>,
ctx: DurableObjectState,
) {
while (true) {
const { done, value } = await body.read();
if (done) {
break;
}
await onMessageChunk(value);
}
return new Response("OK", {
status: 200,
return new Promise<Response>((resolve) => {
const daemonPromise = new Promise<void>((closeDaemon) => {
const rs = new ReadableStream({
async start(controller) {
while (true) {
const { done, value } = await body.read();
if (done) {
controller.close();
closeDaemon();
break;
}
await onMessageChunk(value);
controller.enqueue("OK");
}
},
});
resolve(
new Response(rs, {
status: 200,
}),
);
});
ctx.waitUntil(daemonPromise);
});
}

export default {
async fetch(request, env, ctx): Promise<Response> {
return ws.handleUpgrade(request, env, ctx);
const res = await ws.handleUpgrade(request, env, ctx);
return new Response(res.body, res);
},
} satisfies ExportedHandler<Env>;
2 changes: 1 addition & 1 deletion packages/core/jsr.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@sinkr/core",
"license": "MIT",
"version": "0.3.11",
"version": "0.3.12",
"exports": {
".": "./src/index.ts",
"./client": "./src/index.browser.ts",
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@sinkr/core",
"version": "0.3.11",
"version": "0.3.12",
"type": "module",
"main": "src/index.ts",
"exports": {
Expand Down
12 changes: 7 additions & 5 deletions packages/core/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ function preludeAndEncodeStream(
async start(controller) {
controller.enqueue(prelude);
try {
const { done, value } = await reader.read();
if (done) {
controller.close();
return;
while (true) {
const { done, value } = await reader.read();
if (done) {
controller.close();
return;
}
controller.enqueue(value);
}
controller.enqueue(value);
} catch (e) {
controller.error(e);
}
Expand Down

0 comments on commit 266c074

Please sign in to comment.