diff --git a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts index abc91c6..58558a0 100644 --- a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts +++ b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts @@ -1,4 +1,5 @@ import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jobs-common' +import { PromisePool } from '@supercharge/promise-pool' import type { Redis } from 'ioredis' export type QueueDiscoverer = { @@ -10,19 +11,36 @@ type RedisQueue = { queueName: string } -export class RedisBasedQueueDiscoverer implements QueueDiscoverer { - constructor( - private readonly redisInstances: Redis[], - private readonly queuesPrefix: string, - ) {} +const QUEUE_DISCOVERY_CONCURRENCY = 3 + +export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscoverer { + constructor(protected readonly redisInstances: Redis[]) {} async discoverQueues(): Promise { - return Promise.all( - this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)), - ).then((queues) => queues.flat()) + const { results, errors } = await PromisePool.withConcurrency(QUEUE_DISCOVERY_CONCURRENCY) + .for(this.redisInstances) + .process((redisInstance) => this.discoverQueuesForInstance(redisInstance)) + + if (errors.length > 0) { + // Throwing first error that was encountered + throw errors[0] + } + + return results.flat() + } + + protected abstract discoverQueuesForInstance(redisInstance: Redis): Promise +} + +export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer { + constructor( + redisInstances: Redis[], + private readonly queuesPrefix: string, + ) { + super(redisInstances) } - private async discoverQueuesForInstance(redisInstance: Redis): Promise { + protected async discoverQueuesForInstance(redisInstance: Redis): Promise { const scanStream = redisInstance.scanStream({ match: `${this.queuesPrefix}:*:meta`, }) @@ -45,16 +63,8 @@ export class RedisBasedQueueDiscoverer implements QueueDiscoverer { } } -export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer { - constructor(private readonly redisInstances: Redis[]) {} - - async discoverQueues(): Promise { - return Promise.all( - this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)), - ).then((queues) => queues.flat()) - } - - private async discoverQueuesForInstance(redisInstance: Redis): Promise { +export class BackgroundJobsBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer { + protected async discoverQueuesForInstance(redisInstance: Redis): Promise { return backgroundJobProcessorGetActiveQueueIds(redisInstance).then((queueNames) => queueNames.map((queueName) => ({ redisInstance,