diff --git a/src/server/error/NotImplementedError.js b/src/server/error/NotImplementedError.js new file mode 100644 index 00000000..6c57583d --- /dev/null +++ b/src/server/error/NotImplementedError.js @@ -0,0 +1,7 @@ +module.exports = class NotImplementedError extends Error { + constructor(message = "") { + super(); + this.message = message; + this.name = this.constructor.name; + } +}; diff --git a/src/server/queue/bee.js b/src/server/queue/bee.js new file mode 100644 index 00000000..df14c40b --- /dev/null +++ b/src/server/queue/bee.js @@ -0,0 +1,87 @@ +const Queue = require('./queue'); +const Job = require('./job'); +const JobData = require('./jobData'); + +class BeeJob extends Job { + async remove() { + await this._job.remove(); + } + + async getStatus() { + return this._job.status; + } + + async toJSON() { + const {id, progress, data, options: {timestamp, stacktraces: stacktrace, delay}} = this._job; + return new JobData({id, progress, data, timestamp, stacktrace, delay}); + } +} + +const VALID_STATES = ['waiting', 'active', 'succeeded', 'failed', 'delayed']; +const SUPPORTED_ACTIONS = ['remove']; + +module.exports = class BeeQueue extends Queue { + constructor(queueConfig) { + const {name} = queueConfig; + const options = BeeQueue.parseConfig(queueConfig); + const queue = new BeeQueue(name, options); + super(queue); + } + + static parseConfig(queueConfig) { + const options = { + redis: this.parseRedisConfig(queueConfig), + isWorker: false, + getEvents: false, + sendEvents: false, + storeJobs: false, + }; + const {prefix} = queueConfig; + if (prefix) options.prefix = prefix; + return options; + } + + async getJob(id) { + const job = this._queue.getJob(id); + return new BeeJob(job); + } + + async getJobCounts() { + const jobCounts = this._queue.checkHealth(); + delete jobCounts.newestJob; + return jobCounts; + } + + async getJobs(state, start, size) { + const page = {}; + + if (['failed', 'succeeded'].includes(state)) { + page.size = size; + } else { + page.start = start; + page.end = start + size - 1; + } + + let jobs = await this._queue.getJobs(state, page); + // Filter out Bee jobs that have already been removed by the time the promise resolves + jobs = jobs.filter((job) => job); + return jobs.map((j) => new BeeJob(j)); + } + + async addJob(data) { + const job = await this._queue.createJob(data).save(); + return new BeeJob(job); + } + + isValidState(state) { + return VALID_STATES.includes(state); + } + + isActionSupported(action) { + return SUPPORTED_ACTIONS.includes(action); + } + + isPaginationSupported(state) { + return state !== 'succeeded' && state !== 'failed'; + } +}; diff --git a/src/server/queue/bull.js b/src/server/queue/bull.js new file mode 100644 index 00000000..9ec02d87 --- /dev/null +++ b/src/server/queue/bull.js @@ -0,0 +1,97 @@ +const {capitalize} = require('lodash'); +const Bull = require('bull'); +const Queue = require('./queue'); +const Job = require('./job'); +const JobData = require('./jobData'); + +const VALID_STATES = ['waiting', 'active', 'completed', 'failed', 'delayed']; +const SUPPORTED_ACTIONS = ['remove', 'retry']; + +class BullJob extends Job { + async remove() { + await this._job.remove(); + } + + async retry() { + await this._job.retry(); + } + + async getStatus() { + return this._job.getState(); + } + + + async toJSON() { + const { + id, + name, + data, + attemptsMade, + failedReason, + stacktrace, + returnvalue: returnValue, + timestamp, + delay, + progress + } = this._job.toJSON(); + return new JobData({ + id, + name, + data, + attemptsMade, + failedReason, + stacktrace, + timestamp, + delay, + progress, + returnValue, + }); + } +} + +module.exports = class BullQueue extends Queue { + constructor(queueConfig) { + const {name} = queueConfig; + const options = BullQueue.parseConfig(queueConfig); + const queue = Bull(name, options); + super(queue); + } + + static parseConfig(queueConfig) { + const options = {redis: this.parseRedisConfig(queueConfig)}; + const {createClient, prefix} = queueConfig; + if (createClient) options.createClient = createClient; + if (prefix) options.prefix = prefix; + return options; + } + + async getJob(id) { + const job = await this._queue.getJob(id); + return new BullJob(job); + } + + async getJobCounts() { + return this._queue.getJobCounts(); + } + + async getJobs(state, start, size) { + const jobs = await this._queue[`get${capitalize(state)}`](start, start + size - 1); + return jobs.map((j) => new BullJob(j)); + } + + async addJob(data) { + const job = await this._queue.add(data, { + removeOnComplete: false, + removeOnFail: false + }); + return new BullJob(job); + } + + isValidState(state) { + return VALID_STATES.includes(state); + } + + isActionSupported(action) { + return SUPPORTED_ACTIONS.includes(action); + } +}; diff --git a/src/server/queue/index.js b/src/server/queue/index.js index e9bd20fe..7e3a2405 100644 --- a/src/server/queue/index.js +++ b/src/server/queue/index.js @@ -1,6 +1,6 @@ const _ = require('lodash'); -const Bull = require('bull'); -const Bee = require('bee-queue'); +const BullQueue = require('./bull'); +const BeeQueue = require('./bee'); class Queues { constructor(config) { @@ -38,35 +38,12 @@ class Queues { return this._queues[queueHost][queueName]; } - const { type, name, port, host, db, password, prefix, url, redis, tls } = queueConfig; - - const redisHost = { host }; - if (password) redisHost.password = password; - if (port) redisHost.port = port; - if (db) redisHost.db = db; - if (tls) redisHost.tls = tls; - - const isBee = type === 'bee'; - - const options = { - redis: redis || url || redisHost - }; - if (prefix) options.prefix = prefix; - + const {type} = queueConfig; let queue; - if (isBee) { - _.extend(options, { - isWorker: false, - getEvents: false, - sendEvents: false, - storeJobs: false - }); - - queue = new Bee(name, options); - queue.IS_BEE = true; + if (type === 'bee') { + queue = new BeeQueue(queueConfig); } else { - if (queueConfig.createClient) options.createClient = queueConfig.createClient; - queue = new Bull(name, options); + queue = new BullQueue(queueConfig); } this._queues[queueHost] = this._queues[queueHost] || {}; @@ -74,23 +51,6 @@ class Queues { return queue; } - - /** - * Creates and adds a job with the given `data` to the given `queue`. - * - * @param {Object} queue A bee or bull queue class - * @param {Object} data The data to be used within the job - */ - async set(queue, data) { - if (queue.IS_BEE) { - return queue.createJob(data).save(); - } else { - return queue.add(data, { - removeOnComplete: false, - removeOnFail: false - }); - } - } } module.exports = Queues; diff --git a/src/server/queue/job.js b/src/server/queue/job.js new file mode 100644 index 00000000..030cd4c6 --- /dev/null +++ b/src/server/queue/job.js @@ -0,0 +1,22 @@ +const NotImplementedError = require('../error/NotImplementedError'); + +module.exports = class Job { + constructor(job) { + this._job = job; + if (new.target === Job) { + throw new TypeError("Cannot construct Job instances directly"); + } + } + + async remove() { + throw new NotImplementedError(); + } + + async getStatus() { + throw new NotImplementedError(); + } + + async toJSON() { + throw new NotImplementedError(); + } +}; diff --git a/src/server/queue/jobData.js b/src/server/queue/jobData.js new file mode 100644 index 00000000..7561819f --- /dev/null +++ b/src/server/queue/jobData.js @@ -0,0 +1,16 @@ +module.exports = class JobData { + constructor({id, name, data, stacktrace, timestamp, progress, delay, attemptsMade, returnValue, failedReason}) { + this.id = id; + this.name = name; + this.data = data; + this.progress = progress; + this.attemptsMade = attemptsMade; + this.returnValue = returnValue; + this.failedReason = failedReason; + this.options = { + stacktrace, + timestamp, + delay, + }; + } +}; diff --git a/src/server/queue/queue.js b/src/server/queue/queue.js new file mode 100644 index 00000000..8af9e643 --- /dev/null +++ b/src/server/queue/queue.js @@ -0,0 +1,51 @@ +const NotImplementedError = require('../error/NotImplementedError'); + +module.exports = class Queue { + constructor(queue) { + this._queue = queue; + if (new.target === Queue) { + throw new TypeError("Cannot construct Queue instances directly"); + } + } + + static parseRedisConfig({port, host, db, password, url, redis, tls}) { + const redisHost = {host}; + if (password) redisHost.password = password; + if (port) redisHost.port = port; + if (db) redisHost.db = db; + if (tls) redisHost.tls = tls; + return redis || url || redisHost; + } + + get redisClient() { + return this._queue.client; + } + + async getJob(_id) { + throw new NotImplementedError(); + } + + async getJobCounts() { + throw new NotImplementedError(); + } + + async getJobs(_state, _start, _size) { + throw new NotImplementedError(); + } + + async addJob(_data, _options) { + throw new NotImplementedError(); + } + + isValidState(_state) { + throw new NotImplementedError(); + } + + isActionSupported(_action) { + throw new NotImplementedError(); + } + + isPaginationSupported(_state) { + return true; + } +}; diff --git a/src/server/views/api/bulkAction.js b/src/server/views/api/bulkAction.js index 83f364e6..5bc6c504 100644 --- a/src/server/views/api/bulkAction.js +++ b/src/server/views/api/bulkAction.js @@ -1,20 +1,18 @@ const _ = require('lodash'); -const ACTIONS = ['remove', 'retry']; - function bulkAction(action) { return async function handler(req, res) { - if (!_.includes(ACTIONS, action)) { - res.status(401).send({ - error: 'unauthorized action', - details: `action ${action} not permitted` - }); - } - const { queueName, queueHost } = req.params; const {Queues} = req.app.locals; const queue = await Queues.get(queueName, queueHost); - if (!queue) return res.status(404).send({error: 'queue not found'}); + if (!queue) return void res.status(404).json({error: 'queue not found'}); + + if (!queue.isActionSupported(action)) { + return void res.status(401).json({ + error: 'unauthorized action', + details: `queue does not support action ${action}` + }); + } const {jobs} = req.body; diff --git a/src/server/views/api/jobAdd.js b/src/server/views/api/jobAdd.js index cbca9d38..7c16fe23 100644 --- a/src/server/views/api/jobAdd.js +++ b/src/server/views/api/jobAdd.js @@ -8,7 +8,7 @@ async function handler(req, res) { if (!queue) return res.status(404).json({ error: 'queue not found' }); try { - await Queues.set(queue, data); + await queue.addJob(data); } catch (err) { return res.status(500).json({ error: err.message }); } diff --git a/src/server/views/api/jobRemove.js b/src/server/views/api/jobRemove.js index 8a5abfdd..4ddca41d 100644 --- a/src/server/views/api/jobRemove.js +++ b/src/server/views/api/jobRemove.js @@ -1,23 +1 @@ -async function handler(req, res) { - const { queueName, queueHost, id } = req.params; - - const {Queues} = req.app.locals; - const queue = await Queues.get(queueName, queueHost); - if (!queue) return res.status(404).send({error: 'queue not found'}); - - const job = await queue.getJob(id); - if (!job) return res.status(404).send({error: 'job not found'}); - - try { - await job.remove(); - return res.sendStatus(200); - } catch (e) { - const body = { - error: 'queue error', - details: e.stack - }; - return res.status(500).send(body); - } -} - -module.exports = handler; +module.exports = require('./performAction')('remove'); diff --git a/src/server/views/api/jobRetry.js b/src/server/views/api/jobRetry.js index c8c10c02..8c8bcb58 100644 --- a/src/server/views/api/jobRetry.js +++ b/src/server/views/api/jobRetry.js @@ -1,24 +1 @@ -async function handler(req, res) { - const { queueName, queueHost, id } = req.params; - - const {Queues} = req.app.locals; - - const queue = await Queues.get(queueName, queueHost); - if (!queue) return res.status(404).send({error: 'queue not found'}); - - const job = await queue.getJob(id); - if (!job) return res.status(404).send({error: 'job not found'}); - - try { - await job.retry(); - return res.sendStatus(200); - } catch (e) { - const body = { - error: 'queue error', - details: e.stack - }; - return res.status(500).send(body); - } -} - -module.exports = handler; +module.exports = require('./performAction')('retry'); diff --git a/src/server/views/api/performAction.js b/src/server/views/api/performAction.js new file mode 100644 index 00000000..b81d24ef --- /dev/null +++ b/src/server/views/api/performAction.js @@ -0,0 +1,30 @@ +module.exports = function performAction(action) { + return async function handler(req, res) { + const {queueName, queueHost, id} = req.params; + + const {Queues} = req.app.locals; + const queue = await Queues.get(queueName, queueHost); + if (!queue) return void res.status(404).json({error: 'queue not found'}); + + if (!queue.isActionSupported(action)) { + return void res.status(401).json({ + error: 'unauthorized action', + details: `queue does not support action ${action}` + }); + } + + const job = await queue.getJob(id); + if (!job) return void res.status(404).json({error: 'job not found'}); + + try { + await job[action](); + return void res.sendStatus(204); + } catch (e) { + const body = { + error: 'queue error', + details: e.stack + }; + return void res.status(500).send(body); + } + }; +}; diff --git a/src/server/views/dashboard/jobDetails.js b/src/server/views/dashboard/jobDetails.js index 5fe9f0b5..a3be68d7 100644 --- a/src/server/views/dashboard/jobDetails.js +++ b/src/server/views/dashboard/jobDetails.js @@ -1,6 +1,3 @@ -const _ = require('lodash'); -const util = require('util'); - async function handler(req, res) { const { queueName, queueHost, id } = req.params; const { json } = req.query; @@ -12,26 +9,22 @@ async function handler(req, res) { const job = await queue.getJob(id); if (!job) return res.status(404).render('dashboard/templates/jobNotFound', {basePath, id, queueName, queueHost}); + const jobData = await job.toJSON(); if (json === 'true') { // Omit these private and non-stringifyable properties to avoid circular // references parsing errors. - return res.json(_.omit(job, 'domain', 'queue', '_events', '_eventsCount')); + return void res.json(jobData); } - let jobState; - if (queue.IS_BEE) { - jobState = job.status; - } else { - jobState = await job.getState(); - } + const jobState = await job.getStatus(); return res.render('dashboard/templates/jobDetails', { basePath, queueName, queueHost, jobState, - job + job: jobData }); } diff --git a/src/server/views/dashboard/queueDetails.js b/src/server/views/dashboard/queueDetails.js index 7ea101fe..f0bf1ef7 100644 --- a/src/server/views/dashboard/queueDetails.js +++ b/src/server/views/dashboard/queueDetails.js @@ -7,13 +7,7 @@ async function handler(req, res) { const basePath = req.baseUrl; if (!queue) return res.status(404).render('dashboard/templates/queueNotFound', {basePath, queueName, queueHost}); - let jobCounts; - if (queue.IS_BEE) { - jobCounts = await queue.checkHealth(); - delete jobCounts.newestJob; - } else { - jobCounts = await queue.getJobCounts(); - } + const jobCounts = await queue.getJobCounts(); const stats = await QueueHelpers.getStats(queue); return res.render('dashboard/templates/queueDetails', { diff --git a/src/server/views/dashboard/queueJobsByState.js b/src/server/views/dashboard/queueJobsByState.js index 8a76f186..b504db34 100644 --- a/src/server/views/dashboard/queueJobsByState.js +++ b/src/server/views/dashboard/queueJobsByState.js @@ -1,18 +1,4 @@ const _ = require('lodash'); -const { BEE_STATES, BULL_STATES } = require('../helpers/queueHelpers'); - -/** - * Determines if the requested job state lookup is valid. - * - * @param {String} state - * @param {Boolean} isBee States vary between bull and bee - * - * @return {Boolean} - */ -function isValidState(state, isBee) { - const validStates = isBee ? BEE_STATES : BULL_STATES; - return _.includes(validStates, state); -} async function handler(req, res) { if (req.params.ext === 'json') return _json(req, res); @@ -32,16 +18,10 @@ async function _json(req, res) { const queue = await Queues.get(queueName, queueHost); if (!queue) return res.status(404).json({ message: 'Queue not found' }); - if (!isValidState(state, queue.IS_BEE)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); + if (!queue.isValidState(state)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); - let jobs; - if (queue.IS_BEE) { - jobs = await queue.getJobs(state, { size: 1000 }); - jobs = jobs.map((j) => _.pick(j, 'id', 'progress', 'data', 'options', 'status')); - } else { - jobs = await queue[`get${_.capitalize(state)}`](0, 1000); - jobs = jobs.map((j) => j.toJSON()); - } + let jobs = await queue.getJobs(state, 0, 1000); + jobs = jobs.map((j) => j.toJSON()); const filename = `${queueName}-${state}-dump.json`; @@ -63,40 +43,16 @@ async function _html(req, res) { const basePath = req.baseUrl; if (!queue) return res.status(404).render('dashboard/templates/queueNotFound', {basePath, queueName, queueHost}); - if (!isValidState(state, queue.IS_BEE)) return res.status(400).json({ message: `Invalid state requested: ${state}` }); - - let jobCounts; - if (queue.IS_BEE) { - jobCounts = await queue.checkHealth(); - delete jobCounts.newestJob; - } else { - jobCounts = await queue.getJobCounts(); - } + if (!queue.isValidState(state)) return void res.status(400).json({ message: `Invalid state requested: ${state}` }); + const jobCounts = await queue.getJobCounts(); const page = parseInt(req.query.page, 10) || 1; const pageSize = parseInt(req.query.pageSize, 10) || 100; const startId = (page - 1) * pageSize; - const endId = startId + pageSize - 1; - - let jobs; - if (queue.IS_BEE) { - const page = {}; - - if (['failed', 'succeeded'].includes(state)) { - page.size = pageSize; - } else { - page.start = startId; - page.end = endId; - } - - jobs = await queue.getJobs(state, page); - - // Filter out Bee jobs that have already been removed by the time the promise resolves - jobs = jobs.filter((job) => job); - } else { - jobs = await queue[`get${_.capitalize(state)}`](startId, endId); - } + let jobs = await queue.getJobs(state, startId, pageSize); + const jobPromises = jobs.map((j) => j.toJSON()); + jobs = await Promise.all(jobPromises); let pages = _.range(page - 6, page + 7) .filter((page) => page >= 1); @@ -112,7 +68,7 @@ async function _html(req, res) { state, jobs, jobsInStateCount: jobCounts[state], - disablePagination: queue.IS_BEE && (state === 'succeeded' || state === 'failed'), + disablePagination: !queue.isPaginationSupported(state), currentPage: page, pages, pageSize, diff --git a/src/server/views/dashboard/templates/queueJobsByState.hbs b/src/server/views/dashboard/templates/queueJobsByState.hbs index d647abf9..6577d51e 100644 --- a/src/server/views/dashboard/templates/queueJobsByState.hbs +++ b/src/server/views/dashboard/templates/queueJobsByState.hbs @@ -39,7 +39,7 @@ {{else}} - Bee-queue does not support pagination for {{ state }} queues — currently displaying up to {{ pageSize }} jobs. To change count, use "Size" dropdown. + Queue implementation does not support pagination for {{ state }} queues — currently displaying up to {{ pageSize }} jobs. To change count, use "Size" dropdown. {{/unless}}
diff --git a/src/server/views/helpers/queueHelpers.js b/src/server/views/helpers/queueHelpers.js index 75a36a85..770edcf6 100644 --- a/src/server/views/helpers/queueHelpers.js +++ b/src/server/views/helpers/queueHelpers.js @@ -30,9 +30,9 @@ function formatBytes(num) { const Helpers = { getStats: async function(queue) { - await queue.client.info(); // update queue.client.serverInfo + await queue.redisClient.info(); // update queue.client.serverInfo - const stats = _.pickBy(queue.client.serverInfo, (value, key) => _.includes(this._usefulMetrics, key)); + const stats = _.pickBy(queue.redisClient.serverInfo, (value, key) => this._usefulMetrics.includes(key)); stats.used_memory = formatBytes(parseInt(stats.used_memory, 10)); stats.total_system_memory = formatBytes(parseInt(stats.total_system_memory, 10)); return stats; @@ -46,16 +46,6 @@ const Helpers = { 'connected_clients', 'blocked_clients' ], - - /** - * Valid states for a job in bee queue - */ - BEE_STATES: ['waiting', 'active', 'succeeded', 'failed', 'delayed'], - - /** - * Valid states for a job in bull queue - */ - BULL_STATES: ['waiting', 'active', 'completed', 'failed', 'delayed'] }; module.exports = Helpers; diff --git a/src/server/views/partials/dashboard/jobDetails.hbs b/src/server/views/partials/dashboard/jobDetails.hbs index c8db1457..53662149 100644 --- a/src/server/views/partials/dashboard/jobDetails.hbs +++ b/src/server/views/partials/dashboard/jobDetails.hbs @@ -24,17 +24,14 @@ {{#if this.options.timestamp}} {{moment this.options.timestamp "llll"}} {{/if}} - {{#if this.timestamp}} - {{moment this.timestamp "llll"}} - {{/if}}
Attempts Made
{{this.attemptsMade}} - {{#if this.options}} - {{length this.options.stacktraces}} + {{#if this.options.stacktrace}} + {{length this.options.stacktrace}} {{/if}}
@@ -46,7 +43,6 @@ -{{#unless this.queue.IS_BEE}}
Progress
- {{ this._progress }}% + style="width: {{ this.progress }}%; min-width: 2em;"> + {{ this.progress }}%
-{{/unless}} -{{#if this.returnvalue}} +{{#if this.returnValue}}
Return Value
-
{{json this.returnvalue true}}
+
{{json this.returnValue true}}
{{/if}} {{#if this.failedReason}} @@ -75,14 +70,8 @@ {{#eq jobState 'failed'}}
Stacktraces
- {{#if this.options.stacktraces}} - {{#each this.options.stacktraces}} -
{{ this }}
- {{/each}} - {{/if}} - - {{#if this.stacktrace}} - {{#each this.stacktrace}} + {{#if this.options.stacktrace}} + {{#each this.options.stacktrace}}
{{ this }}
{{/each}} {{/if}}