From 7ca9537e11a370b2b8b37ce57ec7d9892c911eac Mon Sep 17 00:00:00 2001 From: Jack Williams Date: Mon, 21 Oct 2024 17:16:15 +0100 Subject: [PATCH] Expose `EventSchemas` in `Inngest` instances (#657) ## Summary Exposes "runtime schemas" on an `Inngest` client, allowing middleware to use it to add custom validation using the schemas passed to `EventSchemas`. We'll add this as a top-level option later, but this allows us to explore the functionality while we settle on APIs, default functionality, and supporting many validation libraries. As an example, here is a custom middleware that makes use of this to add runtime validation to any Zod schemas that exist within your client's `EventSchemas`. It assumes you have `zod` installed. ```sh # Use this PR to test npm install inngest@pr-657 ``` ```ts export const inngest = new Inngest({ id: "my-app", schemas, middleware: [experimentalValidationMiddleware()], }); ``` ```ts import { InngestMiddleware, internalEvents, type EventPayload, type InngestFunction, } from "inngest"; import { z, ZodType } from "zod"; /** * Experimental middleware that validates events using Zod schemas passed using * `EventSchemas.fromZod()`. */ export const experimentalValidationMiddleware = (opts?: { /** * Disallow events that don't have a schema defined. */ disallowSchemalessEvents?: boolean; /** * Disallow events that have a schema defined, but the schema is unknown and * not handled in this code. */ disallowUnknownSchemas?: boolean; /** * Disable validation of incoming events. */ disableIncomingValidation?: boolean; /** * Disable validation of outgoing events using `inngest.send()` or * `step.sendEvent()`. */ disableOutgoingValidation?: boolean; }) => { const mw = new InngestMiddleware({ name: "Inngest Experimental: Runtime schema validation", init({ client }) { /** * Given an `event`, validate it against its schema. */ const validateEvent = async ( event: EventPayload, potentialInvokeEvents: string[] = [] ): Promise => { let schemasToAttempt = new Set([event.name]); let hasSchema = false; /** * Trust internal events; don't allow overwriting their typing. */ if (event.name.startsWith("inngest/")) { if (event.name !== internalEvents.FunctionInvoked) { return event; } /** * If this is an `inngest/function.invoked` event, try validating the * payload against one of the function's schemas. */ schemasToAttempt = new Set(potentialInvokeEvents); hasSchema = Boolean( schemasToAttempt.intersection( new Set( Object.keys(client["schemas"]?.["runtimeSchemas"] || {}) ) ).size ); } else { hasSchema = Boolean( client["schemas"]?.["runtimeSchemas"][event.name] ); } if (!hasSchema) { if (opts?.disallowSchemalessEvents) { throw new Error( `Event "${event.name}" has no schema defined; disallowing` ); } return event; } const errors: Record = {}; for (const schemaName of schemasToAttempt) { try { const schema = client["schemas"]?.["runtimeSchemas"][schemaName]; /** * The schema could be a full Zod object. */ if (helpers.isZodObject(schema)) { const { success, data, error } = await schema .passthrough() .safeParseAsync(event); if (success) { return data as unknown as EventPayload; } throw new Error(`${error.name}: ${error.message}`); } /** * The schema could also be a regular object with Zod objects inside. */ if (helpers.isObject(schema)) { // It could be a partial schema; validate each field return await Object.keys(schema).reduce>( async (acc, key) => { const fieldSchema = schema[key]; const eventField = event[key as keyof EventPayload]; if (!helpers.isZodObject(fieldSchema) || !eventField) { return acc; } const { success, data, error } = await fieldSchema .passthrough() .safeParseAsync(eventField); if (success) { return { ...(await acc), [key]: data }; } throw new Error(`${error.name}: ${error.message}`); }, Promise.resolve({ ...event }) ); } /** * Didn't find anything? Throw or warn. * * We only allow this for assessing single schemas, as otherwise we're * assessing an invocation would could be multiple. */ if (opts?.disallowUnknownSchemas && schemasToAttempt.size === 1) { throw new Error( `Event "${event.name}" has an unknown schema; disallowing` ); } else { console.warn( "Unknown schema found; cannot validate, but allowing" ); } } catch (err) { errors[schemaName] = err as Error; } } if (Object.keys(errors).length) { throw new Error( `Event "${event.name}" failed validation:\n\n${Object.keys(errors) .map((key) => `Using ${key}: ${errors[key].message}`) .join("\n\n")}` ); } return event; }; return { ...(opts?.disableIncomingValidation ? {} : { async onFunctionRun({ fn }) { const backupEvents = ( (fn.opts as InngestFunction.Options).triggers || [] ).reduce((acc, trigger) => { if (trigger.event) { return [...acc, trigger.event]; } return acc; }, []); return { async transformInput({ ctx: { events } }) { const validatedEvents = await Promise.all( events.map((event) => { return validateEvent(event, backupEvents); }) ); return { ctx: { event: validatedEvents[0], events: validatedEvents, } as {}, }; }, }; }, }), ...(opts?.disableOutgoingValidation ? {} : { async onSendEvent() { return { async transformInput({ payloads }) { return { payloads: await Promise.all( payloads.map((payload) => { return validateEvent(payload); }) ), }; }, }; }, }), }; }, }); return mw; }; const helpers = { isZodObject: (value: unknown): value is z.ZodObject => { return value instanceof ZodType && value._def.typeName === "ZodObject"; }, isObject: (value: unknown): value is Record => { return typeof value === "object" && value !== null && !Array.isArray(value); }, }; ``` ## Checklist - [ ] ~Added a [docs PR](https://github.com/inngest/website) that references this PR~ N/A - [ ] ~Added unit/integration tests~ N/A - [x] Added changesets if applicable ## Related - Partially addresses #410 --- .changeset/mean-brooms-pull.md | 5 ++++ .../inngest/src/components/EventSchemas.ts | 30 ++++++++++++++++++- packages/inngest/src/components/Inngest.ts | 4 +++ 3 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 .changeset/mean-brooms-pull.md diff --git a/.changeset/mean-brooms-pull.md b/.changeset/mean-brooms-pull.md new file mode 100644 index 000000000..ef1a7a1d8 --- /dev/null +++ b/.changeset/mean-brooms-pull.md @@ -0,0 +1,5 @@ +--- +"inngest": patch +--- + +Expose `EventSchemas` in `Inngest` instances diff --git a/packages/inngest/src/components/EventSchemas.ts b/packages/inngest/src/components/EventSchemas.ts index 031b210a6..8ccee7015 100644 --- a/packages/inngest/src/components/EventSchemas.ts +++ b/packages/inngest/src/components/EventSchemas.ts @@ -248,6 +248,15 @@ export class EventSchemas< [internalEvents.ScheduledTimer]: ScheduledTimerEventPayload; }>, > { + protected runtimeSchemas: Record = {}; + + private addRuntimeSchemas(schemas: Record) { + this.runtimeSchemas = { + ...this.runtimeSchemas, + ...schemas, + }; + } + /** * Use generated Inngest types to type events. */ @@ -343,7 +352,6 @@ export class EventSchemas< * ``` */ public fromZod( - // eslint-disable-next-line @typescript-eslint/no-unused-vars schemas: T ): EventSchemas< Combine< @@ -353,6 +361,26 @@ export class EventSchemas< > > > { + let runtimeSchemas: Record; + + if (Array.isArray(schemas)) { + runtimeSchemas = schemas.reduce((acc, schema) => { + const { + name: { value: name }, + ...rest + } = schema.shape; + + return { + ...acc, + [name]: rest, + }; + }, {}); + } else { + runtimeSchemas = schemas; + } + + this.addRuntimeSchemas(runtimeSchemas); + return this; } } diff --git a/packages/inngest/src/components/Inngest.ts b/packages/inngest/src/components/Inngest.ts index f365f96d8..54907ac1c 100644 --- a/packages/inngest/src/components/Inngest.ts +++ b/packages/inngest/src/components/Inngest.ts @@ -151,6 +151,8 @@ export class Inngest { */ private _mode!: Mode; + protected readonly schemas?: NonNullable; + get apiBaseUrl(): string | undefined { return this._apiBaseUrl; } @@ -192,6 +194,7 @@ export class Inngest { logger = new DefaultLogger(), middleware, isDev, + schemas, } = this.options; if (!id) { @@ -216,6 +219,7 @@ export class Inngest { mode: this.mode, }); + this.schemas = schemas; this.loadModeEnvVars(); this.logger = logger;