diff --git a/src/DevTools/Server.ts b/src/DevTools/Server.ts index 096ebff..b8d626c 100644 --- a/src/DevTools/Server.ts +++ b/src/DevTools/Server.ts @@ -65,7 +65,7 @@ export const make = Effect.gen(function*(_) { yield* _(clients.offer(client)) - yield* _( + return yield* _( Stream.fromQueue(responses), Stream.pipeThroughChannel( MsgPack.duplexSchema(Socket.toChannel(socket), { @@ -84,19 +84,13 @@ export const make = Effect.gen(function*(_) { ])) ) }).pipe( - Effect.catchAllCause(Effect.log), - Effect.fork + Effect.catchAllCause(Effect.log) ) - yield* _( - server.sockets.take, - Effect.flatMap(handle), - Effect.forever, - Effect.forkScoped - ) + const run = server.run(handle) return { - run: server.run, + run, clients } satisfies ServerImpl }) diff --git a/src/Socket.ts b/src/Socket.ts index d5b2584..7458536 100644 --- a/src/Socket.ts +++ b/src/Socket.ts @@ -3,13 +3,15 @@ */ import * as Cause from "effect/Cause" import * as Channel from "effect/Channel" -import type * as Chunk from "effect/Chunk" +import * as Chunk from "effect/Chunk" import * as Context from "effect/Context" import * as Data from "effect/Data" +import * as Deferred from "effect/Deferred" import * as Effect from "effect/Effect" import * as Exit from "effect/Exit" import * as Layer from "effect/Layer" import * as Queue from "effect/Queue" +import * as Runtime from "effect/Runtime" import * as Scope from "effect/Scope" import type * as AsyncProducer from "effect/SingleProducerAsyncInput" import WebSocket from "isomorphic-ws" @@ -40,10 +42,11 @@ export const Socket: Context.Tag = Context.Tag( */ export interface Socket { readonly [SocketTypeId]: SocketTypeId - readonly run: Effect.Effect + readonly run: ( + handler: (_: Uint8Array) => Effect.Effect + ) => Effect.Effect readonly writer: Effect.Effect Effect.Effect> - readonly messages: Queue.Dequeue - readonly source?: unknown + // readonly messages: Queue.Dequeue } /** @@ -95,18 +98,12 @@ export const toChannel = ( } yield* _( - self.run, + self.run((data) => Queue.offer(exitQueue, Exit.succeed(Chunk.of(data)))), Effect.zipRight(Effect.failCause(Cause.empty)), Effect.exit, Effect.tap((exit) => Queue.offer(exitQueue, exit)), Effect.fork ) - yield* _( - Queue.takeBetween(self.messages, 1, Number.MAX_SAFE_INTEGER), - Effect.flatMap((chunk) => Queue.offer(exitQueue, Exit.succeed(chunk))), - Effect.forever, - Effect.fork - ) const loop: Channel.Channel< never, @@ -191,59 +188,69 @@ export const fromWebSocket = ( Effect.gen(function*(_) { const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError const sendQueue = yield* _(Queue.unbounded()) - const messages = yield* _(Queue.unbounded()) - const run = Effect.gen(function*(_) { - const ws = yield* _(acquire) - const encoder = new TextEncoder() + const run = (handler: (_: Uint8Array) => Effect.Effect) => + Effect.gen(function*(_) { + const ws = yield* _(acquire) + const encoder = new TextEncoder() + const runtime = yield* _(Effect.runtime()) + const deferred = yield* _(Deferred.make()) + const run = Runtime.runFork(runtime) - ws.onmessage = (event) => { - Queue.unsafeOffer( - messages, - event.data instanceof Uint8Array - ? event.data - : typeof event.data === "string" - ? encoder.encode(event.data) - : new Uint8Array(event.data) - ) - } + ws.onmessage = (event) => { + run( + Effect.catchAllCause( + handler( + event.data instanceof Uint8Array + ? event.data + : typeof event.data === "string" + ? encoder.encode(event.data) + : new Uint8Array(event.data) + ), + (cause) => Deferred.failCause(deferred, cause) + ) + ) + } - if (ws.readyState !== WebSocket.OPEN) { - yield* _(Effect.async((resume) => { - ws.onopen = () => { - resume(Effect.unit) - } - ws.onerror = (error_) => { - resume(Effect.fail(new SocketError({ reason: "Open", error: (error_ as any).message }))) - } - })) - } + if (ws.readyState !== WebSocket.OPEN) { + yield* _(Effect.async((resume) => { + ws.onopen = () => { + resume(Effect.unit) + } + ws.onerror = (error_) => { + resume(Effect.fail(new SocketError({ reason: "Open", error: (error_ as any).message }))) + } + })) + } - yield* _( - Queue.take(sendQueue), - Effect.tap((chunk) => - Effect.try({ - try: () => ws.send(chunk), - catch: (error) => Effect.fail(new SocketError({ reason: "Write", error: (error as any).message })) - }) - ), - Effect.forever, - Effect.fork - ) + yield* _( + Queue.take(sendQueue), + Effect.tap((chunk) => + Effect.try({ + try: () => ws.send(chunk), + catch: (error) => Effect.fail(new SocketError({ reason: "Write", error: (error as any).message })) + }) + ), + Effect.forever, + Effect.fork + ) - yield* _(Effect.async((resume) => { - ws.onclose = (event) => { - if (closeCodeIsError(event.code)) { - resume(Effect.fail(new SocketError({ reason: "Close", error: event }))) - } else { - resume(Effect.unit) - } - } - ws.onerror = (error) => { - resume(Effect.fail(new SocketError({ reason: "Read", error: (error as any).message }))) - } - })) - }).pipe(Effect.scoped) + yield* _(Effect.race( + Effect.async((resume) => { + ws.onclose = (event) => { + if (closeCodeIsError(event.code)) { + resume(Effect.fail(new SocketError({ reason: "Close", error: event }))) + } else { + resume(Effect.unit) + } + } + ws.onerror = (error) => { + resume(Effect.fail(new SocketError({ reason: "Read", error: (error as any).message }))) + } + }), + Deferred.await(deferred) + )) + }).pipe(Effect.scoped) const write = (chunk: Uint8Array) => Queue.offer(sendQueue, chunk) const writer = Effect.succeed(write) @@ -251,8 +258,7 @@ export const fromWebSocket = ( return Socket.of({ [SocketTypeId]: SocketTypeId, run, - writer, - messages + writer }) }) diff --git a/src/Socket/Node.ts b/src/Socket/Node.ts index cb32af9..8b1dfc3 100644 --- a/src/Socket/Node.ts +++ b/src/Socket/Node.ts @@ -3,10 +3,12 @@ */ import * as Channel from "effect/Channel" import type * as Chunk from "effect/Chunk" +import * as Deferred from "effect/Deferred" import * as Effect from "effect/Effect" import * as Fiber from "effect/Fiber" import * as Layer from "effect/Layer" import * as Queue from "effect/Queue" +import * as Runtime from "effect/Runtime" import type * as Scope from "effect/Scope" import * as Net from "node:net" import * as Socket from "../Socket.js" @@ -59,41 +61,50 @@ export const fromNetSocket = ( ): Effect.Effect => Effect.gen(function*(_) { const sendQueue = yield* _(Queue.unbounded()) - const messagesQueue = yield* _(Queue.unbounded()) - const run = Effect.gen(function*(_) { - const conn = yield* _(open) - const writeFiber = yield* _( - Queue.take(sendQueue), - Effect.tap((chunk) => - Effect.async((resume) => { - if (chunk === EOF) { - conn.end(() => resume(Effect.unit)) - } else { - conn.write(chunk, (error) => { - resume(error ? Effect.fail(new Socket.SocketError({ reason: "Write", error })) : Effect.unit) - }) - } - }) - ), - Effect.forever, - Effect.fork - ) - conn.on("data", (chunk) => { - Queue.unsafeOffer(messagesQueue, chunk) - }) - yield* _( - Effect.async((resume) => { - conn.on("end", () => { - resume(Effect.unit) - }) - conn.on("error", (error) => { - resume(Effect.fail(new Socket.SocketError({ reason: "Read", error }))) - }) - }), - Effect.race(Fiber.join(writeFiber)) - ) - }).pipe(Effect.scoped) + const run = (handler: (_: Uint8Array) => Effect.Effect) => + Effect.gen(function*(_) { + const conn = yield* _(open) + const runtime = yield* _(Effect.runtime()) + const run = Runtime.runFork(runtime) + const deferred = yield* _(Deferred.make()) + const writeFiber = yield* _( + Queue.take(sendQueue), + Effect.tap((chunk) => + Effect.async((resume) => { + if (chunk === EOF) { + conn.end(() => resume(Effect.unit)) + } else { + conn.write(chunk, (error) => { + resume(error ? Effect.fail(new Socket.SocketError({ reason: "Write", error })) : Effect.unit) + }) + } + }) + ), + Effect.forever, + Effect.fork + ) + conn.on("data", (chunk) => { + run( + Effect.catchAllCause( + handler(chunk), + (cause) => Deferred.failCause(deferred, cause) + ) + ) + }) + yield* _(Effect.raceAll([ + Effect.async((resume) => { + conn.on("end", () => { + resume(Effect.unit) + }) + conn.on("error", (error) => { + resume(Effect.fail(new Socket.SocketError({ reason: "Read", error }))) + }) + }), + Deferred.await(deferred), + Fiber.join(writeFiber) + ])) + }).pipe(Effect.scoped) const write = (chunk: Uint8Array) => Queue.offer(sendQueue, chunk) const writer = Effect.acquireRelease( @@ -104,8 +115,7 @@ export const fromNetSocket = ( return Socket.Socket.of({ [Socket.SocketTypeId]: Socket.SocketTypeId, run, - writer, - messages: messagesQueue + writer }) }) diff --git a/src/SocketServer.ts b/src/SocketServer.ts index 6700361..d354a09 100644 --- a/src/SocketServer.ts +++ b/src/SocketServer.ts @@ -4,7 +4,6 @@ import * as Context from "effect/Context" import * as Data from "effect/Data" import type * as Effect from "effect/Effect" -import type * as Queue from "effect/Queue" import type * as Socket from "./Socket.js" /** @@ -34,8 +33,9 @@ export const SocketServer: Context.Tag = Context.Tag export interface SocketServer { readonly [SocketServerTypeId]: SocketServerTypeId readonly address: Effect.Effect - readonly run: Effect.Effect - readonly sockets: Queue.Dequeue + readonly run: ( + handler: (socket: Socket.Socket) => Effect.Effect + ) => Effect.Effect } /** diff --git a/src/SocketServer/Node.ts b/src/SocketServer/Node.ts index 6c17a0f..50b3570 100644 --- a/src/SocketServer/Node.ts +++ b/src/SocketServer/Node.ts @@ -5,7 +5,7 @@ import * as Deferred from "effect/Deferred" import * as Effect from "effect/Effect" import { pipe } from "effect/Function" import * as Layer from "effect/Layer" -import * as Queue from "effect/Queue" +import * as Runtime from "effect/Runtime" import type * as Scope from "effect/Scope" import * as Net from "node:net" import * as WS from "ws" @@ -27,57 +27,61 @@ export const make = ( Effect.gen(function*(_) { const fiberId = yield* _(Effect.fiberId) const semaphore = yield* _(Effect.makeSemaphore(1)) - const queue = yield* _(Effect.acquireRelease( - Queue.unbounded(), - Queue.shutdown - )) let serverDeferred = yield* _(Deferred.make()) - const run = Effect.async((resume) => { - const server = Net.createServer(options) - let connected = false - server.on("error", (error) => { - resume(Effect.fail( - new SocketServer.SocketServerError({ - reason: connected ? "Unknown" : "Open", - error - }) - )) - }) - server.on("listening", () => { - connected = true - Deferred.unsafeDone(serverDeferred, Effect.succeed(server)) - }) - server.on("connection", (conn) => { - pipe( - Socket.fromNetSocket( - Effect.acquireRelease( - Effect.succeed(conn), - (conn) => - Effect.sync(() => { - if (conn.closed === false) { - conn.destroySoon() - } - conn.removeAllListeners() + const run = (handler: (socket: Socket.Socket) => Effect.Effect) => + Effect.gen(function*(_) { + const runtime = yield* _(Effect.runtime()) + const run = Runtime.runFork(runtime) + const deferred = yield* _(Deferred.make()) + return yield* _( + Effect.async((resume) => { + const server = Net.createServer(options) + let connected = false + server.on("error", (error) => { + resume(Effect.fail( + new SocketServer.SocketServerError({ + reason: connected ? "Unknown" : "Open", + error }) - ) - ), - Effect.flatMap((socket) => { - ;(socket as any).source = conn - return Queue.offer(queue, socket) + )) + }) + server.on("listening", () => { + connected = true + Deferred.unsafeDone(serverDeferred, Effect.succeed(server)) + }) + server.on("connection", (conn) => { + pipe( + Socket.fromNetSocket( + Effect.acquireRelease( + Effect.succeed(conn), + (conn) => + Effect.sync(() => { + if (conn.closed === false) { + conn.destroySoon() + } + conn.removeAllListeners() + }) + ) + ), + Effect.flatMap(handler), + Effect.catchAllCause((cause) => Deferred.failCause(deferred, cause)), + Effect.scoped, + run + ) + }) + server.listen(options) + return Effect.sync(() => { + serverDeferred = Deferred.unsafeMake(fiberId) + server.removeAllListeners() + server.close() + }) }), - Effect.runFork + Effect.race(Deferred.await(deferred)) ) - }) - server.listen(options) - return Effect.sync(() => { - serverDeferred = Deferred.unsafeMake(fiberId) - server.removeAllListeners() - server.close() - }) - }).pipe( - semaphore.withPermits(1) - ) + }).pipe( + semaphore.withPermits(1) + ) const address = Effect.map( Effect.suspend(() => Deferred.await(serverDeferred)), @@ -99,8 +103,7 @@ export const make = ( return SocketServer.SocketServer.of({ [SocketServer.SocketServerTypeId]: SocketServer.SocketServerTypeId, address, - run, - sockets: queue + run }) }) @@ -126,53 +129,56 @@ export const makeWebSocket = ( Effect.gen(function*(_) { const fiberId = yield* _(Effect.fiberId) const semaphore = yield* _(Effect.makeSemaphore(1)) - const queue = yield* _(Effect.acquireRelease( - Queue.unbounded(), - Queue.shutdown - )) let serverDeferred = yield* _(Deferred.make()) - const run = Effect.async((resume) => { - const server = new WS.WebSocketServer(options) - let connected = false - server.on("error", (error) => { - resume(Effect.fail( - new SocketServer.SocketServerError({ - reason: connected ? "Unknown" : "Open", - error - }) - )) - }) - server.on("listening", () => { - connected = true - Deferred.unsafeDone(serverDeferred, Effect.succeed(server)) - }) - server.on("connection", (conn) => { - pipe( - Socket.fromWebSocket( - Effect.acquireRelease( - Effect.succeed(conn as unknown as globalThis.WebSocket), - (conn) => - Effect.sync(() => { - conn.close() + const run = (handler: (socket: Socket.Socket) => Effect.Effect) => + Effect.gen(function*(_) { + const runtime = yield* _(Effect.runtime()) + const run = Runtime.runFork(runtime) + const deferred = yield* _(Deferred.make()) + return yield* _( + Effect.async((resume) => { + const server = new WS.WebSocketServer(options) + let connected = false + server.on("error", (error) => { + resume(Effect.fail( + new SocketServer.SocketServerError({ + reason: connected ? "Unknown" : "Open", + error }) - ) - ), - Effect.flatMap((socket) => { - ;(socket as any).source = conn - return Queue.offer(queue, socket) + )) + }) + server.on("listening", () => { + connected = true + Deferred.unsafeDone(serverDeferred, Effect.succeed(server)) + }) + server.on("connection", (conn, _req) => { + pipe( + Socket.fromWebSocket( + Effect.acquireRelease( + Effect.succeed(conn as unknown as globalThis.WebSocket), + (conn) => + Effect.sync(() => { + conn.close() + }) + ) + ), + Effect.flatMap(handler), + Effect.catchAllCause((cause) => Deferred.failCause(deferred, cause)), + run + ) + }) + return Effect.sync(() => { + serverDeferred = Deferred.unsafeMake(fiberId) + server.removeAllListeners() + server.close() + }) }), - Effect.runFork + Effect.race(Deferred.await(deferred)) ) - }) - return Effect.sync(() => { - serverDeferred = Deferred.unsafeMake(fiberId) - server.removeAllListeners() - server.close() - }) - }).pipe( - semaphore.withPermits(1) - ) + }).pipe( + semaphore.withPermits(1) + ) const address = Effect.map( Effect.suspend(() => Deferred.await(serverDeferred)), @@ -194,8 +200,7 @@ export const makeWebSocket = ( return SocketServer.SocketServer.of({ [SocketServer.SocketServerTypeId]: SocketServer.SocketServerTypeId, address, - run, - sockets: queue + run }) }) diff --git a/test/Socket.test.ts b/test/Socket.test.ts index 710b8fb..9deaeb6 100644 --- a/test/Socket.test.ts +++ b/test/Socket.test.ts @@ -1,6 +1,6 @@ import * as Socket from "@effect/experimental/Socket/Node" import * as SocketServer from "@effect/experimental/SocketServer/Node" -import { Chunk, Effect, Fiber, Stream } from "effect" +import { Chunk, Effect, Fiber, Queue, Stream } from "effect" import { assert, describe, expect, test } from "vitest" import WS from "vitest-websocket-mock" @@ -8,20 +8,12 @@ const makeServer = Effect.gen(function*(_) { const server = yield* _(SocketServer.make({ port: 0 })) yield* _( - server.sockets.take, - Effect.flatMap((socket) => + server.run((socket) => Effect.gen(function*(_) { const write = yield* _(socket.writer) - yield* _( - socket.messages.take, - Effect.flatMap(write), - Effect.forever, - Effect.fork - ) - yield* _(socket.run) - }).pipe(Effect.scoped, Effect.fork) + yield* _(socket.run(write)) + }).pipe(Effect.scoped) ), - Effect.forever, Effect.forkScoped ) @@ -32,7 +24,6 @@ describe("Socket", () => { test("open", () => Effect.gen(function*(_) { const server = yield* _(makeServer) - yield* _(server.run, Effect.fork) const address = yield* _(server.address) const channel = Socket.makeNetChannel({ port: (address as SocketServer.TcpAddress).port }) @@ -64,7 +55,8 @@ describe("Socket", () => { Effect.gen(function*(_) { const server = yield* _(makeServer) const socket = yield* _(Socket.makeWebSocket(Effect.succeed(url))) - const fiber = yield* _(Effect.fork(socket.run)) + const messages = yield* _(Queue.unbounded()) + const fiber = yield* _(Effect.fork(socket.run((_) => messages.offer(_)))) yield* _( Effect.gen(function*(_) { const write = yield* _(socket.writer) @@ -79,7 +71,7 @@ describe("Socket", () => { })) server.send("Right back at you!") - const message = yield* _(socket.messages.take) + const message = yield* _(messages.take) expect(message).toEqual(new TextEncoder().encode("Right back at you!")) server.close()