Skip to content

Commit

Permalink
Add hook failed event
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Jan 9, 2024
1 parent baf8ac5 commit ea4f591
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 50 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.nyc_output
coverage
node_modules
examples/typescript/dist
1 change: 1 addition & 0 deletions .husky/pre-commit
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env sh
. "$(dirname -- "$0")/_/husky.sh"
rm -rf examples/typescript/dist && pushd examples/typescript && npx tsc && popd && rm -rf examples/typescript/dist
npm run lint-staged && npm test
21 changes: 18 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,23 @@ Starts polling the database for notifications
#### filby.stopNotifications(): Promise<void>
Stops polling the database for notifications, and waits for any inflight notifications to complete.

#### filby.on(event: string, callback: (data: Event) => Promise<any>): Filby|Listener
Filby extends [eventemitter2](https://www.npmjs.com/package/eventemitter2) which unlike node's EventEmitter, supports asynchronous events. You can use these to listen for change notifications and perform of asynchronous tasks like making an HTTP request for a webhook. If the task throws an exception it will be caught by Filby and the notifiation retried up to a maximum number of times, with an incremental backoff delay.
#### filby.on(event: symbol|string, callback: (...values: any[]) => void): Filby|Listener
Filby extends [eventemitter2](https://www.npmjs.com/package/eventemitter2) which unlike node's EventEmitter, supports asynchronous events. You can use these to listen for change notifications and perform of asynchronous tasks like making an HTTP request for a webhook. The event name is user defined and must be specified in the Hook [Data Definition](#data-definition). The sole callback parameter is the Notification context (see the TypeScript definitions), e.g.

```js
filby.on('VAT Rate chnaged', async (event) => {
await axios.post('https://httpbin.org/status/200', event);
});
```

If your event handler throws an exception it will be caught by Filby and the notifiation retried up to a maximum number of times, with an incremental backoff delay. If the maximum attempts are exceeded then Filby emits dedicated event, `Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED`. The first parameter to the callback is the Error object, and the second the Notification context, e.g.

```js
filby.on(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, (err, context) => {
console.error('Hook Failed', context);
console.error('Hook Failed', err.stack); // Careful not to log secrets that may be on the error
});
```

#### filby.getProjections(): Promise&lt;Projection[]&gt;
Returns the list of projections.
Expand Down Expand Up @@ -373,7 +388,7 @@ add change set:
## Configuration
```js
{
// All the database configuration is passed through to https://www.npmjs.com/package/pg
// All the database configuration is passed through to https://www.npmjs.com/package/pg
"database": {
"user": "fby_example",
"database": "fby_example",
Expand Down
13 changes: 9 additions & 4 deletions examples/javascript/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
const path = require('node:path');

const Fastify = require('fastify');
Expand Down Expand Up @@ -52,12 +53,16 @@ const filby = new Filby({ ...config.filby, ...{ database: config.database } });

await fastify.listen(config.server);

filby.on('park_v1_change', async (event) => {
await axios.post('https://httpbin.org/status/200', event);
filby.on(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (err, context) => {
console.error('Hook failed', context);
console.error(err.stack);
});
filby.on('change', async (event) => {
filby.on('park_v1_change', async (context) => {
await axios.post('https://httpbin.org/status/200', context);
});
filby.on('change', async (context) => {
// Demonstrate a webhook with retry behaviour
await axios.post('https://httpbin.org/status/500', event);
await axios.post('https://httpbin.org/status/500', context);
});
await filby.startNotifications();

Expand Down
14 changes: 9 additions & 5 deletions examples/typescript/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import axios from 'axios';

import config from './config.json';
import changeLogRoute from './routes/changelog-v1';
import Filby, { Projection, Event } from '../..';
import Filby, { Projection, Notification } from '../..';

const fastify = Fastify(config.fastify);

Expand Down Expand Up @@ -58,12 +58,16 @@ const app: AppProcess = process;

await fastify.listen(config.server);

filby.on('park_v1_change', async (event: Event) => {
await axios.post('https://httpbin.org/status/200', event);
filby.on(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (err, context) => {
console.error(`Hook failed`, context);
console.error(err.stack);
});
filby.on('change', async (event: Event) => {
filby.on('park_v1_change', async (context: Notification) => {
await axios.post('https://httpbin.org/status/200', context);
});
filby.on('change', async (context: Notification) => {
// Demonstrate a webhook with retry behaviour
await axios.post('https://httpbin.org/status/500', event);
await axios.post('https://httpbin.org/status/500', context);
});
await filby.startNotifications();

Expand Down
26 changes: 9 additions & 17 deletions examples/typescript/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
{
"compilerOptions": {
/* Visit https://aka.ms/tsconfig to read more about this file */

/* Projects */
// "incremental": true, /* Save .tsbuildinfo files to allow for incremental compilation of projects. */
// "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */
// "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */
// "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */
// "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. */
// "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */

/* Language and Environment */
"target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
"target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
// "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */
// "jsx": "preserve", /* Specify what JSX code is generated. */
// "experimentalDecorators": true, /* Enable experimental support for legacy experimental decorators. */
Expand All @@ -23,9 +21,8 @@
// "noLib": true, /* Disable including any library files, including the default lib.d.ts. */
// "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */
// "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */

/* Modules */
"module": "commonjs", /* Specify what module code is generated. */
"module": "commonjs", /* Specify what module code is generated. */
// "rootDir": "./", /* Specify the root folder within your source files. */
// "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */
// "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
Expand All @@ -39,23 +36,21 @@
// "resolvePackageJsonExports": true, /* Use the package.json 'exports' field when resolving package imports. */
// "resolvePackageJsonImports": true, /* Use the package.json 'imports' field when resolving imports. */
// "customConditions": [], /* Conditions to set in addition to the resolver-specific defaults when resolving imports. */
"resolveJsonModule": true, /* Enable importing .json files. */
"resolveJsonModule": true, /* Enable importing .json files. */
// "allowArbitraryExtensions": true, /* Enable importing files with any extension, provided a declaration file is present. */
// "noResolve": true, /* Disallow 'import's, 'require's or '<reference>'s from expanding the number of files TypeScript should add to a project. */

/* JavaScript Support */
// "allowJs": true, /* Allow JavaScript files to be a part of your program. Use the 'checkJS' option to get errors from these files. */
// "checkJs": true, /* Enable error reporting in type-checked JavaScript files. */
// "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from 'node_modules'. Only applicable with 'allowJs'. */

/* Emit */
// "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */
// "declarationMap": true, /* Create sourcemaps for d.ts files. */
// "emitDeclarationOnly": true, /* Only output d.ts files and not JavaScript files. */
// "sourceMap": true, /* Create source map files for emitted JavaScript files. */
// "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */
// "outFile": "./", /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */
// "outDir": "./", /* Specify an output folder for all emitted files. */
"outDir": "./dist", /* Specify an output folder for all emitted files. */
// "removeComments": true, /* Disable emitting comments. */
// "noEmit": true, /* Disable emitting files from a compilation. */
// "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */
Expand All @@ -72,17 +67,15 @@
// "preserveConstEnums": true, /* Disable erasing 'const enum' declarations in generated code. */
// "declarationDir": "./", /* Specify the output directory for generated declaration files. */
// "preserveValueImports": true, /* Preserve unused imported values in the JavaScript output that would otherwise be removed. */

/* Interop Constraints */
// "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */
// "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */
// "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */
"esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
"esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
// "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */
"forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */

"forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */
/* Type Checking */
"strict": true, /* Enable all strict type-checking options. */
"strict": true, /* Enable all strict type-checking options. */
// "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */
// "strictNullChecks": true, /* When type checking, take into account 'null' and 'undefined'. */
// "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */
Expand All @@ -101,9 +94,8 @@
// "noPropertyAccessFromIndexSignature": true, /* Enforces using indexed accessors for keys declared using an indexed type. */
// "allowUnusedLabels": true, /* Disable error reporting for unused labels. */
// "allowUnreachableCode": true, /* Disable error reporting for unreachable code. */

/* Completeness */
// "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
}
}
}
4 changes: 3 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { EventEmitter2 as EventEmitter } from 'eventemitter2';
import { PoolClient, PoolConfig } from 'pg';

export default class Filby extends EventEmitter {
static HOOK_MAX_ATTEMPTS_EXHAUSTED: string;
constructor(config: Config);
init(): Promise<void>;
startNotifications(): Promise<void>;
Expand Down Expand Up @@ -39,8 +40,9 @@ export type ChangeSet = {
entityTag: string;
};

export type Event = {
export type Notification = {
event: string;
attempts: number;
} & Projection;

export type Entity = {
Expand Down
18 changes: 12 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ module.exports = class Filby extends EventEmitter {
#pool;
#scheduler;

static HOOK_MAX_ATTEMPTS_EXHAUSTED = 'HOOK_ATTEMPTS_EXHAUSTED';

constructor(config) {
super();
this.#config = config;
Expand Down Expand Up @@ -111,12 +113,13 @@ module.exports = class Filby extends EventEmitter {
const notification = await this.#getNextNotification(tx, maxAttempts);
if (!notification) return false;

const context = await this.#getNotificationContext(tx, notification);
try {
const hook = await this.#getHook(tx, notification);
await this.emitAsync(hook.event, hook);
await this.emitAsync(context.event, context);
await this.#passNotification(tx, notification);
} catch (err) {
await this.#failNotification(tx, notification, err);
if (notification.attempts >= maxAttempts) this.emitAsync(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, err, context);
}

return true;
Expand All @@ -135,16 +138,19 @@ module.exports = class Filby extends EventEmitter {
return notifications[0];
}

async #getHook(tx, notification) {
async #getNotificationContext(tx, notification) {
const { rows } = await tx.query(
`SELECT h.event, p.id, p.name, p.version FROM fby_hook h
`SELECT h.event, p.id, p.name AS projection, p.version FROM fby_hook h
INNER JOIN fby_notification n ON n.hook_id = h.id
INNER JOIN fby_projection p ON p.id = n.projection_id
WHERE h.id = $1`,
[notification.hookId],
);
const hooks = rows.map((row) => ({ event: row.event, projection: { name: row.name, version: row.version } }));
return hooks[0];
return rows.map((row) => ({
event: row.event,
projection: { name: row.projection, version: row.version },
attempts: notification.attempts,
}))[0];
}

async #passNotification(tx, notification) {
Expand Down
2 changes: 1 addition & 1 deletion migrations/005.create-fby-notification-mechanism.sql
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ BEGIN
SELECT
n.id,
n.hook_id,
n.attempts
n.attempts + 1 AS attempts
FROM
fby_notification n
WHERE n.status = 'PENDING'
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@
"url": "https://github.com/acuminous/filby/issues"
},
"homepage": "https://acuminous.github.io/filby"
}
}
48 changes: 36 additions & 12 deletions test/notifications.test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
const {
ok, strictEqual: eq, deepEqual: deq, rejects, match,
} = require('node:assert');
const {
describe, it, before, beforeEach, after, afterEach,
} = require('zunit');
const { ok, strictEqual: eq, deepEqual: deq, rejects, match } = require('node:assert');
const { describe, it, before, beforeEach, after, afterEach } = require('zunit');

const TestFilby = require('./TestFilby');

Expand Down Expand Up @@ -59,9 +55,10 @@ describe('Notifications', () => {
(1, 1, now())`);
});

filby.once('VAT Rate Changed', ({ event, projection }) => {
eq(event, 'VAT Rate Changed');
deq(projection, { name: 'VAT Rates', version: 1 });
filby.once('VAT Rate Changed', (context) => {
eq(context.event, 'VAT Rate Changed');
deq(context.projection, { name: 'VAT Rates', version: 1 });
eq(context.attempts, 1);
done();
});

Expand All @@ -80,9 +77,9 @@ describe('Notifications', () => {
(1, 1, now())`);
});

filby.on('VAT Rate Changed', ({ event, projection }) => {
eq(event, 'VAT Rate Changed');
deq(projection, { name: 'VAT Rates', version: 1 });
let attempts = 0;
filby.on('VAT Rate Changed', () => {
eq(++attempts, 1);
setTimeout(done, 1000);
});

Expand Down Expand Up @@ -147,4 +144,31 @@ describe('Notifications', () => {

filby.startNotifications();
});

it('should emit an event when the last notification attempt fails', async (t, done) => {
await filby.withTransaction(async (tx) => {
await tx.query(`INSERT INTO fby_projection (id, name, version) VALUES
(1, 'VAT Rates', 1),
(2, 'CGT Rates', 1)`);
await tx.query(`INSERT INTO fby_hook (id, projection_id, event) VALUES
(1, 1, 'VAT Rate Changed'),
(2, 2, 'CGT Rate Changed')`);
await tx.query(`INSERT INTO fby_notification (hook_id, projection_id, scheduled_for) VALUES
(1, 1, now())`);
});

filby.on('VAT Rate Changed', () => {
throw new Error('Oh Noes!');
});

filby.on(TestFilby.HOOK_MAX_ATTEMPTS_EXHAUSTED, (err, context) => {
eq(err.message, 'Oh Noes!');
eq(context.event, 'VAT Rate Changed');
deq(context.projection, { name: 'VAT Rates', version: 1 });
eq(context.attempts, 3);
setTimeout(done, 1000);
});

filby.startNotifications();
});
});

0 comments on commit ea4f591

Please sign in to comment.