-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Experimental getAsyncCtx()
#776
Changes from all commits
d1b0b54
b09cae8
fecd8b3
982c1b0
493be7c
8238151
104fa51
1da3262
22765b7
c44a9e6
6b7b9a3
bc31746
6982f8a
dee32d2
e8c02dc
1e57bc9
d265244
ec58bbd
bafe07c
82752ca
cf2da79
0d73a20
0796d70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"inngest": patch | ||
--- | ||
|
||
Use `@inngest/test@workspace:^` internally for testing |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@inngest/test": patch | ||
--- | ||
|
||
Altered exports to now be namespaced by `./dist/`; if you have directly imported files from `@inngest/test`, you may need to change the imports |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
--- | ||
"inngest": minor | ||
--- | ||
|
||
Add experimental `getAsyncCtx()`, allowing the retrieval of a run's input (`event`, `step`, `runId`, etc) from the relevant async chain. | ||
|
||
```ts | ||
import { getAsyncCtx } from "inngest/experimental"; | ||
|
||
const ctx = await getAsyncCtx(); | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ module.exports = { | |
roots: ["<rootDir>/src"], | ||
moduleNameMapper: { | ||
"(\\..+)\\.js": "$1", | ||
inngest: "<rootDir>/src", | ||
"^inngest$": "<rootDir>/src", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This regex was picking up anything containing |
||
"^@local$": "<rootDir>/src", | ||
"^@local/(.*)": "<rootDir>/src/$1", | ||
"^@local/(.*)\\.js": "<rootDir>/src/$1", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
], | ||
"exports": { | ||
".": "./src/index.ts", | ||
"./experimental": "./src/experimental.ts", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A new entrypoint to mark out experimental APIs. |
||
"./astro": "./src/astro.ts", | ||
"./bun": "./src/bun.ts", | ||
"./cloudflare": "./src/cloudflare.ts", | ||
|
@@ -37,4 +38,4 @@ | |
"./nitro": "./src/nitro.ts", | ||
"./types": "./src/types.ts" | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
import { InngestTestEngine } from "@inngest/test"; | ||
import { type AsyncContext } from "@local/components/execution/als"; | ||
|
||
describe("getAsyncLocalStorage", () => { | ||
const warningSpy = jest.spyOn(console, "warn"); | ||
|
||
afterEach(() => { | ||
jest.unmock("node:async_hooks"); | ||
jest.resetModules(); | ||
}); | ||
|
||
test("should return an `AsyncLocalStorageIsh`", async () => { | ||
const mod = await import("@local/components/execution/als"); | ||
const als = await mod.getAsyncLocalStorage(); | ||
|
||
expect(als).toBeDefined(); | ||
expect(als.getStore).toBeDefined(); | ||
expect(als.run).toBeDefined(); | ||
}); | ||
|
||
test("should return the same instance of `AsyncLocalStorageIsh`", async () => { | ||
const mod = await import("@local/components/execution/als"); | ||
|
||
const als1p = mod.getAsyncLocalStorage(); | ||
const als2p = mod.getAsyncLocalStorage(); | ||
|
||
const als1 = await als1p; | ||
const als2 = await als2p; | ||
|
||
expect(als1).toBe(als2); | ||
}); | ||
|
||
test("should return `undefined` if node:async_hooks is not supported", async () => { | ||
jest.mock("node:async_hooks", () => { | ||
throw new Error("import failed"); | ||
}); | ||
|
||
const mod = await import("@local/components/execution/als"); | ||
const als = await mod.getAsyncLocalStorage(); | ||
|
||
expect(warningSpy).toHaveBeenCalledWith( | ||
expect.stringContaining( | ||
"node:async_hooks is not supported in this runtime" | ||
) | ||
); | ||
|
||
expect(als).toBeDefined(); | ||
expect(als.getStore()).toBeUndefined(); | ||
expect(als.run).toBeDefined(); | ||
}); | ||
}); | ||
|
||
describe("getAsyncCtx", () => { | ||
const wait = async () => { | ||
await new Promise((resolve) => setTimeout(resolve)); | ||
await new Promise((resolve) => process.nextTick(resolve)); | ||
}; | ||
|
||
afterEach(() => { | ||
jest.unmock("node:async_hooks"); | ||
jest.resetModules(); | ||
}); | ||
|
||
test("should return `undefined` outside of an Inngest async context", async () => { | ||
const mod = await import("@local/components/execution/als"); | ||
const store = await mod.getAsyncCtx(); | ||
|
||
expect(store).toBeUndefined(); | ||
}); | ||
|
||
test("should return the input context during execution", async () => { | ||
const { Inngest } = await import("@local"); | ||
const mod = await import("@local/experimental"); | ||
|
||
const inngest = new Inngest({ id: "test" }); | ||
|
||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
let resolve: (value: any) => void | PromiseLike<void>; | ||
const externalP = new Promise<AsyncContext | undefined>((r) => { | ||
resolve = r; | ||
}); | ||
|
||
let internalRunId: string | undefined; | ||
|
||
const fn = inngest.createFunction( | ||
{ id: "test" }, | ||
{ event: "" }, | ||
({ runId }) => { | ||
internalRunId = runId; | ||
|
||
void wait() | ||
.then(() => mod.getAsyncCtx()) | ||
.then(resolve); | ||
|
||
return "done"; | ||
} | ||
); | ||
|
||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any | ||
const t = new InngestTestEngine({ function: fn as any }); | ||
|
||
const { result } = await t.execute(); | ||
|
||
expect(result).toBe("done"); | ||
expect(internalRunId).toBeTruthy(); | ||
|
||
const store = await externalP; | ||
expect(store).toBeDefined(); | ||
expect(store?.ctx.runId).toBe(internalRunId); | ||
}); | ||
|
||
test("should return `undefined` if node:async_hooks is not supported", async () => { | ||
jest.mock("node:async_hooks", () => { | ||
throw new Error("import failed"); | ||
}); | ||
|
||
const { Inngest } = await import("@local"); | ||
const mod = await import("@local/experimental"); | ||
|
||
const inngest = new Inngest({ id: "test" }); | ||
|
||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
let resolve: (value: any) => void | PromiseLike<void>; | ||
const externalP = new Promise<AsyncContext | undefined>((r) => { | ||
resolve = r; | ||
}); | ||
|
||
let internalRunId: string | undefined; | ||
|
||
const fn = inngest.createFunction( | ||
{ id: "test" }, | ||
{ event: "" }, | ||
({ runId }) => { | ||
internalRunId = runId; | ||
|
||
void wait() | ||
.then(() => mod.getAsyncCtx()) | ||
.then(resolve); | ||
|
||
return "done"; | ||
} | ||
); | ||
|
||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any | ||
const t = new InngestTestEngine({ function: fn as any }); | ||
|
||
const { result } = await t.execute(); | ||
|
||
expect(result).toBe("done"); | ||
expect(internalRunId).toBeTruthy(); | ||
|
||
const store = await externalP; | ||
expect(store).toBeUndefined(); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import { type Context } from "../../types.js"; | ||
|
||
export interface AsyncContext { | ||
ctx: Context.Any; | ||
} | ||
Comment on lines
+3
to
+5
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If found, this is the value returned when a developer calls |
||
|
||
/** | ||
* A type that represents a partial, runtime-agnostic interface of | ||
* `AsyncLocalStorage`. | ||
*/ | ||
type AsyncLocalStorageIsh = { | ||
getStore: () => AsyncContext | undefined; | ||
run: <R>(store: AsyncContext, fn: () => R) => R; | ||
}; | ||
|
||
/** | ||
* A local-only variable to store the async local storage instance. | ||
*/ | ||
let als: Promise<AsyncLocalStorageIsh> | undefined; | ||
|
||
/** | ||
* Retrieve the async context for the current execution. | ||
*/ | ||
export const getAsyncCtx = async (): Promise<AsyncContext | undefined> => { | ||
return getAsyncLocalStorage().then((als) => als.getStore()); | ||
}; | ||
Comment on lines
+21
to
+26
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only piece exposed to the public API, used to retrieve the input to the current function run. |
||
|
||
/** | ||
* Get a singleton instance of `AsyncLocalStorage` used to store and retrieve | ||
* async context for the current execution. | ||
*/ | ||
export const getAsyncLocalStorage = async (): Promise<AsyncLocalStorageIsh> => { | ||
// eslint-disable-next-line @typescript-eslint/no-misused-promises, no-async-promise-executor | ||
als ??= new Promise<AsyncLocalStorageIsh>(async (resolve) => { | ||
try { | ||
const { AsyncLocalStorage } = await import("node:async_hooks"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bun, Deno, and Node all expose this as Cloudflare Workers requires a compatibility flag of In the case that these flags have not been specified, this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! On a side note, how many lines of code would it take you to do this in go!?
|
||
|
||
resolve(new AsyncLocalStorage<AsyncContext>()); | ||
} catch (err) { | ||
console.warn( | ||
"node:async_hooks is not supported in this runtime. Experimental async context is disabled." | ||
); | ||
|
||
resolve({ | ||
getStore: () => undefined, | ||
run: (_, fn) => fn(), | ||
}); | ||
} | ||
}); | ||
|
||
return als; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,7 @@ import { | |
type InngestExecutionOptions, | ||
type MemoizedOp, | ||
} from "./InngestExecution.js"; | ||
import { getAsyncLocalStorage } from "./als.js"; | ||
|
||
export const createV1InngestExecution: InngestExecutionFactory = (options) => { | ||
return new V1InngestExecution(options); | ||
|
@@ -459,44 +460,48 @@ class V1InngestExecution extends InngestExecution implements IInngestExecution { | |
* and middleware hooks where appropriate. | ||
*/ | ||
private async startExecution(): Promise<void> { | ||
/** | ||
* Mutate input as neccessary based on middleware. | ||
*/ | ||
await this.transformInput(); | ||
return getAsyncLocalStorage().then((als) => | ||
als.run({ ctx: this.fnArg }, async (): Promise<void> => { | ||
Comment on lines
462
to
+464
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When executing, we now wrap the entire operation in our |
||
/** | ||
* Mutate input as neccessary based on middleware. | ||
*/ | ||
await this.transformInput(); | ||
|
||
/** | ||
* Start the timer to time out the run if needed. | ||
*/ | ||
void this.timeout?.start(); | ||
/** | ||
* Start the timer to time out the run if needed. | ||
*/ | ||
void this.timeout?.start(); | ||
|
||
await this.state.hooks?.beforeMemoization?.(); | ||
await this.state.hooks?.beforeMemoization?.(); | ||
|
||
/** | ||
* If we had no state to begin with, immediately end the memoization phase. | ||
*/ | ||
if (this.state.allStateUsed()) { | ||
await this.state.hooks?.afterMemoization?.(); | ||
await this.state.hooks?.beforeExecution?.(); | ||
} | ||
/** | ||
* If we had no state to begin with, immediately end the memoization phase. | ||
*/ | ||
if (this.state.allStateUsed()) { | ||
await this.state.hooks?.afterMemoization?.(); | ||
await this.state.hooks?.beforeExecution?.(); | ||
} | ||
|
||
/** | ||
* Trigger the user's function. | ||
*/ | ||
runAsPromise(() => this.userFnToRun(this.fnArg)) | ||
// eslint-disable-next-line @typescript-eslint/no-misused-promises | ||
.finally(async () => { | ||
await this.state.hooks?.afterMemoization?.(); | ||
await this.state.hooks?.beforeExecution?.(); | ||
await this.state.hooks?.afterExecution?.(); | ||
}) | ||
.then((data) => { | ||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment | ||
this.state.setCheckpoint({ type: "function-resolved", data }); | ||
/** | ||
* Trigger the user's function. | ||
*/ | ||
runAsPromise(() => this.userFnToRun(this.fnArg)) | ||
// eslint-disable-next-line @typescript-eslint/no-misused-promises | ||
.finally(async () => { | ||
await this.state.hooks?.afterMemoization?.(); | ||
await this.state.hooks?.beforeExecution?.(); | ||
await this.state.hooks?.afterExecution?.(); | ||
}) | ||
.then((data) => { | ||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment | ||
this.state.setCheckpoint({ type: "function-resolved", data }); | ||
}) | ||
.catch((error) => { | ||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment | ||
this.state.setCheckpoint({ type: "function-rejected", error }); | ||
}); | ||
}) | ||
.catch((error) => { | ||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment | ||
this.state.setCheckpoint({ type: "function-rejected", error }); | ||
}); | ||
); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
export { getAsyncCtx } from "./components/execution/als.js"; | ||
export type { AsyncContext } from "./components/execution/als.js"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inngest
now relies on@inngest/test@workspace:^
, so we make sure that's built during CI too.