Skip to content

Commit

Permalink
feat: allow bull manager to read other queues when user want to start…
Browse files Browse the repository at this point in the history
… with app, without prior specification
  • Loading branch information
Acidiney Dias committed Feb 15, 2024
1 parent e79d3aa commit de12c97
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 39 deletions.
7 changes: 7 additions & 0 deletions commands/make_job.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/**
* @acidiney/bull-queue
*
* @license MIT
* @copyright Romain Lanz <[email protected]>
*/

import { CommandOptions } from '@adonisjs/core/types/ace'
import { stubsRoot } from '../stubs/main.js'
import { BaseCommand, args } from '@adonisjs/core/ace'
Expand Down
20 changes: 8 additions & 12 deletions commands/queue_clear.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { configProvider } from '@adonisjs/core'
/**
* @acidiney/bull-queue
*
* @license MIT
* @copyright Romain Lanz <[email protected]>
*/

import { BaseCommand, flags } from '@adonisjs/core/ace'
import { CommandOptions } from '@adonisjs/core/types/ace'

Expand All @@ -16,17 +22,7 @@ export default class QueueListener extends BaseCommand {

async run() {
const Queue = await this.app.container.make('bull_queue')
const queueConfigProvider = await this.app.config.get('queue')
const config = await configProvider.resolve<any>(this.app, queueConfigProvider)

if (!this.queue || this.queue.length === 0) this.queue = config.queueNames

await Promise.all(
this.queue.map(async (queue) => {
await Queue.clear(queue)
})
)

return
await Queue.clearBulk(this.queue)
}
}
15 changes: 3 additions & 12 deletions commands/queue_listener.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/**
* @rlanz/bull-queue
* @acidiney/bull-queue
*
* @license MIT
* @copyright Romain Lanz <[email protected]>
* @copyright Romain Lanz <[email protected]>
*/

import { configProvider } from '@adonisjs/core'
import { BaseCommand, flags } from '@adonisjs/core/ace'
import { CommandOptions } from '@adonisjs/core/types/ace'

Expand All @@ -23,18 +22,10 @@ export default class QueueListener extends BaseCommand {

async run() {
const Queue = await this.app.container.make('bull_queue')
const queueConfigProvider = await this.app.config.get('queue')
const config = await configProvider.resolve<any>(this.app, queueConfigProvider)

const router = await this.app.container.make('router')
router.commit()

if (!this.queue || this.queue.length === 0) this.queue = config.queueNames

this.queue.map((q) =>
Queue.process({
queueName: q,
})
)
Queue.processBulk(this.queue)
}
}
9 changes: 2 additions & 7 deletions commands/queue_listener_ui.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/**
* @rlanz/bull-queue
* @acidiney/bull-queue
*
* @license MIT
* @copyright Romain Lanz <[email protected]>
* @copyright Romain Lanz <[email protected]>
*/

import { configProvider } from '@adonisjs/core'
import { BaseCommand, flags } from '@adonisjs/core/ace'
import { CommandOptions } from '@adonisjs/core/types/ace'

Expand All @@ -26,14 +25,10 @@ export default class QueueListener extends BaseCommand {

async run() {
const Queue = await this.app.container.make('bull_queue')
const queueConfigProvider = await this.app.config.get('queue')
const config = await configProvider.resolve<any>(this.app, queueConfigProvider)

const router = await this.app.container.make('router')
router.commit()

if (!this.queue || this.queue.length === 0) this.queue = config.queueNames

await Queue.ui(this.port || 9999, this.queue)
}
}
36 changes: 28 additions & 8 deletions src/bull_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,28 @@ export class BullManager {
})
}

private resolveQueueNames(queuesNames?: string[]): string[] {
return queuesNames ?? Array.from(this.queues.keys())
}

async clearBulk(queuesNames?: string[]) {
const queues = this.resolveQueueNames(queuesNames)

// Add the rest of the clearBulk logic here...

for (const queueName of queues) {
await this.clear(queueName)
}
}

async processBulk(queuesNames?: string[]) {
const queues = this.resolveQueueNames(queuesNames)

for (const queueName of queues) {
await this.process({ queueName })
}
}

list() {
return this.queues
}
Expand All @@ -134,23 +156,21 @@ export class BullManager {

const h3Router = createRouter()

const bullQueue = await this.app.container.make('bull_queue')
let queues = [...this.list().values()]

const queues = [...bullQueue.list().values()].map((q) => new BullAdapter(q))
if (queue) {
queues = queues.filter((q) => queue.includes(q.name))
}

await createBullBoard({
queues,
queues: queues.map((q) => new BullAdapter(q)),
serverAdapter,
})

app.use(h3Router)
app.use(serverAdapter.registerHandlers())

for (const q of queue) {
await this.process({
queueName: q,
})
}
await this.processBulk(queue || this.queues.keys())

await createServer(toNodeListener(app)).listen(port)
this.logger.info(`BullBoard started on port :${port}`)
Expand Down

0 comments on commit de12c97

Please sign in to comment.