From ea4f5916e3f345db24e1fb5b1aec71c9d01e5f97 Mon Sep 17 00:00:00 2001 From: Steve Cresswell <229672+cressie176@users.noreply.github.com> Date: Tue, 9 Jan 2024 22:46:39 +0000 Subject: [PATCH] Add hook failed event --- .gitignore | 1 + .husky/pre-commit | 1 + README.md | 21 ++++++-- examples/javascript/index.js | 13 +++-- examples/typescript/index.ts | 14 ++++-- examples/typescript/tsconfig.json | 26 ++++------ index.d.ts | 4 +- index.js | 18 ++++--- .../005.create-fby-notification-mechanism.sql | 2 +- package.json | 2 +- test/notifications.test.js | 48 ++++++++++++++----- 11 files changed, 100 insertions(+), 50 deletions(-) diff --git a/.gitignore b/.gitignore index 76b1021..dbd40f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .nyc_output coverage node_modules +examples/typescript/dist diff --git a/.husky/pre-commit b/.husky/pre-commit index 40acb84..223234f 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -1,3 +1,4 @@ #!/usr/bin/env sh . "$(dirname -- "$0")/_/husky.sh" +rm -rf examples/typescript/dist && pushd examples/typescript && npx tsc && popd && rm -rf examples/typescript/dist npm run lint-staged && npm test diff --git a/README.md b/README.md index 1323743..92d5efe 100644 --- a/README.md +++ b/README.md @@ -214,8 +214,23 @@ Starts polling the database for notifications #### filby.stopNotifications(): Promise<void> Stops polling the database for notifications, and waits for any inflight notifications to complete. -#### filby.on(event: string, callback: (data: Event) => Promise): Filby|Listener -Filby extends [eventemitter2](https://www.npmjs.com/package/eventemitter2) which unlike node's EventEmitter, supports asynchronous events. You can use these to listen for change notifications and perform of asynchronous tasks like making an HTTP request for a webhook. If the task throws an exception it will be caught by Filby and the notifiation retried up to a maximum number of times, with an incremental backoff delay. +#### filby.on(event: symbol|string, callback: (...values: any[]) => void): Filby|Listener +Filby extends [eventemitter2](https://www.npmjs.com/package/eventemitter2) which unlike node's EventEmitter, supports asynchronous events. You can use these to listen for change notifications and perform of asynchronous tasks like making an HTTP request for a webhook. The event name is user defined and must be specified in the Hook [Data Definition](#data-definition). The sole callback parameter is the Notification context (see the TypeScript definitions), e.g. + +```js +filby.on('VAT Rate chnaged', async (event) => { + await axios.post('https://httpbin.org/status/200', event); +}); +``` + +If your event handler throws an exception it will be caught by Filby and the notifiation retried up to a maximum number of times, with an incremental backoff delay. If the maximum attempts are exceeded then Filby emits dedicated event, `Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED`. The first parameter to the callback is the Error object, and the second the Notification context, e.g. + +```js +filby.on(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, (err, context) => { + console.error('Hook Failed', context); + console.error('Hook Failed', err.stack); // Careful not to log secrets that may be on the error +}); +``` #### filby.getProjections(): Promise<Projection[]> Returns the list of projections. @@ -373,7 +388,7 @@ add change set: ## Configuration ```js { - // All the database configuration is passed through to https://www.npmjs.com/package/pg + // All the database configuration is passed through to https://www.npmjs.com/package/pg "database": { "user": "fby_example", "database": "fby_example", diff --git a/examples/javascript/index.js b/examples/javascript/index.js index c318e0c..1f3aa9b 100644 --- a/examples/javascript/index.js +++ b/examples/javascript/index.js @@ -1,3 +1,4 @@ +/* eslint-disable no-console */ const path = require('node:path'); const Fastify = require('fastify'); @@ -52,12 +53,16 @@ const filby = new Filby({ ...config.filby, ...{ database: config.database } }); await fastify.listen(config.server); - filby.on('park_v1_change', async (event) => { - await axios.post('https://httpbin.org/status/200', event); + filby.on(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (err, context) => { + console.error('Hook failed', context); + console.error(err.stack); }); - filby.on('change', async (event) => { + filby.on('park_v1_change', async (context) => { + await axios.post('https://httpbin.org/status/200', context); + }); + filby.on('change', async (context) => { // Demonstrate a webhook with retry behaviour - await axios.post('https://httpbin.org/status/500', event); + await axios.post('https://httpbin.org/status/500', context); }); await filby.startNotifications(); diff --git a/examples/typescript/index.ts b/examples/typescript/index.ts index c4b4b1d..e3b4efc 100644 --- a/examples/typescript/index.ts +++ b/examples/typescript/index.ts @@ -7,7 +7,7 @@ import axios from 'axios'; import config from './config.json'; import changeLogRoute from './routes/changelog-v1'; -import Filby, { Projection, Event } from '../..'; +import Filby, { Projection, Notification } from '../..'; const fastify = Fastify(config.fastify); @@ -58,12 +58,16 @@ const app: AppProcess = process; await fastify.listen(config.server); - filby.on('park_v1_change', async (event: Event) => { - await axios.post('https://httpbin.org/status/200', event); + filby.on(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (err, context) => { + console.error(`Hook failed`, context); + console.error(err.stack); }); - filby.on('change', async (event: Event) => { + filby.on('park_v1_change', async (context: Notification) => { + await axios.post('https://httpbin.org/status/200', context); + }); + filby.on('change', async (context: Notification) => { // Demonstrate a webhook with retry behaviour - await axios.post('https://httpbin.org/status/500', event); + await axios.post('https://httpbin.org/status/500', context); }); await filby.startNotifications(); diff --git a/examples/typescript/tsconfig.json b/examples/typescript/tsconfig.json index afa29e7..bebb55e 100644 --- a/examples/typescript/tsconfig.json +++ b/examples/typescript/tsconfig.json @@ -1,7 +1,6 @@ { "compilerOptions": { /* Visit https://aka.ms/tsconfig to read more about this file */ - /* Projects */ // "incremental": true, /* Save .tsbuildinfo files to allow for incremental compilation of projects. */ // "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */ @@ -9,9 +8,8 @@ // "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */ // "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. */ // "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */ - /* Language and Environment */ - "target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ + "target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ // "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */ // "jsx": "preserve", /* Specify what JSX code is generated. */ // "experimentalDecorators": true, /* Enable experimental support for legacy experimental decorators. */ @@ -23,9 +21,8 @@ // "noLib": true, /* Disable including any library files, including the default lib.d.ts. */ // "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */ // "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */ - /* Modules */ - "module": "commonjs", /* Specify what module code is generated. */ + "module": "commonjs", /* Specify what module code is generated. */ // "rootDir": "./", /* Specify the root folder within your source files. */ // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ @@ -39,15 +36,13 @@ // "resolvePackageJsonExports": true, /* Use the package.json 'exports' field when resolving package imports. */ // "resolvePackageJsonImports": true, /* Use the package.json 'imports' field when resolving imports. */ // "customConditions": [], /* Conditions to set in addition to the resolver-specific defaults when resolving imports. */ - "resolveJsonModule": true, /* Enable importing .json files. */ + "resolveJsonModule": true, /* Enable importing .json files. */ // "allowArbitraryExtensions": true, /* Enable importing files with any extension, provided a declaration file is present. */ // "noResolve": true, /* Disallow 'import's, 'require's or ''s from expanding the number of files TypeScript should add to a project. */ - /* JavaScript Support */ // "allowJs": true, /* Allow JavaScript files to be a part of your program. Use the 'checkJS' option to get errors from these files. */ // "checkJs": true, /* Enable error reporting in type-checked JavaScript files. */ // "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from 'node_modules'. Only applicable with 'allowJs'. */ - /* Emit */ // "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */ // "declarationMap": true, /* Create sourcemaps for d.ts files. */ @@ -55,7 +50,7 @@ // "sourceMap": true, /* Create source map files for emitted JavaScript files. */ // "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */ // "outFile": "./", /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */ - // "outDir": "./", /* Specify an output folder for all emitted files. */ + "outDir": "./dist", /* Specify an output folder for all emitted files. */ // "removeComments": true, /* Disable emitting comments. */ // "noEmit": true, /* Disable emitting files from a compilation. */ // "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */ @@ -72,17 +67,15 @@ // "preserveConstEnums": true, /* Disable erasing 'const enum' declarations in generated code. */ // "declarationDir": "./", /* Specify the output directory for generated declaration files. */ // "preserveValueImports": true, /* Preserve unused imported values in the JavaScript output that would otherwise be removed. */ - /* Interop Constraints */ // "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */ // "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */ // "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */ - "esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */ + "esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */ // "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */ - "forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */ - + "forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */ /* Type Checking */ - "strict": true, /* Enable all strict type-checking options. */ + "strict": true, /* Enable all strict type-checking options. */ // "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */ // "strictNullChecks": true, /* When type checking, take into account 'null' and 'undefined'. */ // "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */ @@ -101,9 +94,8 @@ // "noPropertyAccessFromIndexSignature": true, /* Enforces using indexed accessors for keys declared using an indexed type. */ // "allowUnusedLabels": true, /* Disable error reporting for unused labels. */ // "allowUnreachableCode": true, /* Disable error reporting for unreachable code. */ - /* Completeness */ // "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */ - "skipLibCheck": true /* Skip type checking all .d.ts files. */ + "skipLibCheck": true /* Skip type checking all .d.ts files. */ } -} +} \ No newline at end of file diff --git a/index.d.ts b/index.d.ts index e588714..a73b2f3 100644 --- a/index.d.ts +++ b/index.d.ts @@ -2,6 +2,7 @@ import { EventEmitter2 as EventEmitter } from 'eventemitter2'; import { PoolClient, PoolConfig } from 'pg'; export default class Filby extends EventEmitter { + static HOOK_MAX_ATTEMPTS_EXHAUSTED: string; constructor(config: Config); init(): Promise; startNotifications(): Promise; @@ -39,8 +40,9 @@ export type ChangeSet = { entityTag: string; }; -export type Event = { +export type Notification = { event: string; + attempts: number; } & Projection; export type Entity = { diff --git a/index.js b/index.js index f9e82d1..0fcb772 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,8 @@ module.exports = class Filby extends EventEmitter { #pool; #scheduler; + static HOOK_MAX_ATTEMPTS_EXHAUSTED = 'HOOK_ATTEMPTS_EXHAUSTED'; + constructor(config) { super(); this.#config = config; @@ -111,12 +113,13 @@ module.exports = class Filby extends EventEmitter { const notification = await this.#getNextNotification(tx, maxAttempts); if (!notification) return false; + const context = await this.#getNotificationContext(tx, notification); try { - const hook = await this.#getHook(tx, notification); - await this.emitAsync(hook.event, hook); + await this.emitAsync(context.event, context); await this.#passNotification(tx, notification); } catch (err) { await this.#failNotification(tx, notification, err); + if (notification.attempts >= maxAttempts) this.emitAsync(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, err, context); } return true; @@ -135,16 +138,19 @@ module.exports = class Filby extends EventEmitter { return notifications[0]; } - async #getHook(tx, notification) { + async #getNotificationContext(tx, notification) { const { rows } = await tx.query( - `SELECT h.event, p.id, p.name, p.version FROM fby_hook h + `SELECT h.event, p.id, p.name AS projection, p.version FROM fby_hook h INNER JOIN fby_notification n ON n.hook_id = h.id INNER JOIN fby_projection p ON p.id = n.projection_id WHERE h.id = $1`, [notification.hookId], ); - const hooks = rows.map((row) => ({ event: row.event, projection: { name: row.name, version: row.version } })); - return hooks[0]; + return rows.map((row) => ({ + event: row.event, + projection: { name: row.projection, version: row.version }, + attempts: notification.attempts, + }))[0]; } async #passNotification(tx, notification) { diff --git a/migrations/005.create-fby-notification-mechanism.sql b/migrations/005.create-fby-notification-mechanism.sql index b556087..5fcda0d 100644 --- a/migrations/005.create-fby-notification-mechanism.sql +++ b/migrations/005.create-fby-notification-mechanism.sql @@ -60,7 +60,7 @@ BEGIN SELECT n.id, n.hook_id, - n.attempts + n.attempts + 1 AS attempts FROM fby_notification n WHERE n.status = 'PENDING' diff --git a/package.json b/package.json index 0d49f11..fbd4da6 100644 --- a/package.json +++ b/package.json @@ -65,4 +65,4 @@ "url": "https://github.com/acuminous/filby/issues" }, "homepage": "https://acuminous.github.io/filby" -} +} \ No newline at end of file diff --git a/test/notifications.test.js b/test/notifications.test.js index f27a3be..75f0a98 100644 --- a/test/notifications.test.js +++ b/test/notifications.test.js @@ -1,9 +1,5 @@ -const { - ok, strictEqual: eq, deepEqual: deq, rejects, match, -} = require('node:assert'); -const { - describe, it, before, beforeEach, after, afterEach, -} = require('zunit'); +const { ok, strictEqual: eq, deepEqual: deq, rejects, match } = require('node:assert'); +const { describe, it, before, beforeEach, after, afterEach } = require('zunit'); const TestFilby = require('./TestFilby'); @@ -59,9 +55,10 @@ describe('Notifications', () => { (1, 1, now())`); }); - filby.once('VAT Rate Changed', ({ event, projection }) => { - eq(event, 'VAT Rate Changed'); - deq(projection, { name: 'VAT Rates', version: 1 }); + filby.once('VAT Rate Changed', (context) => { + eq(context.event, 'VAT Rate Changed'); + deq(context.projection, { name: 'VAT Rates', version: 1 }); + eq(context.attempts, 1); done(); }); @@ -80,9 +77,9 @@ describe('Notifications', () => { (1, 1, now())`); }); - filby.on('VAT Rate Changed', ({ event, projection }) => { - eq(event, 'VAT Rate Changed'); - deq(projection, { name: 'VAT Rates', version: 1 }); + let attempts = 0; + filby.on('VAT Rate Changed', () => { + eq(++attempts, 1); setTimeout(done, 1000); }); @@ -147,4 +144,31 @@ describe('Notifications', () => { filby.startNotifications(); }); + + it('should emit an event when the last notification attempt fails', async (t, done) => { + await filby.withTransaction(async (tx) => { + await tx.query(`INSERT INTO fby_projection (id, name, version) VALUES + (1, 'VAT Rates', 1), + (2, 'CGT Rates', 1)`); + await tx.query(`INSERT INTO fby_hook (id, projection_id, event) VALUES + (1, 1, 'VAT Rate Changed'), + (2, 2, 'CGT Rate Changed')`); + await tx.query(`INSERT INTO fby_notification (hook_id, projection_id, scheduled_for) VALUES + (1, 1, now())`); + }); + + filby.on('VAT Rate Changed', () => { + throw new Error('Oh Noes!'); + }); + + filby.on(TestFilby.HOOK_MAX_ATTEMPTS_EXHAUSTED, (err, context) => { + eq(err.message, 'Oh Noes!'); + eq(context.event, 'VAT Rate Changed'); + deq(context.projection, { name: 'VAT Rates', version: 1 }); + eq(context.attempts, 3); + setTimeout(done, 1000); + }); + + filby.startNotifications(); + }); });