Skip to content

Commit

Permalink
Experimental getAsyncCtx() (#776)
Browse files Browse the repository at this point in the history
## Summary
<!-- Succinctly describe your change, providing context, what you've
changed, and why. -->

Adds a new `getAsyncCtx()` function exported from
`"inngest/experimental"` that will attempt to retrieve the function
input arguments.

- Uses `@inngest/test@workspace:^` internally, exclusively for the
`inngest` package. This can be a bit weird, as `inngest` is a peer dep
of `@inngest/test` so typing can be flaky.
- Altered exports of `@inngest/test` to have simpler compilation and
publishing.
- Added the experimental `getAsyncCtx()`

```ts
import { getAsyncCtx } from "inngest/experimental";

const ctx = await getAsyncCtx();
```

### Questions

- [x] Should we remove the store for a particular execution context when
the run completes?
No. It will still be garbage collected and info may still be useful.

## Checklist
<!-- Tick these items off as you progress. -->
<!-- If an item isn't applicable, ideally please strikeout the item by
wrapping it in "~~"" and suffix it with "N/A My reason for skipping
this." -->
<!-- e.g. "- [ ] ~~Added tests~~ N/A Only touches docs" -->

- [ ] ~Added a [docs PR](https://github.com/inngest/website) that
references this PR~ N/A Experimental for now
- [x] Added unit/integration tests
- [x] Added changesets if applicable
  • Loading branch information
jpwilliams authored Dec 12, 2024
1 parent 3ca3403 commit 0dbcc87
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 50 deletions.
5 changes: 5 additions & 0 deletions .changeset/cyan-sheep-train.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"inngest": patch
---

Use `@inngest/test@workspace:^` internally for testing
5 changes: 5 additions & 0 deletions .changeset/dull-students-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@inngest/test": patch
---

Altered exports to now be namespaced by `./dist/`; if you have directly imported files from `@inngest/test`, you may need to change the imports
11 changes: 11 additions & 0 deletions .changeset/silver-dolls-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"inngest": minor
---

Add experimental `getAsyncCtx()`, allowing the retrieval of a run's input (`event`, `step`, `runId`, etc) from the relevant async chain.

```ts
import { getAsyncCtx } from "inngest/experimental";

const ctx = await getAsyncCtx();
```
8 changes: 7 additions & 1 deletion .github/actions/setup-and-build/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ runs:
if: ${{ inputs.install-dependencies == 'true' }}
run: pnpm install
shell: bash
working-directory: ${{ inputs.working-directory }}/packages/inngest
working-directory: ${{ inputs.working-directory }}

- name: Build test dependencies
if: ${{ inputs.install-dependencies == 'true' }}
run: pnpm run build
shell: bash
working-directory: ${{ inputs.working-directory }}/packages/test

- name: Build
if: ${{ inputs.build == 'true' }}
Expand Down
2 changes: 1 addition & 1 deletion packages/inngest/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module.exports = {
roots: ["<rootDir>/src"],
moduleNameMapper: {
"(\\..+)\\.js": "$1",
inngest: "<rootDir>/src",
"^inngest$": "<rootDir>/src",
"^@local$": "<rootDir>/src",
"^@local/(.*)": "<rootDir>/src/$1",
"^@local/(.*)\\.js": "<rootDir>/src/$1",
Expand Down
3 changes: 2 additions & 1 deletion packages/inngest/jsr.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
],
"exports": {
".": "./src/index.ts",
"./experimental": "./src/experimental.ts",
"./astro": "./src/astro.ts",
"./bun": "./src/bun.ts",
"./cloudflare": "./src/cloudflare.ts",
Expand All @@ -37,4 +38,4 @@
"./nitro": "./src/nitro.ts",
"./types": "./src/types.ts"
}
}
}
6 changes: 6 additions & 0 deletions packages/inngest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
"import": "./index.js",
"types": "./index.d.ts"
},
"./experimental": {
"require": "./experimental.js",
"import": "./experimental.js",
"types": "./experimental.d.ts"
},
"./astro": {
"require": "./astro.js",
"import": "./astro.js",
Expand Down Expand Up @@ -199,6 +204,7 @@
"@actions/core": "^1.10.0",
"@actions/exec": "^1.1.1",
"@inngest/eslint-plugin-internal": "workspace:^",
"@inngest/test": "workspace:^",
"@jest/globals": "^29.5.0",
"@shopify/jest-koa-mocks": "^5.1.1",
"@sveltejs/kit": "^1.27.3",
Expand Down
155 changes: 155 additions & 0 deletions packages/inngest/src/components/execution/als.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { InngestTestEngine } from "@inngest/test";
import { type AsyncContext } from "@local/components/execution/als";

describe("getAsyncLocalStorage", () => {
const warningSpy = jest.spyOn(console, "warn");

afterEach(() => {
jest.unmock("node:async_hooks");
jest.resetModules();
});

test("should return an `AsyncLocalStorageIsh`", async () => {
const mod = await import("@local/components/execution/als");
const als = await mod.getAsyncLocalStorage();

expect(als).toBeDefined();
expect(als.getStore).toBeDefined();
expect(als.run).toBeDefined();
});

test("should return the same instance of `AsyncLocalStorageIsh`", async () => {
const mod = await import("@local/components/execution/als");

const als1p = mod.getAsyncLocalStorage();
const als2p = mod.getAsyncLocalStorage();

const als1 = await als1p;
const als2 = await als2p;

expect(als1).toBe(als2);
});

test("should return `undefined` if node:async_hooks is not supported", async () => {
jest.mock("node:async_hooks", () => {
throw new Error("import failed");
});

const mod = await import("@local/components/execution/als");
const als = await mod.getAsyncLocalStorage();

expect(warningSpy).toHaveBeenCalledWith(
expect.stringContaining(
"node:async_hooks is not supported in this runtime"
)
);

expect(als).toBeDefined();
expect(als.getStore()).toBeUndefined();
expect(als.run).toBeDefined();
});
});

describe("getAsyncCtx", () => {
const wait = async () => {
await new Promise((resolve) => setTimeout(resolve));
await new Promise((resolve) => process.nextTick(resolve));
};

afterEach(() => {
jest.unmock("node:async_hooks");
jest.resetModules();
});

test("should return `undefined` outside of an Inngest async context", async () => {
const mod = await import("@local/components/execution/als");
const store = await mod.getAsyncCtx();

expect(store).toBeUndefined();
});

test("should return the input context during execution", async () => {
const { Inngest } = await import("@local");
const mod = await import("@local/experimental");

const inngest = new Inngest({ id: "test" });

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let resolve: (value: any) => void | PromiseLike<void>;
const externalP = new Promise<AsyncContext | undefined>((r) => {
resolve = r;
});

let internalRunId: string | undefined;

const fn = inngest.createFunction(
{ id: "test" },
{ event: "" },
({ runId }) => {
internalRunId = runId;

void wait()
.then(() => mod.getAsyncCtx())
.then(resolve);

return "done";
}
);

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any
const t = new InngestTestEngine({ function: fn as any });

const { result } = await t.execute();

expect(result).toBe("done");
expect(internalRunId).toBeTruthy();

const store = await externalP;
expect(store).toBeDefined();
expect(store?.ctx.runId).toBe(internalRunId);
});

test("should return `undefined` if node:async_hooks is not supported", async () => {
jest.mock("node:async_hooks", () => {
throw new Error("import failed");
});

const { Inngest } = await import("@local");
const mod = await import("@local/experimental");

const inngest = new Inngest({ id: "test" });

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let resolve: (value: any) => void | PromiseLike<void>;
const externalP = new Promise<AsyncContext | undefined>((r) => {
resolve = r;
});

let internalRunId: string | undefined;

const fn = inngest.createFunction(
{ id: "test" },
{ event: "" },
({ runId }) => {
internalRunId = runId;

void wait()
.then(() => mod.getAsyncCtx())
.then(resolve);

return "done";
}
);

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any
const t = new InngestTestEngine({ function: fn as any });

const { result } = await t.execute();

expect(result).toBe("done");
expect(internalRunId).toBeTruthy();

const store = await externalP;
expect(store).toBeUndefined();
});
});
52 changes: 52 additions & 0 deletions packages/inngest/src/components/execution/als.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { type Context } from "../../types.js";

export interface AsyncContext {
ctx: Context.Any;
}

/**
* A type that represents a partial, runtime-agnostic interface of
* `AsyncLocalStorage`.
*/
type AsyncLocalStorageIsh = {
getStore: () => AsyncContext | undefined;
run: <R>(store: AsyncContext, fn: () => R) => R;
};

/**
* A local-only variable to store the async local storage instance.
*/
let als: Promise<AsyncLocalStorageIsh> | undefined;

/**
* Retrieve the async context for the current execution.
*/
export const getAsyncCtx = async (): Promise<AsyncContext | undefined> => {
return getAsyncLocalStorage().then((als) => als.getStore());
};

/**
* Get a singleton instance of `AsyncLocalStorage` used to store and retrieve
* async context for the current execution.
*/
export const getAsyncLocalStorage = async (): Promise<AsyncLocalStorageIsh> => {
// eslint-disable-next-line @typescript-eslint/no-misused-promises, no-async-promise-executor
als ??= new Promise<AsyncLocalStorageIsh>(async (resolve) => {
try {
const { AsyncLocalStorage } = await import("node:async_hooks");

resolve(new AsyncLocalStorage<AsyncContext>());
} catch (err) {
console.warn(
"node:async_hooks is not supported in this runtime. Experimental async context is disabled."
);

resolve({
getStore: () => undefined,
run: (_, fn) => fn(),
});
}
});

return als;
};
71 changes: 38 additions & 33 deletions packages/inngest/src/components/execution/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
type InngestExecutionOptions,
type MemoizedOp,
} from "./InngestExecution.js";
import { getAsyncLocalStorage } from "./als.js";

export const createV1InngestExecution: InngestExecutionFactory = (options) => {
return new V1InngestExecution(options);
Expand Down Expand Up @@ -459,44 +460,48 @@ class V1InngestExecution extends InngestExecution implements IInngestExecution {
* and middleware hooks where appropriate.
*/
private async startExecution(): Promise<void> {
/**
* Mutate input as neccessary based on middleware.
*/
await this.transformInput();
return getAsyncLocalStorage().then((als) =>
als.run({ ctx: this.fnArg }, async (): Promise<void> => {
/**
* Mutate input as neccessary based on middleware.
*/
await this.transformInput();

/**
* Start the timer to time out the run if needed.
*/
void this.timeout?.start();
/**
* Start the timer to time out the run if needed.
*/
void this.timeout?.start();

await this.state.hooks?.beforeMemoization?.();
await this.state.hooks?.beforeMemoization?.();

/**
* If we had no state to begin with, immediately end the memoization phase.
*/
if (this.state.allStateUsed()) {
await this.state.hooks?.afterMemoization?.();
await this.state.hooks?.beforeExecution?.();
}
/**
* If we had no state to begin with, immediately end the memoization phase.
*/
if (this.state.allStateUsed()) {
await this.state.hooks?.afterMemoization?.();
await this.state.hooks?.beforeExecution?.();
}

/**
* Trigger the user's function.
*/
runAsPromise(() => this.userFnToRun(this.fnArg))
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(async () => {
await this.state.hooks?.afterMemoization?.();
await this.state.hooks?.beforeExecution?.();
await this.state.hooks?.afterExecution?.();
})
.then((data) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.state.setCheckpoint({ type: "function-resolved", data });
/**
* Trigger the user's function.
*/
runAsPromise(() => this.userFnToRun(this.fnArg))
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(async () => {
await this.state.hooks?.afterMemoization?.();
await this.state.hooks?.beforeExecution?.();
await this.state.hooks?.afterExecution?.();
})
.then((data) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.state.setCheckpoint({ type: "function-resolved", data });
})
.catch((error) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.state.setCheckpoint({ type: "function-rejected", error });
});
})
.catch((error) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.state.setCheckpoint({ type: "function-rejected", error });
});
);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/inngest/src/experimental.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { getAsyncCtx } from "./components/execution/als.js";
export type { AsyncContext } from "./components/execution/als.js";
2 changes: 1 addition & 1 deletion packages/middleware-validation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
},
"devDependencies": {
"@eslint/js": "^9.7.0",
"@inngest/test": "0.1.1-pr-741.0",
"@inngest/test": "^0.1.3",
"@types/eslint__js": "^8.42.3",
"@types/jest": "^29.5.14",
"eslint": "^8.30.0",
Expand Down
Loading

0 comments on commit 0dbcc87

Please sign in to comment.