From 8ffcaa8a796142aad68e18395dc05af14f6e5aab Mon Sep 17 00:00:00 2001 From: Luke Hagar Date: Mon, 11 Nov 2024 17:22:01 -0600 Subject: [PATCH 1/7] Added example of opinionated DX --- examples/node/index.js | 53 ++++++++++ examples/node/package.json | 11 ++ examples/node/public/index.html | 180 ++++++++++++++++++++++++++++++++ pnpm-lock.yaml | 6 ++ pnpm-workspace.yaml | 1 + src/adapters/bun.ts | 38 ++++--- src/adapters/deno.ts | 85 +++++++++------ src/adapters/node.ts | 49 ++++----- src/adapters/sse.ts | 110 ++++++++++--------- src/hooks.ts | 85 ++++++++++++++- 10 files changed, 495 insertions(+), 123 deletions(-) create mode 100644 examples/node/index.js create mode 100644 examples/node/package.json create mode 100644 examples/node/public/index.html diff --git a/examples/node/index.js b/examples/node/index.js new file mode 100644 index 0000000..5773ef9 --- /dev/null +++ b/examples/node/index.js @@ -0,0 +1,53 @@ +import { defineHooks } from "crossws"; +import {createServer} from 'node:http' +import crossws from "crossws/adapters/node"; +import { readFileSync } from "node:fs"; + +const ws = crossws({ + hooks: defineHooks({ + upgrade(peer, req) { + // return new Response({"error": "not supported"}, {status: 400}); + + console.log("[ws] upgrade"); + return {headers: {"x-custom-header": "custom-value"}}; + }, + + open(peer) { + // console.log("[ws] open", peer); + peer.send({ user: "server", message: `Welcome ${peer}!` }); + }, + + message(peer, message) { + // console.log("[ws] message", peer, message); + if (message.text().includes("ping")) { + peer.send({ user: "server", message: "pong" }); + } else { + peer.send({ user: peer.toString(), message: message.toString() }); + } + }, + + close(peer, event) { + // console.log("[ws] close", peer, event); + }, + + error(peer, error) { + // console.log("[ws] error", peer, error); + }, + }), +}); + +const hostname = '127.0.0.1'; +const port = 3000; + +const index = readFileSync("./public/index.html"); + +const server = createServer((req, res) => { + res.writeHead(200); + res.end(index); +}) + +server.on("upgrade", ws.handleUpgrade); + +server.listen(port, hostname, () => { + console.log(`Server running at http://${hostname}:${port}/`); +}); diff --git a/examples/node/package.json b/examples/node/package.json new file mode 100644 index 0000000..4266ad4 --- /dev/null +++ b/examples/node/package.json @@ -0,0 +1,11 @@ +{ + "name": "crossws-examples-node", + "version": "0.0.0", + "private": true, + "scripts": { + "dev": "node ./index.js" + }, + "dependencies": { + "crossws": "workspace:*" + } +} diff --git a/examples/node/public/index.html b/examples/node/public/index.html new file mode 100644 index 0000000..fb55a93 --- /dev/null +++ b/examples/node/public/index.html @@ -0,0 +1,180 @@ + + + + + CrossWS Test Page + + + + + + +
+ +
+
+
+

{{ message.user }}

+
+ Avatar +
+

+

{{ message.text }}

+
+
+

{{ message.date }}

+
+
+
+ + +
+
+ +
+
+ + + + +
+
+
+ + + \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5a2f221..918ae07 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -109,6 +109,12 @@ importers: specifier: ^1.7.1 version: 1.9.0 + examples/node: + dependencies: + crossws: + specifier: workspace:* + version: link:../.. + packages: '@ampproject/remapping@2.3.0': diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index c25addb..87c1819 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -1,2 +1,3 @@ packages: - "examples/*" + diff --git a/src/adapters/bun.ts b/src/adapters/bun.ts index 8a76ce8..d151bd7 100644 --- a/src/adapters/bun.ts +++ b/src/adapters/bun.ts @@ -2,7 +2,7 @@ import type { WebSocketHandler, ServerWebSocket, Server } from "bun"; import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; -import { AdapterHookable } from "../hooks.ts"; +import { AdapterHookable, formatRejection, Reasons } from "../hooks.ts"; import { Message } from "../message.ts"; import { Peer } from "../peer.ts"; @@ -13,7 +13,7 @@ export interface BunAdapter extends AdapterInstance { handleUpgrade(req: Request, server: Server): Promise; } -export interface BunOptions extends AdapterOptions {} +export interface BunOptions extends AdapterOptions { } type ContextData = { peer?: BunPeer; @@ -31,19 +31,29 @@ export default defineWebSocketAdapter( return { ...adapterUtils(peers), async handleUpgrade(request, server) { - const res = await hooks.callHook("upgrade", request); - if (res instanceof Response) { - return res; + let response: Response | undefined; + + /** Accept the Websocket upgrade request. */ + function accept(params?: { headers?: HeadersInit }): void { + if (!server.upgrade(request, { + headers: params?.headers, + data: { + server, + request, + } satisfies ContextData, + })) { + response = new Response("Upgrade failed", { status: 500 }); + }; + } + + /** Reject the Websocket upgrade request */ + function reject(reason: Reasons): void { + response = formatRejection({ reason, type: "Response" }) } - const upgradeOK = server.upgrade(request, { - data: { - server, - request, - } satisfies ContextData, - headers: res?.headers, - }); - if (!upgradeOK) { - return new Response("Upgrade failed", { status: 500 }); + + await hooks.callHook("upgrade", request, { accept, reject }); + if (response instanceof Response) { + return response; } }, websocket: { diff --git a/src/adapters/deno.ts b/src/adapters/deno.ts index 0fd1160..ea6e414 100644 --- a/src/adapters/deno.ts +++ b/src/adapters/deno.ts @@ -1,7 +1,7 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; -import { AdapterHookable } from "../hooks.ts"; +import { AdapterHookable, formatRejection, Reasons } from "../hooks.ts"; import { Message } from "../message.ts"; import { WSError } from "../error.ts"; import { Peer } from "../peer.ts"; @@ -12,7 +12,7 @@ export interface DenoAdapter extends AdapterInstance { handleUpgrade(req: Request, info: ServeHandlerInfo): Promise; } -export interface DenoOptions extends AdapterOptions {} +export interface DenoOptions extends AdapterOptions { } type WebSocketUpgrade = Deno.WebSocketUpgrade; type ServeHandlerInfo = { @@ -31,36 +31,59 @@ export default defineWebSocketAdapter( return { ...adapterUtils(peers), handleUpgrade: async (request, info) => { - const res = await hooks.callHook("upgrade", request); - if (res instanceof Response) { - return res; + let response: Response | undefined; + + /** Accept the Websocket upgrade request. */ + function accept(params?: { headers?: HeadersInit }): void { + + const upgrade = Deno.upgradeWebSocket(request, { + // @ts-expect-error https://github.com/denoland/deno/pull/22242 + headers, + }); + + const peer = new DenoPeer({ + ws: upgrade.socket, + request, + peers, + denoInfo: info, + }); + + peers.add(peer); + + upgrade.socket.addEventListener("open", () => { + hooks.callHook("open", peer); + }); + + upgrade.socket.addEventListener("message", (event) => { + hooks.callHook("message", peer, new Message(event.data, peer, event)); + }); + + upgrade.socket.addEventListener("close", () => { + peers.delete(peer); + hooks.callHook("close", peer, {}); + }); + + upgrade.socket.addEventListener("error", (error) => { + peers.delete(peer); + hooks.callHook("error", peer, new WSError(error)); + }); + + response = upgrade.response; + } + + /** Reject the Websocket upgrade request */ + function reject(reason: Reasons): void { + response = formatRejection({ reason, type: "Response" }) } - const upgrade = Deno.upgradeWebSocket(request, { - // @ts-expect-error https://github.com/denoland/deno/pull/22242 - headers: res?.headers, - }); - const peer = new DenoPeer({ - ws: upgrade.socket, - request, - peers, - denoInfo: info, - }); - peers.add(peer); - upgrade.socket.addEventListener("open", () => { - hooks.callHook("open", peer); - }); - upgrade.socket.addEventListener("message", (event) => { - hooks.callHook("message", peer, new Message(event.data, peer, event)); - }); - upgrade.socket.addEventListener("close", () => { - peers.delete(peer); - hooks.callHook("close", peer, {}); - }); - upgrade.socket.addEventListener("error", (error) => { - peers.delete(peer); - hooks.callHook("error", peer, new WSError(error)); - }); - return upgrade.response; + + await hooks.callHook("upgrade", request, + { + accept, + reject + } + ); + + return response ?? new Response("Upgrade failed", { status: 500 }); }, }; }, diff --git a/src/adapters/node.ts b/src/adapters/node.ts index 10449d3..e9c39d0 100644 --- a/src/adapters/node.ts +++ b/src/adapters/node.ts @@ -2,7 +2,7 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type { WebSocket } from "../../types/web.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; -import { AdapterHookable } from "../hooks.ts"; +import { AdapterHookable, formatRejection, type Reasons } from "../hooks.ts"; import { Message } from "../message.ts"; import { WSError } from "../error.ts"; import { Peer } from "../peer.ts"; @@ -86,14 +86,28 @@ export default defineWebSocketAdapter( ...adapterUtils(peers), handleUpgrade: async (nodeReq, socket, head) => { const request = new NodeReqProxy(nodeReq); - const res = await hooks.callHook("upgrade", request); - if (res instanceof Response) { - return sendResponse(socket, res); + + /** Accept the Websocket upgrade request. */ + function accept(): void { + wss.handleUpgrade(nodeReq, socket, head, (ws) => { + wss.emit("connection", ws, nodeReq); + }); + } + + /** Reject the Websocket upgrade request with an optional close code and reason. + * @param code - The close code to send. If not specified, defaults to 1005. + * @param data - The close reason to send. If not specified, defaults to "No reason". + */ + function reject(reason: Reasons): void { + const rejection = formatRejection({ reason, type: "Event" }) + wss.handleUpgrade(nodeReq, socket, head, (ws) => { + ws.close(rejection.code, rejection.data); + }); } - (nodeReq as AugmentedReq)._request = request; - (nodeReq as AugmentedReq)._upgradeHeaders = res?.headers; - wss.handleUpgrade(nodeReq, socket, head, (ws) => { - wss.emit("connection", ws, nodeReq); + + await hooks.callHook("upgrade", request, { + accept, + reject, }); }, closeAll: (code, data) => { @@ -186,22 +200,3 @@ class NodeReqProxy { return this._headers; } } - -async function sendResponse(socket: Duplex, res: Response) { - const head = [ - `HTTP/1.1 ${res.status || 200} ${res.statusText || ""}`, - ...[...res.headers.entries()].map( - ([key, value]) => - `${encodeURIComponent(key)}: ${encodeURIComponent(value)}`, - ), - ]; - socket.write(head.join("\r\n") + "\r\n\r\n"); - if (res.body) { - for await (const chunk of res.body) { - socket.write(chunk); - } - } - return new Promise((resolve) => { - socket.end(resolve); - }); -} diff --git a/src/adapters/sse.ts b/src/adapters/sse.ts index c443f3e..951c272 100644 --- a/src/adapters/sse.ts +++ b/src/adapters/sse.ts @@ -2,7 +2,7 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type * as web from "../../types/web.ts"; import { toString } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; -import { AdapterHookable } from "../hooks.ts"; +import { AdapterHookable, formatRejection, Reasons } from "../hooks.ts"; import { Message } from "../message.ts"; import { Peer } from "../peer.ts"; @@ -27,63 +27,79 @@ export default defineWebSocketAdapter((opts = {}) => { return { ...adapterUtils(peers), fetch: async (request: Request) => { - const _res = await hooks.callHook("upgrade", request); - if (_res instanceof Response) { - return _res; - } - let peer: SSEPeer; + let _res: Response | undefined; - if (opts.bidir && request.body && request.headers.has("x-crossws-id")) { - // Accept bidirectional streaming request - const id = request.headers.get("x-crossws-id")!; - peer = peersMap?.get(id) as SSEPeer; - if (!peer) { - return new Response("invalid peer id", { status: 400 }); - } - const stream = request.body.pipeThrough(new TextDecoderStream()); - try { - for await (const chunk of stream) { - hooks.callHook("message", peer, new Message(chunk, peer)); + /** Accept the Websocket upgrade request. */ + async function accept(params?: { headers?: HeadersInit }): Promise { + let peer: SSEPeer; + + if (opts.bidir && request.body && request.headers.has("x-crossws-id")) { + // Accept bidirectional streaming request + const id = request.headers.get("x-crossws-id")!; + peer = peersMap?.get(id) as SSEPeer; + if (!peer) { + _res = new Response("invalid peer id", { status: 400 }); + } + const stream = request.body.pipeThrough(new TextDecoderStream()); + try { + for await (const chunk of stream) { + hooks.callHook("message", peer, new Message(chunk, peer)); + } + } catch { + await stream.cancel().catch(() => { }); + } + // eslint-disable-next-line unicorn/no-null + _res = new Response(null, {}); + } else { + // Add a new peer + const ws = new SSEWebSocketStub(); + peer = new SSEPeer({ + peers, + peersMap, + request, + hooks, + ws, + }); + peers.add(peer); + if (opts.bidir) { + peersMap!.set(peer.id, peer); + peer._sendEvent("crossws-id", peer.id); } - } catch { - await stream.cancel().catch(() => {}); } - // eslint-disable-next-line unicorn/no-null - return new Response(null, {}); - } else { - // Add a new peer - const ws = new SSEWebSocketStub(); - peer = new SSEPeer({ - peers, - peersMap, - request, - hooks, - ws, - }); - peers.add(peer); + + let headers: HeadersInit = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }; if (opts.bidir) { - peersMap!.set(peer.id, peer); - peer._sendEvent("crossws-id", peer.id); + headers["x-crossws-id"] = peer.id; } + if (params?.headers) { + headers = new Headers(headers); + for (const [key, value] of new Headers(params.headers)) { + headers.set(key, value); + } + } + + _res = new Response(peer._sseStream, { headers }); } - let headers: HeadersInit = { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - Connection: "keep-alive", - }; - if (opts.bidir) { - headers["x-crossws-id"] = peer.id; + /** Reject the Websocket upgrade request */ + function reject(reason: Reasons): void { + _res = formatRejection({ reason, type: "Response" }) } - if (_res?.headers) { - headers = new Headers(headers); - for (const [key, value] of new Headers(_res.headers)) { - headers.set(key, value); + + + await hooks.callHook("upgrade", request, + { + accept, + reject } - } + ); - return new Response(peer._sseStream, { ..._res, headers }); + return _res ?? new Response("Upgrade failed", { status: 500 }); }, }; }); diff --git a/src/hooks.ts b/src/hooks.ts index dc73f72..d15eea4 100644 --- a/src/hooks.ts +++ b/src/hooks.ts @@ -60,16 +60,93 @@ type HookFn = ( ...args: ArgsT ) => MaybePromise; +export const ReasonMap = { + "BadRequest": { + Response: 400, + Socket: 1002 // Protocol Error + }, + "InvalidData": { + Response: 400, + Socket: 1007 // Invalid Frame Payload Data + }, + "Unauthorized": { + Response: 401, + Socket: 3000 // Unauthorized + }, + "Forbidden": { + Response: 403, + Socket: 1008 // Policy Violation + }, + "PayloadTooLarge": { + Response: 413, + Socket: 1009 // Message Too Big + }, + "UnsupportedMediaType": { + Response: 415, + Socket: 1003 // Unsupported Data + }, + "UpgradeRequired": { + Response: 426, + Socket: 1010 // Mandatory Extension + }, + "BadGateway": { + Response: 502, + Socket: 1014 // Bad Gateway + }, + "ServiceUnavailable": { + Response: 503, + Socket: 1012 // Service Restart + }, + "TryAgainLater": { + Response: 503, + Socket: 1013 // Try Again Later + }, + "TLSHandshake": { + Response: 426, + Socket: 1015 // TLS Handshake + }, + "InternalError": { + Response: 500, + Socket: 1006 // Abnormal Closure + }, + "ServerError": { + Response: 500, + Socket: 1011 // Internal Error + }, +}; + +export type Reasons = keyof typeof ReasonMap +type ResponseTypes = "Response" | "Event" +type RejectionResponses = + T extends "Response" ? Response : + T extends "Event" ? { code: number, data: string } : + undefined + +export function formatRejection({ reason , type }: { reason: Reasons, type: T }): RejectionResponses { + switch (type) { + case "Response": + return new Response(reason, { status: ReasonMap[reason].Response, statusText: reason }) as RejectionResponses + case "Event": + return { code: ReasonMap[reason].Socket, data: reason } as RejectionResponses + default: + return undefined as RejectionResponses + } +} + export interface Hooks { /** Upgrading */ upgrade: ( request: | Request | { - url: string; - headers: Headers; - }, - ) => MaybePromise; + url: string; + headers: Headers; + }, + socket: { + accept: (params?: { headers?: HeadersInit }) => void, + reject: (reason: Reasons) => void, + }, + ) => MaybePromise; /** A message is received */ message: (peer: Peer, message: Message) => MaybePromise; From c7788905cdc7b87b42d156dfbfc14f43857ea552 Mon Sep 17 00:00:00 2001 From: Luke Hagar Date: Mon, 11 Nov 2024 18:19:14 -0600 Subject: [PATCH 2/7] adjusted bun, updated cloudflare --- examples/node/index.js | 11 +++-- src/adapters/bun.ts | 4 +- src/adapters/cloudflare.ts | 96 ++++++++++++++++++++++---------------- 3 files changed, 62 insertions(+), 49 deletions(-) diff --git a/examples/node/index.js b/examples/node/index.js index 5773ef9..4e290b2 100644 --- a/examples/node/index.js +++ b/examples/node/index.js @@ -5,11 +5,12 @@ import { readFileSync } from "node:fs"; const ws = crossws({ hooks: defineHooks({ - upgrade(peer, req) { - // return new Response({"error": "not supported"}, {status: 400}); - - console.log("[ws] upgrade"); - return {headers: {"x-custom-header": "custom-value"}}; + upgrade(req, socket) { + if(!authorizedCheck(req)){ + socket.reject("Unauthorized") + }else{ + socket.accept() + } }, open(peer) { diff --git a/src/adapters/bun.ts b/src/adapters/bun.ts index d151bd7..641d7e9 100644 --- a/src/adapters/bun.ts +++ b/src/adapters/bun.ts @@ -52,9 +52,7 @@ export default defineWebSocketAdapter( } await hooks.callHook("upgrade", request, { accept, reject }); - if (response instanceof Response) { - return response; - } + return response ?? new Response("Upgrade failed", { status: 500 }); }, websocket: { message: (ws, message) => { diff --git a/src/adapters/cloudflare.ts b/src/adapters/cloudflare.ts index fe4a23e..8c4cfa1 100644 --- a/src/adapters/cloudflare.ts +++ b/src/adapters/cloudflare.ts @@ -1,7 +1,7 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; -import { AdapterHookable } from "../hooks.ts"; +import { AdapterHookable, formatRejection, Reasons } from "../hooks.ts"; import { Message } from "../message.ts"; import { WSError } from "../error.ts"; import { Peer } from "../peer.ts"; @@ -21,7 +21,7 @@ export interface CloudflareAdapter extends AdapterInstance { ): Promise<_cf.Response>; } -export interface CloudflareOptions extends AdapterOptions {} +export interface CloudflareOptions extends AdapterOptions { } // --- adapter --- @@ -33,48 +33,62 @@ export default defineWebSocketAdapter( return { ...adapterUtils(peers), handleUpgrade: async (request, env, context) => { - const res = await hooks.callHook( + let response + + /** Accept the Websocket upgrade request. */ + function accept(params?: { headers?: HeadersInit }): void { + const pair = new WebSocketPair(); + const client = pair[0]; + const server = pair[1]; + const peer = new CloudflarePeer({ + ws: client, + peers, + wsServer: server, + request: request as unknown as Request, + cfEnv: env, + cfCtx: context, + }); + peers.add(peer); + server.accept(); + hooks.callHook("open", peer); + server.addEventListener("message", (event) => { + hooks.callHook( + "message", + peer, + new Message(event.data, peer, event as MessageEvent), + ); + }); + server.addEventListener("error", (event) => { + peers.delete(peer); + hooks.callHook("error", peer, new WSError(event.error)); + }); + server.addEventListener("close", (event) => { + peers.delete(peer); + hooks.callHook("close", peer, event); + }); + // eslint-disable-next-line unicorn/no-null + response = new Response(null, { + status: 101, + webSocket: client, + headers: params?.headers, + }); + } + + /** Reject the Websocket upgrade request */ + function reject(reason: Reasons): void { + response = formatRejection({ reason, type: "Response" }) + } + + await hooks.callHook( "upgrade", request as unknown as Request, + { + accept, + reject + } ); - if (res instanceof Response) { - return res; - } - const pair = new WebSocketPair(); - const client = pair[0]; - const server = pair[1]; - const peer = new CloudflarePeer({ - ws: client, - peers, - wsServer: server, - request: request as unknown as Request, - cfEnv: env, - cfCtx: context, - }); - peers.add(peer); - server.accept(); - hooks.callHook("open", peer); - server.addEventListener("message", (event) => { - hooks.callHook( - "message", - peer, - new Message(event.data, peer, event as MessageEvent), - ); - }); - server.addEventListener("error", (event) => { - peers.delete(peer); - hooks.callHook("error", peer, new WSError(event.error)); - }); - server.addEventListener("close", (event) => { - peers.delete(peer); - hooks.callHook("close", peer, event); - }); - // eslint-disable-next-line unicorn/no-null - return new Response(null, { - status: 101, - webSocket: client, - headers: res?.headers, - }); + + return response ?? new Response("Upgrade failed", { status: 500 }); }, }; }, From 7fe9c8671b9dfe2536ae6fb44507dd79dca9ca4e Mon Sep 17 00:00:00 2001 From: Luke Hagar Date: Mon, 11 Nov 2024 18:19:31 -0600 Subject: [PATCH 3/7] formatted --- examples/node/index.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/node/index.js b/examples/node/index.js index 4e290b2..39bec47 100644 --- a/examples/node/index.js +++ b/examples/node/index.js @@ -1,14 +1,14 @@ import { defineHooks } from "crossws"; -import {createServer} from 'node:http' +import { createServer } from 'node:http' import crossws from "crossws/adapters/node"; import { readFileSync } from "node:fs"; const ws = crossws({ hooks: defineHooks({ upgrade(req, socket) { - if(!authorizedCheck(req)){ + if (!authorizedCheck(req)) { socket.reject("Unauthorized") - }else{ + } else { socket.accept() } }, From 5bbda7f40ca6d39289b1ef11b8d8d51ffac9b5ef Mon Sep 17 00:00:00 2001 From: Luke Hagar Date: Mon, 11 Nov 2024 18:24:33 -0600 Subject: [PATCH 4/7] updated uws --- src/adapters/uws.ts | 78 ++++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts index 1b81ea1..89f1b35 100644 --- a/src/adapters/uws.ts +++ b/src/adapters/uws.ts @@ -3,7 +3,7 @@ import type { WebSocket } from "../../types/web.ts"; import type uws from "uWebSockets.js"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; -import { AdapterHookable } from "../hooks.ts"; +import { AdapterHookable, formatRejection, Reasons } from "../hooks.ts"; import { Message } from "../message.ts"; import { Peer } from "../peer.ts"; @@ -75,17 +75,46 @@ export default defineWebSocketAdapter( res.onAborted(() => { aborted = true; }); - const _res = await hooks.callHook("upgrade", new UWSReqProxy(req)); - if (aborted) { - return; + + /** Accept the Websocket upgrade request. */ + async function accept(params?: { headers?: HeadersInit }): Promise { + if (aborted) { + return; + } + res.writeStatus("101 Switching Protocols"); + if (params?.headers) { + for (const [key, value] of new Headers(params.headers)) { + res.writeHeader(key, value); + } + } + + res.cork(() => { + const key = req.getHeader("sec-websocket-key"); + const protocol = req.getHeader("sec-websocket-protocol"); + const extensions = req.getHeader("sec-websocket-extensions"); + res.upgrade( + { + req, + res, + protocol, + extensions, + }, + key, + protocol, + extensions, + context, + ); + }); } - if (_res instanceof Response) { - res.writeStatus(`${_res.status} ${_res.statusText}`); - for (const [key, value] of _res.headers) { + + async function reject(reason: Reasons): Promise { + const formatedRejection = formatRejection({ reason, type: "Response" }) + res.writeStatus(`${formatedRejection.status} ${formatedRejection.statusText}`); + for (const [key, value] of formatedRejection.headers) { res.writeHeader(key, value); } - if (_res.body) { - for await (const chunk of _res.body) { + if (formatedRejection.body) { + for await (const chunk of formatedRejection.body) { if (aborted) { break; } @@ -97,30 +126,13 @@ export default defineWebSocketAdapter( } return; } - res.writeStatus("101 Switching Protocols"); - if (_res?.headers) { - for (const [key, value] of new Headers(_res.headers)) { - res.writeHeader(key, value); - } - } - res.cork(() => { - const key = req.getHeader("sec-websocket-key"); - const protocol = req.getHeader("sec-websocket-protocol"); - const extensions = req.getHeader("sec-websocket-extensions"); - res.upgrade( - { - req, - res, - protocol, - extensions, - }, - key, - protocol, - extensions, - context, - ); - }); + await hooks.callHook("upgrade", new UWSReqProxy(req), + { + accept, + reject + } + ); }, }, }; @@ -227,7 +239,7 @@ class UWSReqProxy { class UwsWebSocketProxy implements Partial { readyState?: number = 1 /* OPEN */; - constructor(private _uws: uws.WebSocket) {} + constructor(private _uws: uws.WebSocket) { } get bufferedAmount() { return this._uws?.getBufferedAmount(); From 8857cd45d2f9939e1fc78bbaa903aceb8f3a9374 Mon Sep 17 00:00:00 2001 From: Luke Hagar Date: Mon, 11 Nov 2024 18:26:59 -0600 Subject: [PATCH 5/7] updated durable --- src/adapters/cloudflare-durable.ts | 53 +++++++++++++++++++----------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/src/adapters/cloudflare-durable.ts b/src/adapters/cloudflare-durable.ts index 526fa78..9d554b1 100644 --- a/src/adapters/cloudflare-durable.ts +++ b/src/adapters/cloudflare-durable.ts @@ -2,7 +2,7 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type * as web from "../../types/web.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; -import { AdapterHookable } from "../hooks.ts"; +import { AdapterHookable, formatRejection, Reasons } from "../hooks.ts"; import { Message } from "../message.ts"; import { Peer } from "../peer.ts"; @@ -31,27 +31,40 @@ export default defineWebSocketAdapter< // placeholder }, handleDurableUpgrade: async (obj, request) => { - const res = await hooks.callHook("upgrade", request as Request); - if (res instanceof Response) { - return res; + let response: Response | undefined; + + async function accept(params?: { headers?: HeadersInit }): Promise { + const pair = new WebSocketPair(); + const client = pair[0]; + const server = pair[1]; + const peer = CloudflareDurablePeer._restore( + obj, + server as unknown as CF.WebSocket, + request, + ); + peers.add(peer); + (obj as DurableObjectPub).ctx.acceptWebSocket(server); + await hooks.callHook("open", peer); + // eslint-disable-next-line unicorn/no-null + response = new Response(null, { + status: 101, + webSocket: client, + headers: params?.headers, + }); } - const pair = new WebSocketPair(); - const client = pair[0]; - const server = pair[1]; - const peer = CloudflareDurablePeer._restore( - obj, - server as unknown as CF.WebSocket, - request, + + function reject(reason: Reasons): void { + response = formatRejection({ reason, type: "Response" }) + } + + await hooks.callHook("upgrade", request as Request, + { + accept, + reject + } ); - peers.add(peer); - (obj as DurableObjectPub).ctx.acceptWebSocket(server); - await hooks.callHook("open", peer); - // eslint-disable-next-line unicorn/no-null - return new Response(null, { - status: 101, - webSocket: client, - headers: res?.headers, - }); + + return response ?? new Response("Upgrade failed", { status: 500 }); }, handleDurableMessage: async (obj, ws, message) => { const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket); From 78b88c6004545148c6ecf2c4306952f375e21965 Mon Sep 17 00:00:00 2001 From: Luke Hagar Date: Mon, 11 Nov 2024 18:27:59 -0600 Subject: [PATCH 6/7] adjusted type --- src/hooks.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hooks.ts b/src/hooks.ts index d15eea4..0803a85 100644 --- a/src/hooks.ts +++ b/src/hooks.ts @@ -34,7 +34,7 @@ export class AdapterHookable { ([globalRes, hook]) => { const hookResPromise = hook?.(arg1 as any, arg2 as any); return hookResPromise instanceof Promise - ? hookResPromise.then((hookRes) => hookRes || globalRes) + ? hookResPromise.then((hookRes) => hookRes ?? globalRes) : hookResPromise || globalRes; }, ) as Promise; From 9dfc4a306d6fba57ee25fb5227d9191f48739848 Mon Sep 17 00:00:00 2001 From: Luke Hagar Date: Mon, 11 Nov 2024 18:51:13 -0600 Subject: [PATCH 7/7] testing things --- examples/node/index.js | 10 ++++++---- examples/node/package.json | 1 + src/hooks.ts | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/node/index.js b/examples/node/index.js index 39bec47..8602bc8 100644 --- a/examples/node/index.js +++ b/examples/node/index.js @@ -6,11 +6,12 @@ import { readFileSync } from "node:fs"; const ws = crossws({ hooks: defineHooks({ upgrade(req, socket) { - if (!authorizedCheck(req)) { + // if (!authorizedCheck(req)) { socket.reject("Unauthorized") - } else { - socket.accept() - } + // } else { + // socket.accept() + + // } }, open(peer) { @@ -43,6 +44,7 @@ const port = 3000; const index = readFileSync("./public/index.html"); const server = createServer((req, res) => { + console.log('hit'); res.writeHead(200); res.end(index); }) diff --git a/examples/node/package.json b/examples/node/package.json index 4266ad4..fc8bb76 100644 --- a/examples/node/package.json +++ b/examples/node/package.json @@ -2,6 +2,7 @@ "name": "crossws-examples-node", "version": "0.0.0", "private": true, + "type": "module", "scripts": { "dev": "node ./index.js" }, diff --git a/src/hooks.ts b/src/hooks.ts index 0803a85..e41bb6e 100644 --- a/src/hooks.ts +++ b/src/hooks.ts @@ -34,7 +34,7 @@ export class AdapterHookable { ([globalRes, hook]) => { const hookResPromise = hook?.(arg1 as any, arg2 as any); return hookResPromise instanceof Promise - ? hookResPromise.then((hookRes) => hookRes ?? globalRes) + ? hookResPromise.then((hookRes) => hookRes || globalRes) : hookResPromise || globalRes; }, ) as Promise; @@ -146,7 +146,7 @@ export interface Hooks { accept: (params?: { headers?: HeadersInit }) => void, reject: (reason: Reasons) => void, }, - ) => MaybePromise; + ) => MaybePromise; /** A message is received */ message: (peer: Peer, message: Message) => MaybePromise;