Skip to content

Commit

Permalink
Redis Bus (#550)
Browse files Browse the repository at this point in the history
* first pass

* updating docs and eslint error

* deprecation warning
  • Loading branch information
jonwinton authored Jun 18, 2018
1 parent a6f98a1 commit 0b31d42
Show file tree
Hide file tree
Showing 17 changed files with 226 additions and 23 deletions.
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [Renderers](docs/topics/renderers.md)
* [Building Custom Renderers](docs/topics/custom-renderers.md)
* [Renderer Models](docs/topics/renderer-models.md)
* [Event Bus](docs/topics/event-bus.md)
* Advanced
* [Data Versioning (Upgrades)](docs/upgrade.md)
* [Plugins](docs/plugins/README.md)
Expand Down
3 changes: 3 additions & 0 deletions docs/plugins/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Plugins

> #### warning::API Notice
> Plugin functionality will change in Amphora v7.0.0, with many of the methods being deprecated in favor of using the [Event Bus](../topics/event-bus.md). Please upgrade to Amphora v6.6.0 as soon as possible and transition to using Event Bus topics.
Plugins allow for you to extend functionality in Amphora by tapping into lifecycle hooks in Amphora to perform secondary actions on the server.

**Important:** none of the plugin hooks allow you to manipulate the data that Amphora is processing, they only provide awareness of what has already been processed.
Expand Down
3 changes: 3 additions & 0 deletions docs/plugins/hooks.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Hooks

> #### warning::API Notice
> Plugin functionality will change in Amphora v7.0.0, with many of the methods being deprecated in favor of using the [Event Bus](../topics/event-bus.md). Please upgrade to Amphora v6.6.0 as soon as possible and transition to using Event Bus topics.
Below are details about the plugin hooks in Amphora. Plugin hooks are fired on each plugin supplied to Amphora at instantiation time. A plugin should expose a property whose name corresponds to one of the hooks below and is a function that expects the arguments detailed below.

---
Expand Down
3 changes: 3 additions & 0 deletions docs/plugins/plugin-vs-renderer.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Plugin vs. Renderer

> #### warning::API Notice
> Plugin functionality will change in Amphora v7.0.0, with many of the methods being deprecated in favor of using the [Event Bus](../topics/event-bus.md). Please upgrade to Amphora v6.6.0 as soon as possible and transition to using Event Bus topics.
While both plugins and renderers can be passed to Amphora, one key difference:

_Plugins are observers that are invoked based on what has already been processed by Amphora, whereas renderers are part of the request/response lifecycle for displaying data._
Expand Down
3 changes: 3 additions & 0 deletions docs/plugins/writing-a-plugin.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Writing A Plugin

> #### warning::API Notice
> Plugin functionality will change in Amphora v7.0.0, with many of the methods being deprecated in favor of using the [Event Bus](../topics/event-bus.md). Please upgrade to Amphora v6.6.0 as soon as possible and transition to using Event Bus topics.
Writing a plugin is simple, all you need is to pass in an object to the [`plugins` instantiation argument](/docs/lifecycle/startup/instantiation.html#instantiation-arguments) who has properties which correspond to the hooks [listed on the hooks page](hooks.md).

An example plugin is below.
Expand Down
59 changes: 59 additions & 0 deletions docs/topics/event-bus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Event Bus

As of Amphora version `6.6.0` the option of using [Redis as an event bus](https://redis.io/topics/pubsub) has been introduced. This event bus is intended to make it easier to a destructure a Clay instance and supporting platform packages (i.e. Amphora Search). By default the Bus module is not instantiated. Only by setting the [Redis Bus Host env var](#redis-bus-host) will the Bus module be active.

The Event Bus is also intended to replace the plugin system in the next major version of Amphora (v7). On top of replacing the plugin system, the event bus will see some small changes to the payload of certain events as the plugin system is rearchitected. The end goal is to expose specific hooks in the Amphora lifecycle to the Bus as quickly as possible.

## Bus Topics

The following topics are published to the bus by Amphora:

- `clay:publishLayout`
- `clay:publishPage`
- `clay:createPage`
- `clay:unschedulePage`
- `clay:schedulePage`
- `clay:unpublishPage`
- `clay:save`
- `clay:delete`

## Configuring the Bus

The Bus module has two configurations options which are both controlled by environment variables.

### Redis Bus Host

As mentioned, the Bus module is turned off by default. Only by setting the `REDIS_BUS_HOST` env var to a valid Redis url (`redis://<HOST>:<PORT>`) will the module be instantiated and events will be published to the instance.

### Namespace

By default, all topics published to the Bus are namespaced using `clay`, i.e. `clay:<ACTION>`. This namespace can be configured by the env var `CLAY_BUS_NAMESPACE`. For example, setting `CLAY_BUS_NAMESPACE` to a value of `mysite` will publish all events as `mysite:<ACTION>`.

## Subscribing To The Bus

Provided you have setup Amphora to pub to a Redis instance, the following code will subscribe to all events from Clay using the [`redis`](https://www.npmjs.com/package/redis) NPM module.

```javascript
'use strict';

var redis = require('redis'),
SUBSCRIBER = redis.createClient(process.env.REDIS_BUS_HOST),
CLAY_TOPICS = [
'publishLayout',
'publishPage',
'unpublishPage',
'createPage',
'schedulePage',
'unschedulePage',
'save',
'delete'
];

for (let i = 0; i < CLAY_TOPICS.length; i++) {
SUBSCRIBER.subscribe(`clay:${CLAY_TOPICS[i]}`);
}

SUBSCRIBER.on('message', (channel, payload) => {
console.log(`Channel: ${channel}\n\n\n${payload}`);
});
```
33 changes: 33 additions & 0 deletions lib/services/bus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict';

const redis = require('redis'),
NAMESPACE = process.env.CLAY_BUS_NAMESPACE || 'clay';

/**
* Connect to the bus Redis instance
*/
function connect() {
module.exports.client = redis.createClient(process.env.REDIS_BUS_HOST);
}

/**
* Publish a message to the bus.
*
* @param {String} topic
* @param {String} msg
*/
function publish(topic, msg) {
if (!topic || !msg || typeof topic !== 'string' || typeof msg !== 'string') {
throw new Error('A `topic` and `msg` property must be defined and both must be strings');
}

if (module.exports.client) {
module.exports.client.publish(`${NAMESPACE}:${topic}`, msg);
}
}

module.exports.connect = connect;
module.exports.publish = publish;

// For testing
module.exports.client = undefined;
59 changes: 59 additions & 0 deletions lib/services/bus.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
'use strict';

const _ = require('lodash'),
filename = __filename.split('/').pop().split('.').shift(),
lib = require('./' + filename),
sinon = require('sinon'),
expect = require('chai').expect,
redis = require('redis');

describe(_.startCase(filename), function () {
var sandbox, publish;

beforeEach(function () {
sandbox = sinon.sandbox.create();
sandbox.stub(redis);
publish = sandbox.stub();
lib.client = { publish };
});

afterEach(function () {
sandbox.restore();
});

describe('connect', function () {
const fn = lib[this.title];

it('calls the `createClient` method for Redis', function () {
fn();
sinon.assert.calledOnce(redis.createClient);
});
});

describe('publish', function () {
const fn = lib[this.title];

it('throws an error if topic and message are not defined', function () {
expect(fn).to.throw();
});

it('throws an error if topic is not a string', function () {
expect(() => fn(1, 'foo')).to.throw();
});

it('throws an error if message is not defined', function () {
expect(() => fn('foo', 1)).to.throw();
});

it('calls the publish function if topic and string pass validation', function () {
fn('foo', 'bar');
sinon.assert.calledOnce(publish);
});

it('does not call publish if no client is defined', function () {
lib.client = false;
fn('foo', 'bar');
sinon.assert.notCalled(publish);
});
});
});
11 changes: 9 additions & 2 deletions lib/services/components.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const _ = require('lodash'),
upgrade = require('./upgrade'),
{ getComponentName, replaceVersion } = require('clayutils'),
plugins = require('../plugins'),
bus = require('./bus'),
referenceProperty = '_ref',
timeoutGetCoefficient = 2,
timeoutPutCoefficient = 5;
Expand Down Expand Up @@ -366,7 +367,10 @@ function publish(uri, data, locals) {
return cascadingPut(uri, data, locals)
.then(publishedData => isLayout(uri).then((definitelyALayout) => {
if (definitelyALayout) {
plugins.executeHook('publishLayout', { uri: uri, data: publishedData, user: locals && locals.user });
let obj = { uri: uri, data: publishedData, user: locals && locals.user };

plugins.executeHook('publishLayout', obj);
bus.publish('publishLayout', JSON.stringify(obj));
}
return publishedData;
}));
Expand All @@ -377,7 +381,10 @@ function publish(uri, data, locals) {
.then(versionedData => cascadingPut(uri, versionedData, locals))
.then(publishedData => isLayout(uri).then((definitelyALayout) => {
if (definitelyALayout) {
plugins.executeHook('publishLayout', { uri: uri, data: publishedData, user: locals && locals.user });
let obj = { uri: uri, data: publishedData, user: locals && locals.user };

plugins.executeHook('publishLayout', obj);
bus.publish('publishLayout', JSON.stringify(obj));
}
return publishedData;
}));
Expand Down
3 changes: 3 additions & 0 deletions lib/services/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const _ = require('lodash'),
validation = require('../validation'),
promiseDefer = require('../utils/defer'),
plugins = require('../plugins'),
bus = require('./bus'),
publishedVersionSuffix = '@published';
let db = require('levelup')('whatever', { db: require('memdown') });

Expand Down Expand Up @@ -252,10 +253,12 @@ function triggerPlugins(method, args) {
});
} else {
plugins.executeHook('save', hookOps.put);
bus.publish('save', JSON.stringify(hookOps.put));
}
}
if (hookOps.del.length) {
plugins.executeHook('delete', hookOps.del);
bus.publish('delete', JSON.stringify(hookOps.del));
}
}

Expand Down
18 changes: 15 additions & 3 deletions lib/services/pages.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const _ = require('lodash'),
{ getComponentName, replaceVersion, getPrefix } = require('clayutils'),
publishService = require('./publish'),
plugins = require('../plugins'),
bus = require('./bus'),
timeoutPublishCoefficient = 5;

/**
Expand Down Expand Up @@ -288,7 +289,10 @@ function publish(uri, data, locals) {
}
}).timeout(timeoutLimit, `Page publish exceeded ${timeoutLimit}ms: ${uri}`)
.then(function (publishedData) {
plugins.executeHook('publishPage', { uri: uri, data: publishedData, user: locals && locals.user });
let obj = { uri: uri, data: publishedData, user: locals && locals.user };

plugins.executeHook('publishPage', obj);
bus.publish('publishPage', JSON.stringify(obj));
return publishedData;
});
}
Expand Down Expand Up @@ -319,7 +323,10 @@ function create(uri, data, locals) {
return addOp(pageReference, pageData, ops);
});
}).then(applyBatch(site)).then(function (newPage) {
plugins.executeHook('createPage', { uri: pageReference, data: newPage, user: locals && locals.user });
let obj = { uri: pageReference, data: newPage, user: locals && locals.user };

plugins.executeHook('createPage', obj);
bus.publish('createPage', JSON.stringify(obj));
// if successful, return new page object, but include the (optional) self reference to the new page.
newPage._ref = pageReference;

Expand Down Expand Up @@ -359,7 +366,12 @@ function putLatest(uri, data, locals) {
// continue saving the page normally
return getLatestData(uri)
.then(() => db.put(uri, JSON.stringify(data)).return(data)) // data already exists
.catch(() => db.put(uri, JSON.stringify(data)).then(() => plugins.executeHook('createPage', { uri, data, user: locals && locals.user })).return(data));
.catch(() => db.put(uri, JSON.stringify(data)).then(() => {
let obj = { uri, data, user: locals && locals.user };

plugins.executeHook('createPage', obj);
bus.publish('createPage', JSON.stringify(obj));
})).return(data);
});
}

Expand Down
11 changes: 9 additions & 2 deletions lib/services/schedule.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const _ = require('lodash'),
rest = require('../rest'),
siteService = require('./sites'),
plugins = require('../plugins'),
bus = require('./bus'),
publishProperty = 'publish',
scheduledAtProperty = 'at',
scheduledVersion = 'scheduled';
Expand Down Expand Up @@ -104,7 +105,10 @@ function del(uri, user) {

return db.batch(ops).then(function () {
if (isPage(targetUri)) {
plugins.executeHook('unschedulePage', { uri: targetUri, data: data, user: user });
let obj = { uri: targetUri, data: data, user: user };

plugins.executeHook('unschedulePage', obj);
bus.publish('unschedulePage', JSON.stringify(obj));
}
}).return(data);
});
Expand All @@ -130,7 +134,10 @@ function post(uri, data, user) {
return db.batch(ops).then(function () {
log('info', `scheduled ${targetUri} (${data.at})`);
if (isPage(targetUri)) {
let obj = { uri: targetUri, data: data, user: user };

plugins.executeHook('schedulePage', { uri: targetUri, data: data, user: user });
bus.publish('schedulePage', JSON.stringify(obj));
}
}).return(referencedData);
}
Expand All @@ -142,7 +149,7 @@ function post(uri, data, user) {
*/
function publishByTime(promises, item) {
promises.push(publishExternally(item.value[publishProperty])
.then(function () { return del(item.key); }));
.then(() => del(item.key)));

return promises;
}
Expand Down
16 changes: 8 additions & 8 deletions lib/services/uris.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const _ = require('lodash'),
{ getPrefix } = require('clayutils'),
notifications = require('./notifications'),
siteService = require('./sites'),
plugins = require('../plugins');
plugins = require('../plugins'),
bus = require('./bus');

/**
* @param {string} uri
Expand Down Expand Up @@ -53,15 +54,14 @@ function del(uri, user) {
return db.del(uri).then(function () {
const prefix = getPrefix(uri),
site = siteService.getSiteFromPrefix(prefix),
pageUrl = buf.decode(uri.split('/').pop());
pageUrl = buf.decode(uri.split('/').pop()),
obj = { uri: oldData, url: pageUrl, user: user };

// TODO: Clean this up in v7
// Call the unpublish hook for plugins
plugins.executeHook('unpublish', {
url: pageUrl,
uri: oldData
});

plugins.executeHook('unpublishPage', { uri: oldData, url: pageUrl, user: user });
plugins.executeHook('unpublish', { url: pageUrl, uri: oldData });
plugins.executeHook('unpublishPage', obj);
bus.publish('unpublishPage', JSON.stringify(obj));

notifications.notify(site, 'unpublished', { url: pageUrl, uri: oldData });
}).return(oldData);
Expand Down
7 changes: 6 additions & 1 deletion lib/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const bluebird = require('bluebird'),
routes = require('./routes'),
internalBootstrap = require('./bootstrap'),
render = require('./render'),
amphoraPlugins = require('./plugins');
amphoraPlugins = require('./plugins'),
bus = require('./services/bus');

bluebird.config({
longStackTraces: true
Expand Down Expand Up @@ -41,6 +42,10 @@ module.exports = function (options = {}) {
amphoraPlugins.registerPlugins(plugins);
}

if (process.env.REDIS_BUS_HOST) {
bus.connect();
}

// init the router
router = routes(app, providers, sessionStore, cacheControl);

Expand Down
Loading

0 comments on commit 0b31d42

Please sign in to comment.