Skip to content

Commit

Permalink
add Promise based apis to Fiber{Handle,Set,Map} modules (#4401)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Feb 5, 2025
1 parent c0e5bef commit 2940151
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/chatty-terms-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add Promise based apis to Fiber{Handle,Set,Map} modules
59 changes: 59 additions & 0 deletions packages/effect/src/FiberHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,25 @@ export const makeRuntime = <R, E = unknown, A = unknown>(): Effect.Effect<
(self) => runtime(self)<R>()
)

/**
* Create an Effect run function that is backed by a FiberHandle.
*
* @since 3.13.0
* @categories constructors
*/
export const makeRuntimePromise = <R = never, A = unknown, E = unknown>(): Effect.Effect<
<XE extends E, XA extends A>(
effect: Effect.Effect<XA, XE, R>,
options?: Runtime.RunForkOptions | undefined
) => Promise<XA>,
never,
Scope.Scope | R
> =>
Effect.flatMap(
make<A, E>(),
(self) => runtimePromise(self)<R>()
)

const internalFiberIdId = -1
const internalFiberId = FiberId.make(internalFiberIdId, 0)
const isInternalInterruption = Cause.reduceWithContext(undefined, {
Expand Down Expand Up @@ -436,6 +455,46 @@ export const runtime: <A, E>(
}
)

/**
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberHandle.
*
* The returned run function will return Promise's that will resolve when the
* fiber completes.
*
* @since 3.13.0
* @categories combinators
*/
export const runtimePromise = <A, E>(self: FiberHandle<A, E>): <R = never>() => Effect.Effect<
<XE extends E, XA extends A>(
effect: Effect.Effect<XA, XE, R>,
options?:
| Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined }
| undefined
) => Promise<XA>,
never,
R
> =>
<R>() =>
Effect.map(
runtime(self)<R>(),
(runFork) =>
<XE extends E, XA extends A>(
effect: Effect.Effect<XA, XE, R>,
options?:
| Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined }
| undefined
): Promise<XA> =>
new Promise((resolve, reject) =>
runFork(effect, options).addObserver((exit) => {
if (Exit.isSuccess(exit)) {
resolve(exit.value)
} else {
reject(Cause.squash(exit.cause))
}
})
)
)

/**
* If any of the Fiber's in the handle terminate with a failure,
* the returned Effect will terminate with the first failure that occurred.
Expand Down
66 changes: 66 additions & 0 deletions packages/effect/src/FiberMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,30 @@ export const makeRuntime = <R, K, E = unknown, A = unknown>(): Effect.Effect<
(self) => runtime(self)<R>()
)

/**
* Create an Effect run function that is backed by a FiberMap.
*
* @since 3.13.0
* @categories constructors
*/
export const makeRuntimePromise = <R, K, A = unknown, E = unknown>(): Effect.Effect<
<XE extends E, XA extends A>(
key: K,
effect: Effect.Effect<XA, XE, R>,
options?:
| Runtime.RunForkOptions & {
readonly onlyIfMissing?: boolean | undefined
}
| undefined
) => Promise<XA>,
never,
Scope.Scope | R
> =>
Effect.flatMap(
make<K, A, E>(),
(self) => runtimePromise(self)<R>()
)

const internalFiberIdId = -1
const internalFiberId = FiberId.make(internalFiberIdId, 0)
const isInternalInterruption = Cause.reduceWithContext(undefined, {
Expand Down Expand Up @@ -539,6 +563,48 @@ export const runtime: <K, A, E>(
}
)

/**
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberMap.
*
* @since 3.13.0
* @categories combinators
*/
export const runtimePromise = <K, A, E>(self: FiberMap<K, A, E>): <R = never>() => Effect.Effect<
<XE extends E, XA extends A>(
key: K,
effect: Effect.Effect<XA, XE, R>,
options?:
| Runtime.RunForkOptions & {
readonly onlyIfMissing?: boolean | undefined
readonly propagateInterruption?: boolean | undefined
}
| undefined
) => Promise<XA>,
never,
R
> =>
<R>() =>
Effect.map(
runtime(self)<R>(),
(runFork) =>
<XE extends E, XA extends A>(
key: K,
effect: Effect.Effect<XA, XE, R>,
options?:
| Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined }
| undefined
): Promise<XA> =>
new Promise((resolve, reject) =>
runFork(key, effect, options).addObserver((exit) => {
if (Exit.isSuccess(exit)) {
resolve(exit.value)
} else {
reject(Cause.squash(exit.cause))
}
})
)
)

/**
* @since 2.0.0
* @categories combinators
Expand Down
58 changes: 58 additions & 0 deletions packages/effect/src/FiberSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ export const makeRuntime = <R = never, A = unknown, E = unknown>(): Effect.Effec
(self) => runtime(self)<R>()
)

/**
* Create an Effect run function that is backed by a FiberSet.
*
* @since 3.13.0
* @categories constructors
*/
export const makeRuntimePromise = <R = never, A = unknown, E = unknown>(): Effect.Effect<
<XE extends E, XA extends A>(
effect: Effect.Effect<XA, XE, R>,
options?: Runtime.RunForkOptions | undefined
) => Promise<XA>,
never,
Scope.Scope | R
> =>
Effect.flatMap(
make<A, E>(),
(self) => runtimePromise(self)<R>()
)

const internalFiberIdId = -1
const internalFiberId = FiberId.make(internalFiberIdId, 0)
const isInternalInterruption = Cause.reduceWithContext(undefined, {
Expand Down Expand Up @@ -375,6 +394,45 @@ export const runtime: <A, E>(
}
)

/**
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberSet.
*
* The returned run function will return Promise's.
*
* @since 3.13.0
* @categories combinators
*/
export const runtimePromise = <A, E>(self: FiberSet<A, E>): <R = never>() => Effect.Effect<
<XE extends E, XA extends A>(
effect: Effect.Effect<XA, XE, R>,
options?:
| Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined }
| undefined
) => Promise<XA>,
never,
R
> =>
<R>() =>
Effect.map(
runtime(self)<R>(),
(runFork) =>
<XE extends E, XA extends A>(
effect: Effect.Effect<XA, XE, R>,
options?:
| Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined }
| undefined
): Promise<XA> =>
new Promise((resolve, reject) =>
runFork(effect, options).addObserver((exit) => {
if (Exit.isSuccess(exit)) {
resolve(exit.value)
} else {
reject(Cause.squash(exit.cause))
}
})
)
)

/**
* @since 2.0.0
* @categories combinators
Expand Down
7 changes: 7 additions & 0 deletions packages/effect/test/FiberHandle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,11 @@ describe("FiberHandle", () => {
yield* TestClock.adjust(500)
assert.isDefined(fiber.unsafePoll())
}))

it.scoped("makeRuntimePromise", () =>
Effect.gen(function*() {
const run = yield* FiberHandle.makeRuntimePromise()
const result = yield* Effect.promise(() => run(Effect.succeed("done")))
strictEqual(result, "done")
}))
})
7 changes: 7 additions & 0 deletions packages/effect/test/FiberMap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,11 @@ describe("FiberMap", () => {
yield* TestClock.adjust(500)
assert.isDefined(fiber.unsafePoll())
}))

it.scoped("makeRuntimePromise", () =>
Effect.gen(function*() {
const run = yield* FiberMap.makeRuntimePromise<never, string>()
const result = yield* Effect.promise(() => run("a", Effect.succeed("done")))
strictEqual(result, "done")
}))
})
7 changes: 7 additions & 0 deletions packages/effect/test/FiberSet.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,11 @@ describe("FiberSet", () => {
yield* TestClock.adjust(500)
assert.isDefined(fiber.unsafePoll())
}))

it.scoped("makeRuntimePromise", () =>
Effect.gen(function*() {
const run = yield* FiberSet.makeRuntimePromise()
const result = yield* Effect.promise(() => run(Effect.succeed("done")))
strictEqual(result, "done")
}))
})

0 comments on commit 2940151

Please sign in to comment.