Skip to content

Commit

Permalink
Demonstrate ADD_PROJECTION hook
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Jan 23, 2024
1 parent f4d4558 commit 4b43ac5
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 37 deletions.
1 change: 1 addition & 0 deletions examples/javascript/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
}
},
"webhooks": {
"httpbin/add-projection": "https://httpbin.org/status/200",
"httpbin/add-change-set/park-v1": "https://httpbin.org/status/200",
"httpbin/add-change-set/*": "https://httpbin.org/status/500"
}
Expand Down
24 changes: 16 additions & 8 deletions examples/javascript/lib/Application.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module.exports = class Application {
#logger;
#fastify;
#filby;
#routes = {};

constructor({ config }) {
this.#config = config;
Expand Down Expand Up @@ -44,9 +45,11 @@ module.exports = class Application {
}

async #handleHookFailures() {
this.#filby.subscribe(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (notification) => {
this.#logger.error('Hook failed', notification);
this.#logger.error(notification.err.stack);
this.#filby.subscribe(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (errNotification) => {
const { err: { message: errMessage, stack, config: { method, url } }, ...notification } = errNotification;
const message = `Hook '${notification.name}' failed after ${notification.attempts} attempts and will no longer be retried`;
this.#logger.error({ notification }, message);
this.#logger.error({ message: errMessage, stack, method, url });
});
}

Expand All @@ -61,20 +64,24 @@ module.exports = class Application {

async #registerWebhook(event, url) {
this.#filby.subscribe(event, async (notification) => {
await axios.post(url, notification);
const routes = this.#routes[notification.projection.id];
await axios.post(url, { ...notification, routes });
});
}

async #initFastify() {
await this.#fastify.register(cors, {
origin: '*',
methods: ['GET'],
});
this.#fastify.addHook('onRoute', (routeOptions) => this.captureProjectionPath(routeOptions));
await this.#fastify.register(cors, { origin: '*', methods: ['GET'] });
await this.#registerSwagger();
await this.#registerChangelog();
await this.#registerProjections();
}

captureProjectionPath(routeOptions) {
if (routeOptions.method !== 'GET' || !routeOptions.projection) return;
this.#routes[routeOptions.projection.id].push(routeOptions.path);
}

async #registerSwagger() {
await this.#fastify.register(swagger, {
swagger: {
Expand Down Expand Up @@ -113,6 +120,7 @@ module.exports = class Application {
async #registerProjections() {
const projections = await this.#filby.getProjections();
for (let i = 0; i < projections.length; i++) {
this.#routes[projections[i].id] = [];
await this.#registerProjection(projections[i]);
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/javascript/lib/routes/park-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const getParkSchema = require('../../../schemas/get-park-schema.json');

module.exports = (fastify, { projection, filby }, done) => {

fastify.get('/', { schema: getParksSchema }, async (request, reply) => {
fastify.get('/', { schema: getParksSchema, projection }, async (request, reply) => {
if (request.query.changeSetId === undefined) return redirectToCurrentChangeSet(request, reply);
const changeSetId = Number(request.query.changeSetId);
const changeSet = await getChangeSet(changeSetId);
Expand All @@ -20,7 +20,7 @@ module.exports = (fastify, { projection, filby }, done) => {
return parks;
});

fastify.get('/code/:code', { schema: getParkSchema }, async (request, reply) => {
fastify.get('/code/:code', { schema: getParkSchema, projection }, async (request, reply) => {
if (request.query.changeSetId === undefined) return redirectToCurrentChangeSet(request, reply);
const code = String(request.params.code).toUpperCase();
const changeSetId = Number(request.query.changeSetId);
Expand Down
4 changes: 4 additions & 0 deletions examples/migrations/0001.define-park-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
identified_by:
- id

- operation: ADD_HOOK
name: httpbin/add-projection
event: ADD_PROJECTION

- operation: ADD_PROJECTION
name: park
version: 1
Expand Down
1 change: 1 addition & 0 deletions examples/typescript/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
}
},
"webhooks": {
"httpbin/add-projection": "https://httpbin.org/status/200",
"httpbin/add-change-set/park-v1": "https://httpbin.org/status/200",
"httpbin/add-change-set/*": "https://httpbin.org/status/500"
}
Expand Down
38 changes: 26 additions & 12 deletions examples/typescript/lib/Application.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import path from 'node:path';

import Fastify, { FastifyInstance } from 'fastify';
import Fastify, { FastifyInstance, RouteOptions } from 'fastify';
import swagger from '@fastify/swagger';
import swaggerUI from '@fastify/swagger-ui';
import cors from '@fastify/cors';
import axios from 'axios';
import axios, { AxiosError } from 'axios';

import pkg from '../package.json';
import Filby, { Config as FilbyConfig, Projection, PoolConfig, ErrorNotification } from '../../..';
import Filby, { Config as FilbyConfig, Projection, PoolConfig, Notification, ErrorNotification } from '../../..';
import changeLogRoute from './routes/changelog-v1';

export type ApplicationConfig = {
Expand All @@ -27,12 +27,19 @@ export type ApplicationConfig = {
};
}

type ProjectionRouteOptions = {
method: string;
path: string;
projection: Projection;
}

export default class Application {

#config;
#logger;
#filby: Filby;
#fastify: FastifyInstance;
#routes = new Map<number, string[]>();

constructor({ config }: { config: ApplicationConfig }) {
this.#config = config;
Expand Down Expand Up @@ -61,9 +68,11 @@ export default class Application {
}

async #handleHookFailures() {
this.#filby.subscribe<ErrorNotification>(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (notification: ErrorNotification) => {
this.#logger.error('Hook failed', notification);
this.#logger.error(notification.err.stack);
this.#filby.subscribe<ErrorNotification<AxiosError>>(Filby.HOOK_MAX_ATTEMPTS_EXHAUSTED, async (errNotification: ErrorNotification<AxiosError>) => {
const { err: { message: errMessage, stack, config: { method, url } }, ...notification } = errNotification;
const message = `Hook '${notification.name}' failed after ${notification.attempts} attempts and will no longer be retried`;
this.#logger.error({ notification }, message);
this.#logger.error({ message: errMessage, stack, method, url });
});
}

Expand All @@ -77,21 +86,25 @@ export default class Application {
}

async #registerWebhook(event: string, url: string) {
this.#filby.subscribe(event, async (notification) => {
await axios.post(url, notification);
this.#filby.subscribe(event, async (notification: Notification) => {
const routes = this.#routes.get(notification.projection.id);
await axios.post(url, { ...notification, routes });
});
}

async #initFastify() {
await this.#fastify.register(cors, {
origin: '*',
methods: ['GET'],
});
this.#fastify.addHook('onRoute', (routeOptions: RouteOptions) => this.captureProjectionPath(routeOptions as unknown as ProjectionRouteOptions));
await this.#fastify.register(cors, { origin: '*', methods: ['GET'] });
await this.#registerSwagger();
await this.#registerChangelog();
await this.#registerProjections();
}

captureProjectionPath(routeOptions: ProjectionRouteOptions) {
if (routeOptions.method !== 'GET' || !routeOptions.projection) return;
this.#routes.get(routeOptions.projection.id)?.push(routeOptions.path);
}

async #registerSwagger() {
await this.#fastify.register(swagger, {
swagger: {
Expand Down Expand Up @@ -130,6 +143,7 @@ export default class Application {
async #registerProjections() {
const projections = await this.#filby.getProjections();
for (let i = 0; i < projections.length; i++) {
this.#routes.set(projections[i].id, []);
await this.#registerProjection(projections[i]);
}
}
Expand Down
16 changes: 8 additions & 8 deletions examples/typescript/lib/routes/park-v1.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify';
import { FastifyInstance, FastifyReply, FastifyRequest, RouteShorthandOptions } from 'fastify';
import createError from 'http-errors';
import uri from 'fast-uri';

Expand All @@ -7,12 +7,13 @@ import getParksSchema from '../../../schemas/get-parks-schema.json';
import getParkSchema from '../../../schemas/get-park-schema.json';

type ChangeSetId = number;
type FilbyQueryString = { changeSetId?: ChangeSetId };

export default (fastify: FastifyInstance, { projection, filby }: { projection: Projection, filby: Filby }, done: (err?: Error) => void) => {

fastify.get<{
Querystring: { changeSetId?: ChangeSetId }
}>('/', { schema: getParksSchema }, async (request, reply) => {
const getParksOptions = { schema: getParksSchema, projection };

fastify.get<{ Querystring: FilbyQueryString }>('/', getParksOptions, async (request, reply) => {
if (request.query.changeSetId === undefined) return redirectToCurrentChangeSet(request, reply);
const changeSetId = Number(request.query.changeSetId)
const changeSet = await getChangeSet(changeSetId);
Expand All @@ -27,10 +28,9 @@ export default (fastify: FastifyInstance, { projection, filby }: { projection: P
return parks;
});

fastify.get<{
Querystring: { changeSetId?: ChangeSetId },
Params: { code: string }
}>('/code/:code', { schema: getParkSchema }, async (request, reply) => {
const getParkOptions = { schema: getParkSchema, projection };

fastify.get<{ Querystring: FilbyQueryString, Params: { code: string } }>('/code/:code', getParkOptions, async (request, reply) => {
if (request.query.changeSetId === undefined) return redirectToCurrentChangeSet(request, reply);
const code = request.params.code.toUpperCase();
const changeSetId = Number(request.query.changeSetId)
Expand Down
8 changes: 5 additions & 3 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ export type ChangeSet = {
};

export type Notification = {
name: string;
event: string;
projection: Projection;
attempts: number;
} & Projection;
};

export type ErrorNotification = {
err: Error;
export type ErrorNotification<E> = {
err: Error<E>;
} & Notification

export type Entity = {
Expand Down
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ LIMIT 1`, [projection.id]);

async #getNotificationContext(tx, notification) {
const { rows } = await tx.query(
`SELECT h.name, h.event, p.id, p.name AS projection, p.version FROM fby_hook h
`SELECT h.name, h.event, p.id AS projection_id, p.name AS projection_name, p.version AS projection_version FROM fby_hook h
INNER JOIN fby_notification n ON n.hook_id = h.id
INNER JOIN fby_projection p ON p.id = n.projection_id
WHERE h.id = $1`,
Expand All @@ -174,7 +174,7 @@ WHERE h.id = $1`,
return rows.map((row) => ({
name: row.name,
event: row.event,
projection: { name: row.projection, version: row.version },
projection: { id: row.projection_id, name: row.projection_name, version: row.projection_version },
attempts: notification.attempts,
}))[0];
}
Expand Down
1 change: 1 addition & 0 deletions test/dsl/001.should-report-unsupported-file-types.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
UNSUPPORTED
4 changes: 2 additions & 2 deletions test/notifications.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('Notifications', () => {
filby.subscribe('VAT Rate Changed', (notification) => {
eq(notification.name, 'VAT Rate Changed');
eq(notification.event, 'ADD_CHANGE_SET');
deq(notification.projection, { name: 'VAT Rates', version: 1 });
deq(notification.projection, { id: 1, name: 'VAT Rates', version: 1 });
eq(notification.attempts, 1);
done();
});
Expand Down Expand Up @@ -165,7 +165,7 @@ describe('Notifications', () => {
eq(notification.err.message, 'Oh Noes!');
eq(notification.name, 'VAT Rate Changed');
eq(notification.event, 'ADD_CHANGE_SET');
deq(notification.projection, { name: 'VAT Rates', version: 1 });
deq(notification.projection, { id: 1, name: 'VAT Rates', version: 1 });
eq(notification.attempts, 3);
setTimeout(done, 1000);
});
Expand Down

0 comments on commit 4b43ac5

Please sign in to comment.