From c97c7ef77ec18d21ed3680ad68b5d2fd6ae2efc8 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 20 Aug 2024 13:59:40 +0200 Subject: [PATCH 1/5] AP-4397 support for multiple redis clients in bullmq metrics --- .../bull-mq-metrics/MetricsCollector.ts | 36 ++++++++++++----- .../bull-mq-metrics/queueDiscoverers.ts | 20 ++++++++++ lib/plugins/bullMqMetricsPlugin.spec.ts | 39 ++++++++++++++++++- lib/plugins/bullMqMetricsPlugin.ts | 14 +++++-- package.json | 15 +------ 5 files changed, 98 insertions(+), 26 deletions(-) diff --git a/lib/plugins/bull-mq-metrics/MetricsCollector.ts b/lib/plugins/bull-mq-metrics/MetricsCollector.ts index 78259bb..af08813 100644 --- a/lib/plugins/bull-mq-metrics/MetricsCollector.ts +++ b/lib/plugins/bull-mq-metrics/MetricsCollector.ts @@ -1,6 +1,5 @@ import { PromisePool } from '@supercharge/promise-pool' import type { FastifyBaseLogger } from 'fastify' -import type { Redis } from 'ioredis' import * as prometheus from 'prom-client' import { ObservableQueue } from './ObservableQueue' @@ -13,10 +12,9 @@ export type Metrics = { } export type MetricCollectorOptions = { - redisClient: Redis bullMqPrefix: string metricsPrefix: string - queueDiscoverer: QueueDiscoverer + queueDiscoverers: QueueDiscoverer[] excludedQueues: string[] histogramBuckets: number[] } @@ -42,7 +40,6 @@ const getMetrics = (prefix: string, histogramBuckets: number[]): Metrics => ({ }) export class MetricsCollector { - private readonly redis: Redis private readonly metrics: Metrics private observedQueues: ObservableQueue[] | undefined @@ -51,7 +48,6 @@ export class MetricsCollector { private readonly registry: prometheus.Registry, private readonly logger: FastifyBaseLogger, ) { - this.redis = options.redisClient this.metrics = this.registerMetrics(this.registry, this.options) } @@ -60,12 +56,10 @@ export class MetricsCollector { */ async collect() { if (!this.observedQueues) { - this.observedQueues = (await this.options.queueDiscoverer.discoverQueues()) - .filter((name) => !this.options.excludedQueues.includes(name)) - .map((name) => new ObservableQueue(name, this.redis, this.metrics, this.logger)) + this.observedQueues = await this.discoverQueues() } - await PromisePool.for(this.observedQueues).process((queue) => { + await PromisePool.for(this.observedQueues).process((queue: ObservableQueue) => { queue.collect() }) } @@ -79,6 +73,30 @@ export class MetricsCollector { } } + private async discoverQueues(): Promise { + const redisInstancesWithQueues = await Promise.all( + this.options.queueDiscoverers.map((discoverer) => discoverer.getRedisInstanceWithQueues()), + ) + + return redisInstancesWithQueues + .flatMap((redisWithQueues) => + redisWithQueues.queues.map((queueName) => ({ + redis: redisWithQueues.redisInstance, + queueName, + })), + ) + .filter((redisWithQueue) => !this.options.excludedQueues.includes(redisWithQueue.queueName)) + .map( + (redisWithQueue) => + new ObservableQueue( + redisWithQueue.queueName, + redisWithQueue.redis, + this.metrics, + this.logger, + ), + ) + } + private registerMetrics( registry: prometheus.Registry, { metricsPrefix, histogramBuckets }: MetricCollectorOptions, diff --git a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts index fd0bffc..6151dc4 100644 --- a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts +++ b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts @@ -2,15 +2,28 @@ import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jo import type { Redis } from 'ioredis' export type QueueDiscoverer = { + getRedisInstanceWithQueues: () => Promise discoverQueues: () => Promise } +type RedisInstanceWithQueues = { + redisInstance: Redis + queues: string[] +} + export class RedisBasedQueueDiscoverer implements QueueDiscoverer { constructor( private readonly redis: Redis, private readonly queuesPrefix: string, ) {} + async getRedisInstanceWithQueues(): Promise { + return { + redisInstance: this.redis, + queues: await this.discoverQueues(), + } + } + async discoverQueues(): Promise { const scanStream = this.redis.scanStream({ match: `${this.queuesPrefix}:*:meta`, @@ -32,6 +45,13 @@ export class RedisBasedQueueDiscoverer implements QueueDiscoverer { export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer { constructor(private readonly redis: Redis) {} + async getRedisInstanceWithQueues(): Promise { + return { + redisInstance: this.redis, + queues: await this.discoverQueues(), + } + } + async discoverQueues(): Promise { return await backgroundJobProcessorGetActiveQueueIds(this.redis) } diff --git a/lib/plugins/bullMqMetricsPlugin.spec.ts b/lib/plugins/bullMqMetricsPlugin.spec.ts index d5b83a2..94bb7e9 100644 --- a/lib/plugins/bullMqMetricsPlugin.spec.ts +++ b/lib/plugins/bullMqMetricsPlugin.spec.ts @@ -41,8 +41,12 @@ async function initAppWithBullMqMetrics( }) } + const redisClients = Array.isArray(pluginOptions.redisClient) + ? pluginOptions.redisClient + : [pluginOptions.redisClient] + await app.register(bullMqMetricsPlugin, { - queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisClient, 'bull'), + queueDiscoverers: redisClients.map((redis) => new RedisBasedQueueDiscoverer(redis, 'bull')), collectionOptions: { type: 'interval', intervalInMs: 50, @@ -138,4 +142,37 @@ describe('bullMqMetricsPlugin', () => { 'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 1', ) }) + + it('works with multiple redis clients', async () => { + app = await initAppWithBullMqMetrics({ + redisClient: [redis, redis], + collectionOptions: { + type: 'manual', + }, + }) + + // exec collect to start listening for failed and completed events + await app.bullMqMetrics.collect() + + const responseBefore = await getMetrics() + expect(responseBefore.result.body).not.toContain( + 'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 1', + ) + + await processor.schedule({ + metadata: { + correlationId: 'test', + }, + }) + + await setTimeout(100) + + await app.bullMqMetrics.collect() + + const responseAfter = await getMetrics() + expect(responseAfter.result.body).toContain( + // value is 2 since we are counting same redis client twice (only for tests) + 'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 2', + ) + }) }) diff --git a/lib/plugins/bullMqMetricsPlugin.ts b/lib/plugins/bullMqMetricsPlugin.ts index 7130e79..1f98803 100644 --- a/lib/plugins/bullMqMetricsPlugin.ts +++ b/lib/plugins/bullMqMetricsPlugin.ts @@ -19,7 +19,7 @@ declare module 'fastify' { } export type BullMqMetricsPluginOptions = { - redisClient: Redis + redisClient: Redis | Redis[] collectionOptions?: | { type: 'interval' @@ -43,10 +43,18 @@ function plugin( ) } + const redisInstances: Redis[] = Array.isArray(pluginOptions.redisClient) + ? pluginOptions.redisClient + : [pluginOptions.redisClient] + + const queueDiscoverers = redisInstances.map( + (redis) => new BackgroundJobsBasedQueueDiscoverer(redis), + ) + const options = { bullMqPrefix: 'bull', metricsPrefix: 'bullmq', - queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisClient), + queueDiscoverers, excludedQueues: [], histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000], collectionOptions: { @@ -54,7 +62,7 @@ function plugin( intervalInMs: 5000, }, ...pluginOptions, - } + } satisfies MetricCollectorOptions try { const collector = new MetricsCollector(options, fastify.metrics.client.register, fastify.log) diff --git a/package.json b/package.json index 4848275..101f958 100644 --- a/package.json +++ b/package.json @@ -11,20 +11,9 @@ "type": "git", "url": "git://github.com/lokalise/fastify-extras.git" }, - "keywords": [ - "fastify", - "newrelic", - "bugsnag", - "request-context", - "request-id", - "split-io" - ], + "keywords": ["fastify", "newrelic", "bugsnag", "request-context", "request-id", "split-io"], "homepage": "https://github.com/lokalise/fastify-extras", - "files": [ - "dist/**", - "LICENSE", - "README.md" - ], + "files": ["dist/**", "LICENSE", "README.md"], "main": "dist/index.js", "types": "dist/index.d.ts", "type": "commonjs", From d34b2feab8d0eed22ed05bf978cd2df207af3da5 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 20 Aug 2024 16:28:49 +0200 Subject: [PATCH 2/5] AP-4397 updated README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cba9a9b..6acdd91 100644 --- a/README.md +++ b/README.md @@ -145,10 +145,10 @@ This plugin depends on the following peer-installed packages: Add the plugin to your Fastify instance by registering it with the following possible options: -- `redisClient`, a Redis client instance which is used by the BullMQ: plugin uses it to discover the queues; +- `redisClient`, a Redis client instance(s) which is used by the BullMQ: plugin uses it to discover the queues. Both single instance or array of instances can be provided; - `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis; - `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus; -- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface; +- `queueDiscoverers` (optional, default: `BackgroundJobsBasedQueueDiscoverer` for each redis instance). The queue discoverers to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface; - `excludedQueues` (optional, default: `[]`). An array of queue names to exclude from metrics collection; - `histogramBuckets` (optional, default: `[20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000]`). Buckets for the histogram metrics (such as job completion or overall processing time). - `collectionOptions` (optional, default: `{ type: 'interval', intervalInMs: 5000 }`). Allows to configure how metrics are collected. Supports the following properties: From 908345bd3837b2461d6724565122f9a959d9300a Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 20 Aug 2024 16:38:56 +0200 Subject: [PATCH 3/5] AP-4397 linting --- lib/plugins/bullMqMetricsPlugin.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/plugins/bullMqMetricsPlugin.ts b/lib/plugins/bullMqMetricsPlugin.ts index 1f98803..ac37492 100644 --- a/lib/plugins/bullMqMetricsPlugin.ts +++ b/lib/plugins/bullMqMetricsPlugin.ts @@ -46,8 +46,8 @@ function plugin( const redisInstances: Redis[] = Array.isArray(pluginOptions.redisClient) ? pluginOptions.redisClient : [pluginOptions.redisClient] - - const queueDiscoverers = redisInstances.map( + + const queueDiscoverers = redisInstances.map( (redis) => new BackgroundJobsBasedQueueDiscoverer(redis), ) From 86c734f7df0f9b1e63bc007eeea9a6d228875c2b Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 21 Aug 2024 12:27:09 +0200 Subject: [PATCH 4/5] AP-4397 make discoverer work on multiple redis instances --- README.md | 4 +- .../bull-mq-metrics/MetricsCollector.ts | 33 +++---------- .../bull-mq-metrics/queueDiscoverers.ts | 49 +++++++++++-------- lib/plugins/bullMqMetricsPlugin.spec.ts | 14 ++---- lib/plugins/bullMqMetricsPlugin.ts | 12 +---- 5 files changed, 44 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index 6acdd91..84ccd4b 100644 --- a/README.md +++ b/README.md @@ -145,10 +145,10 @@ This plugin depends on the following peer-installed packages: Add the plugin to your Fastify instance by registering it with the following possible options: -- `redisClient`, a Redis client instance(s) which is used by the BullMQ: plugin uses it to discover the queues. Both single instance or array of instances can be provided; +- `redisClients`, a Redis client instances which are used by the BullMQ: plugin uses it to discover the queues. - `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis; - `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus; -- `queueDiscoverers` (optional, default: `BackgroundJobsBasedQueueDiscoverer` for each redis instance). The queue discoverers to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface; +- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface; - `excludedQueues` (optional, default: `[]`). An array of queue names to exclude from metrics collection; - `histogramBuckets` (optional, default: `[20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000]`). Buckets for the histogram metrics (such as job completion or overall processing time). - `collectionOptions` (optional, default: `{ type: 'interval', intervalInMs: 5000 }`). Allows to configure how metrics are collected. Supports the following properties: diff --git a/lib/plugins/bull-mq-metrics/MetricsCollector.ts b/lib/plugins/bull-mq-metrics/MetricsCollector.ts index af08813..87a1eb9 100644 --- a/lib/plugins/bull-mq-metrics/MetricsCollector.ts +++ b/lib/plugins/bull-mq-metrics/MetricsCollector.ts @@ -14,7 +14,7 @@ export type Metrics = { export type MetricCollectorOptions = { bullMqPrefix: string metricsPrefix: string - queueDiscoverers: QueueDiscoverer[] + queueDiscoverer: QueueDiscoverer excludedQueues: string[] histogramBuckets: number[] } @@ -56,7 +56,12 @@ export class MetricsCollector { */ async collect() { if (!this.observedQueues) { - this.observedQueues = await this.discoverQueues() + this.observedQueues = (await this.options.queueDiscoverer.discoverQueues()) + .filter((queue) => !this.options.excludedQueues.includes(queue.queueName)) + .map( + (queue) => + new ObservableQueue(queue.queueName, queue.redisInstance, this.metrics, this.logger), + ) } await PromisePool.for(this.observedQueues).process((queue: ObservableQueue) => { @@ -73,30 +78,6 @@ export class MetricsCollector { } } - private async discoverQueues(): Promise { - const redisInstancesWithQueues = await Promise.all( - this.options.queueDiscoverers.map((discoverer) => discoverer.getRedisInstanceWithQueues()), - ) - - return redisInstancesWithQueues - .flatMap((redisWithQueues) => - redisWithQueues.queues.map((queueName) => ({ - redis: redisWithQueues.redisInstance, - queueName, - })), - ) - .filter((redisWithQueue) => !this.options.excludedQueues.includes(redisWithQueue.queueName)) - .map( - (redisWithQueue) => - new ObservableQueue( - redisWithQueue.queueName, - redisWithQueue.redis, - this.metrics, - this.logger, - ), - ) - } - private registerMetrics( registry: prometheus.Registry, { metricsPrefix, histogramBuckets }: MetricCollectorOptions, diff --git a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts index 6151dc4..abc91c6 100644 --- a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts +++ b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts @@ -2,30 +2,28 @@ import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jo import type { Redis } from 'ioredis' export type QueueDiscoverer = { - getRedisInstanceWithQueues: () => Promise - discoverQueues: () => Promise + discoverQueues: () => Promise } -type RedisInstanceWithQueues = { +type RedisQueue = { redisInstance: Redis - queues: string[] + queueName: string } export class RedisBasedQueueDiscoverer implements QueueDiscoverer { constructor( - private readonly redis: Redis, + private readonly redisInstances: Redis[], private readonly queuesPrefix: string, ) {} - async getRedisInstanceWithQueues(): Promise { - return { - redisInstance: this.redis, - queues: await this.discoverQueues(), - } + async discoverQueues(): Promise { + return Promise.all( + this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)), + ).then((queues) => queues.flat()) } - async discoverQueues(): Promise { - const scanStream = this.redis.scanStream({ + private async discoverQueuesForInstance(redisInstance: Redis): Promise { + const scanStream = redisInstance.scanStream({ match: `${this.queuesPrefix}:*:meta`, }) @@ -38,21 +36,30 @@ export class RedisBasedQueueDiscoverer implements QueueDiscoverer { .forEach((queue) => queues.add(queue)) } - return Array.from(queues).sort() + return Array.from(queues) + .sort() + .map((queueName) => ({ + redisInstance: redisInstance, + queueName, + })) } } export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer { - constructor(private readonly redis: Redis) {} + constructor(private readonly redisInstances: Redis[]) {} - async getRedisInstanceWithQueues(): Promise { - return { - redisInstance: this.redis, - queues: await this.discoverQueues(), - } + async discoverQueues(): Promise { + return Promise.all( + this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)), + ).then((queues) => queues.flat()) } - async discoverQueues(): Promise { - return await backgroundJobProcessorGetActiveQueueIds(this.redis) + private async discoverQueuesForInstance(redisInstance: Redis): Promise { + return backgroundJobProcessorGetActiveQueueIds(redisInstance).then((queueNames) => + queueNames.map((queueName) => ({ + redisInstance, + queueName, + })), + ) } } diff --git a/lib/plugins/bullMqMetricsPlugin.spec.ts b/lib/plugins/bullMqMetricsPlugin.spec.ts index 94bb7e9..a45a3bc 100644 --- a/lib/plugins/bullMqMetricsPlugin.spec.ts +++ b/lib/plugins/bullMqMetricsPlugin.spec.ts @@ -41,12 +41,8 @@ async function initAppWithBullMqMetrics( }) } - const redisClients = Array.isArray(pluginOptions.redisClient) - ? pluginOptions.redisClient - : [pluginOptions.redisClient] - await app.register(bullMqMetricsPlugin, { - queueDiscoverers: redisClients.map((redis) => new RedisBasedQueueDiscoverer(redis, 'bull')), + queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisClients, 'bull'), collectionOptions: { type: 'interval', intervalInMs: 50, @@ -100,7 +96,7 @@ describe('bullMqMetricsPlugin', () => { await expect(() => { return initAppWithBullMqMetrics( { - redisClient: redis, + redisClients: [redis], }, { enableMetricsPlugin: false, @@ -113,7 +109,7 @@ describe('bullMqMetricsPlugin', () => { it('exposes metrics collect() function', async () => { app = await initAppWithBullMqMetrics({ - redisClient: redis, + redisClients: [redis], collectionOptions: { type: 'manual', }, @@ -145,7 +141,7 @@ describe('bullMqMetricsPlugin', () => { it('works with multiple redis clients', async () => { app = await initAppWithBullMqMetrics({ - redisClient: [redis, redis], + redisClients: [redis, redis], collectionOptions: { type: 'manual', }, @@ -156,7 +152,7 @@ describe('bullMqMetricsPlugin', () => { const responseBefore = await getMetrics() expect(responseBefore.result.body).not.toContain( - 'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 1', + 'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"}', ) await processor.schedule({ diff --git a/lib/plugins/bullMqMetricsPlugin.ts b/lib/plugins/bullMqMetricsPlugin.ts index ac37492..4a980e2 100644 --- a/lib/plugins/bullMqMetricsPlugin.ts +++ b/lib/plugins/bullMqMetricsPlugin.ts @@ -19,7 +19,7 @@ declare module 'fastify' { } export type BullMqMetricsPluginOptions = { - redisClient: Redis | Redis[] + redisClients: Redis[] collectionOptions?: | { type: 'interval' @@ -43,18 +43,10 @@ function plugin( ) } - const redisInstances: Redis[] = Array.isArray(pluginOptions.redisClient) - ? pluginOptions.redisClient - : [pluginOptions.redisClient] - - const queueDiscoverers = redisInstances.map( - (redis) => new BackgroundJobsBasedQueueDiscoverer(redis), - ) - const options = { bullMqPrefix: 'bull', metricsPrefix: 'bullmq', - queueDiscoverers, + queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisClients), excludedQueues: [], histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000], collectionOptions: { From a61fbbd8c2d17137caace4e783554f24880745f9 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 22 Aug 2024 11:45:51 +0200 Subject: [PATCH 5/5] AP-4397 use PromisePool for queues discovery --- .../bull-mq-metrics/queueDiscoverers.ts | 48 +++++++++++-------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts index abc91c6..58558a0 100644 --- a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts +++ b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts @@ -1,4 +1,5 @@ import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jobs-common' +import { PromisePool } from '@supercharge/promise-pool' import type { Redis } from 'ioredis' export type QueueDiscoverer = { @@ -10,19 +11,36 @@ type RedisQueue = { queueName: string } -export class RedisBasedQueueDiscoverer implements QueueDiscoverer { - constructor( - private readonly redisInstances: Redis[], - private readonly queuesPrefix: string, - ) {} +const QUEUE_DISCOVERY_CONCURRENCY = 3 + +export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscoverer { + constructor(protected readonly redisInstances: Redis[]) {} async discoverQueues(): Promise { - return Promise.all( - this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)), - ).then((queues) => queues.flat()) + const { results, errors } = await PromisePool.withConcurrency(QUEUE_DISCOVERY_CONCURRENCY) + .for(this.redisInstances) + .process((redisInstance) => this.discoverQueuesForInstance(redisInstance)) + + if (errors.length > 0) { + // Throwing first error that was encountered + throw errors[0] + } + + return results.flat() + } + + protected abstract discoverQueuesForInstance(redisInstance: Redis): Promise +} + +export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer { + constructor( + redisInstances: Redis[], + private readonly queuesPrefix: string, + ) { + super(redisInstances) } - private async discoverQueuesForInstance(redisInstance: Redis): Promise { + protected async discoverQueuesForInstance(redisInstance: Redis): Promise { const scanStream = redisInstance.scanStream({ match: `${this.queuesPrefix}:*:meta`, }) @@ -45,16 +63,8 @@ export class RedisBasedQueueDiscoverer implements QueueDiscoverer { } } -export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer { - constructor(private readonly redisInstances: Redis[]) {} - - async discoverQueues(): Promise { - return Promise.all( - this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)), - ).then((queues) => queues.flat()) - } - - private async discoverQueuesForInstance(redisInstance: Redis): Promise { +export class BackgroundJobsBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer { + protected async discoverQueuesForInstance(redisInstance: Redis): Promise { return backgroundJobProcessorGetActiveQueueIds(redisInstance).then((queueNames) => queueNames.map((queueName) => ({ redisInstance,