From 5c7909c6c7ebb9dfc5247da5ee28377db95aae4f Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 5 Feb 2025 13:31:33 +1300 Subject: [PATCH] add Promise based apis to Fiber{Handle,Set,Map} modules (#4401) --- .changeset/chatty-terms-occur.md | 5 ++ packages/effect/src/FiberHandle.ts | 59 +++++++++++++++++++++ packages/effect/src/FiberMap.ts | 66 ++++++++++++++++++++++++ packages/effect/src/FiberSet.ts | 58 +++++++++++++++++++++ packages/effect/test/FiberHandle.test.ts | 7 +++ packages/effect/test/FiberMap.test.ts | 7 +++ packages/effect/test/FiberSet.test.ts | 7 +++ 7 files changed, 209 insertions(+) create mode 100644 .changeset/chatty-terms-occur.md diff --git a/.changeset/chatty-terms-occur.md b/.changeset/chatty-terms-occur.md new file mode 100644 index 00000000000..889b36176b4 --- /dev/null +++ b/.changeset/chatty-terms-occur.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add Promise based apis to Fiber{Handle,Set,Map} modules diff --git a/packages/effect/src/FiberHandle.ts b/packages/effect/src/FiberHandle.ts index 3a506f5c5f9..43a6ca48950 100644 --- a/packages/effect/src/FiberHandle.ts +++ b/packages/effect/src/FiberHandle.ts @@ -143,6 +143,25 @@ export const makeRuntime = (): Effect.Effect< (self) => runtime(self)() ) +/** + * Create an Effect run function that is backed by a FiberHandle. + * + * @since 3.13.0 + * @categories constructors + */ +export const makeRuntimePromise = (): Effect.Effect< + ( + effect: Effect.Effect, + options?: Runtime.RunForkOptions | undefined + ) => Promise, + never, + Scope.Scope | R +> => + Effect.flatMap( + make(), + (self) => runtimePromise(self)() + ) + const internalFiberIdId = -1 const internalFiberId = FiberId.make(internalFiberIdId, 0) const isInternalInterruption = Cause.reduceWithContext(undefined, { @@ -436,6 +455,46 @@ export const runtime: ( } ) +/** + * Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberHandle. + * + * The returned run function will return Promise's that will resolve when the + * fiber completes. + * + * @since 3.13.0 + * @categories combinators + */ +export const runtimePromise = (self: FiberHandle): () => Effect.Effect< + ( + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined } + | undefined + ) => Promise, + never, + R +> => +() => + Effect.map( + runtime(self)(), + (runFork) => + ( + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined } + | undefined + ): Promise => + new Promise((resolve, reject) => + runFork(effect, options).addObserver((exit) => { + if (Exit.isSuccess(exit)) { + resolve(exit.value) + } else { + reject(Cause.squash(exit.cause)) + } + }) + ) + ) + /** * If any of the Fiber's in the handle terminate with a failure, * the returned Effect will terminate with the first failure that occurred. diff --git a/packages/effect/src/FiberMap.ts b/packages/effect/src/FiberMap.ts index e4ba703c693..1d1ca407dab 100644 --- a/packages/effect/src/FiberMap.ts +++ b/packages/effect/src/FiberMap.ts @@ -158,6 +158,30 @@ export const makeRuntime = (): Effect.Effect< (self) => runtime(self)() ) +/** + * Create an Effect run function that is backed by a FiberMap. + * + * @since 3.13.0 + * @categories constructors + */ +export const makeRuntimePromise = (): Effect.Effect< + ( + key: K, + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + } + | undefined + ) => Promise, + never, + Scope.Scope | R +> => + Effect.flatMap( + make(), + (self) => runtimePromise(self)() + ) + const internalFiberIdId = -1 const internalFiberId = FiberId.make(internalFiberIdId, 0) const isInternalInterruption = Cause.reduceWithContext(undefined, { @@ -539,6 +563,48 @@ export const runtime: ( } ) +/** + * Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberMap. + * + * @since 3.13.0 + * @categories combinators + */ +export const runtimePromise = (self: FiberMap): () => Effect.Effect< + ( + key: K, + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined + } + | undefined + ) => Promise, + never, + R +> => +() => + Effect.map( + runtime(self)(), + (runFork) => + ( + key: K, + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined } + | undefined + ): Promise => + new Promise((resolve, reject) => + runFork(key, effect, options).addObserver((exit) => { + if (Exit.isSuccess(exit)) { + resolve(exit.value) + } else { + reject(Cause.squash(exit.cause)) + } + }) + ) + ) + /** * @since 2.0.0 * @categories combinators diff --git a/packages/effect/src/FiberSet.ts b/packages/effect/src/FiberSet.ts index 754d976544c..6c7419dd8c8 100644 --- a/packages/effect/src/FiberSet.ts +++ b/packages/effect/src/FiberSet.ts @@ -146,6 +146,25 @@ export const makeRuntime = (): Effect.Effec (self) => runtime(self)() ) +/** + * Create an Effect run function that is backed by a FiberSet. + * + * @since 3.13.0 + * @categories constructors + */ +export const makeRuntimePromise = (): Effect.Effect< + ( + effect: Effect.Effect, + options?: Runtime.RunForkOptions | undefined + ) => Promise, + never, + Scope.Scope | R +> => + Effect.flatMap( + make(), + (self) => runtimePromise(self)() + ) + const internalFiberIdId = -1 const internalFiberId = FiberId.make(internalFiberIdId, 0) const isInternalInterruption = Cause.reduceWithContext(undefined, { @@ -375,6 +394,45 @@ export const runtime: ( } ) +/** + * Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberSet. + * + * The returned run function will return Promise's. + * + * @since 3.13.0 + * @categories combinators + */ +export const runtimePromise = (self: FiberSet): () => Effect.Effect< + ( + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined } + | undefined + ) => Promise, + never, + R +> => +() => + Effect.map( + runtime(self)(), + (runFork) => + ( + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined } + | undefined + ): Promise => + new Promise((resolve, reject) => + runFork(effect, options).addObserver((exit) => { + if (Exit.isSuccess(exit)) { + resolve(exit.value) + } else { + reject(Cause.squash(exit.cause)) + } + }) + ) + ) + /** * @since 2.0.0 * @categories combinators diff --git a/packages/effect/test/FiberHandle.test.ts b/packages/effect/test/FiberHandle.test.ts index 449b9bf5046..63c2e9d3c7d 100644 --- a/packages/effect/test/FiberHandle.test.ts +++ b/packages/effect/test/FiberHandle.test.ts @@ -111,4 +111,11 @@ describe("FiberHandle", () => { yield* TestClock.adjust(500) assert.isDefined(fiber.unsafePoll()) })) + + it.scoped("makeRuntimePromise", () => + Effect.gen(function*() { + const run = yield* FiberHandle.makeRuntimePromise() + const result = yield* Effect.promise(() => run(Effect.succeed("done"))) + strictEqual(result, "done") + })) }) diff --git a/packages/effect/test/FiberMap.test.ts b/packages/effect/test/FiberMap.test.ts index 2dc3800c57f..5e321acf70d 100644 --- a/packages/effect/test/FiberMap.test.ts +++ b/packages/effect/test/FiberMap.test.ts @@ -136,4 +136,11 @@ describe("FiberMap", () => { yield* TestClock.adjust(500) assert.isDefined(fiber.unsafePoll()) })) + + it.scoped("makeRuntimePromise", () => + Effect.gen(function*() { + const run = yield* FiberMap.makeRuntimePromise() + const result = yield* Effect.promise(() => run("a", Effect.succeed("done"))) + strictEqual(result, "done") + })) }) diff --git a/packages/effect/test/FiberSet.test.ts b/packages/effect/test/FiberSet.test.ts index 3d0e646027b..1861e387981 100644 --- a/packages/effect/test/FiberSet.test.ts +++ b/packages/effect/test/FiberSet.test.ts @@ -108,4 +108,11 @@ describe("FiberSet", () => { yield* TestClock.adjust(500) assert.isDefined(fiber.unsafePoll()) })) + + it.scoped("makeRuntimePromise", () => + Effect.gen(function*() { + const run = yield* FiberSet.makeRuntimePromise() + const result = yield* Effect.promise(() => run(Effect.succeed("done"))) + strictEqual(result, "done") + })) })