diff --git a/commands/make_job.ts b/commands/make_job.ts index df47380..1fcd107 100644 --- a/commands/make_job.ts +++ b/commands/make_job.ts @@ -1,3 +1,10 @@ +/** + * @acidiney/bull-queue + * + * @license MIT + * @copyright Romain Lanz + */ + import { CommandOptions } from '@adonisjs/core/types/ace' import { stubsRoot } from '../stubs/main.js' import { BaseCommand, args } from '@adonisjs/core/ace' diff --git a/commands/queue_clear.ts b/commands/queue_clear.ts index 40c40bf..5767418 100644 --- a/commands/queue_clear.ts +++ b/commands/queue_clear.ts @@ -1,4 +1,10 @@ -import { configProvider } from '@adonisjs/core' +/** + * @acidiney/bull-queue + * + * @license MIT + * @copyright Romain Lanz + */ + import { BaseCommand, flags } from '@adonisjs/core/ace' import { CommandOptions } from '@adonisjs/core/types/ace' @@ -16,17 +22,7 @@ export default class QueueListener extends BaseCommand { async run() { const Queue = await this.app.container.make('bull_queue') - const queueConfigProvider = await this.app.config.get('queue') - const config = await configProvider.resolve(this.app, queueConfigProvider) - - if (!this.queue || this.queue.length === 0) this.queue = config.queueNames - - await Promise.all( - this.queue.map(async (queue) => { - await Queue.clear(queue) - }) - ) - return + await Queue.clearBulk(this.queue) } } diff --git a/commands/queue_listener.ts b/commands/queue_listener.ts index 6c4b7ab..e4902c1 100644 --- a/commands/queue_listener.ts +++ b/commands/queue_listener.ts @@ -1,11 +1,10 @@ /** - * @rlanz/bull-queue + * @acidiney/bull-queue * * @license MIT - * @copyright Romain Lanz + * @copyright Romain Lanz */ -import { configProvider } from '@adonisjs/core' import { BaseCommand, flags } from '@adonisjs/core/ace' import { CommandOptions } from '@adonisjs/core/types/ace' @@ -23,18 +22,10 @@ export default class QueueListener extends BaseCommand { async run() { const Queue = await this.app.container.make('bull_queue') - const queueConfigProvider = await this.app.config.get('queue') - const config = await configProvider.resolve(this.app, queueConfigProvider) const router = await this.app.container.make('router') router.commit() - if (!this.queue || this.queue.length === 0) this.queue = config.queueNames - - this.queue.map((q) => - Queue.process({ - queueName: q, - }) - ) + Queue.processBulk(this.queue) } } diff --git a/commands/queue_listener_ui.ts b/commands/queue_listener_ui.ts index e526383..cf32eaa 100644 --- a/commands/queue_listener_ui.ts +++ b/commands/queue_listener_ui.ts @@ -1,11 +1,10 @@ /** - * @rlanz/bull-queue + * @acidiney/bull-queue * * @license MIT - * @copyright Romain Lanz + * @copyright Romain Lanz */ -import { configProvider } from '@adonisjs/core' import { BaseCommand, flags } from '@adonisjs/core/ace' import { CommandOptions } from '@adonisjs/core/types/ace' @@ -26,14 +25,10 @@ export default class QueueListener extends BaseCommand { async run() { const Queue = await this.app.container.make('bull_queue') - const queueConfigProvider = await this.app.config.get('queue') - const config = await configProvider.resolve(this.app, queueConfigProvider) const router = await this.app.container.make('router') router.commit() - if (!this.queue || this.queue.length === 0) this.queue = config.queueNames - await Queue.ui(this.port || 9999, this.queue) } } diff --git a/src/bull_manager.ts b/src/bull_manager.ts index 98b3c44..fdbe275 100644 --- a/src/bull_manager.ts +++ b/src/bull_manager.ts @@ -112,6 +112,28 @@ export class BullManager { }) } + private resolveQueueNames(queuesNames?: string[]): string[] { + return queuesNames ?? Array.from(this.queues.keys()) + } + + async clearBulk(queuesNames?: string[]) { + const queues = this.resolveQueueNames(queuesNames) + + // Add the rest of the clearBulk logic here... + + for (const queueName of queues) { + await this.clear(queueName) + } + } + + async processBulk(queuesNames?: string[]) { + const queues = this.resolveQueueNames(queuesNames) + + for (const queueName of queues) { + await this.process({ queueName }) + } + } + list() { return this.queues } @@ -134,23 +156,21 @@ export class BullManager { const h3Router = createRouter() - const bullQueue = await this.app.container.make('bull_queue') + let queues = [...this.list().values()] - const queues = [...bullQueue.list().values()].map((q) => new BullAdapter(q)) + if (queue) { + queues = queues.filter((q) => queue.includes(q.name)) + } await createBullBoard({ - queues, + queues: queues.map((q) => new BullAdapter(q)), serverAdapter, }) app.use(h3Router) app.use(serverAdapter.registerHandlers()) - for (const q of queue) { - await this.process({ - queueName: q, - }) - } + await this.processBulk(queue || this.queues.keys()) await createServer(toNodeListener(app)).listen(port) this.logger.info(`BullBoard started on port :${port}`)