Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP-4397 support for multiple redis clients in BullMQ metrics #186

Merged
merged 5 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ This plugin depends on the following peer-installed packages:

Add the plugin to your Fastify instance by registering it with the following possible options:

- `redisClient`, a Redis client instance which is used by the BullMQ: plugin uses it to discover the queues;
- `redisClient`, a Redis client instance(s) which is used by the BullMQ: plugin uses it to discover the queues. Both single instance or array of instances can be provided;
- `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis;
- `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus;
- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface;
- `queueDiscoverers` (optional, default: `BackgroundJobsBasedQueueDiscoverer` for each redis instance). The queue discoverers to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface;
- `excludedQueues` (optional, default: `[]`). An array of queue names to exclude from metrics collection;
- `histogramBuckets` (optional, default: `[20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000]`). Buckets for the histogram metrics (such as job completion or overall processing time).
- `collectionOptions` (optional, default: `{ type: 'interval', intervalInMs: 5000 }`). Allows to configure how metrics are collected. Supports the following properties:
Expand Down
36 changes: 27 additions & 9 deletions lib/plugins/bull-mq-metrics/MetricsCollector.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { PromisePool } from '@supercharge/promise-pool'
import type { FastifyBaseLogger } from 'fastify'
import type { Redis } from 'ioredis'
import * as prometheus from 'prom-client'

import { ObservableQueue } from './ObservableQueue'
Expand All @@ -13,10 +12,9 @@ export type Metrics = {
}

export type MetricCollectorOptions = {
redisClient: Redis
bullMqPrefix: string
metricsPrefix: string
queueDiscoverer: QueueDiscoverer
queueDiscoverers: QueueDiscoverer[]
excludedQueues: string[]
histogramBuckets: number[]
}
Expand All @@ -42,7 +40,6 @@ const getMetrics = (prefix: string, histogramBuckets: number[]): Metrics => ({
})

export class MetricsCollector {
private readonly redis: Redis
private readonly metrics: Metrics
private observedQueues: ObservableQueue[] | undefined

Expand All @@ -51,7 +48,6 @@ export class MetricsCollector {
private readonly registry: prometheus.Registry,
private readonly logger: FastifyBaseLogger,
) {
this.redis = options.redisClient
this.metrics = this.registerMetrics(this.registry, this.options)
}

Expand All @@ -60,12 +56,10 @@ export class MetricsCollector {
*/
async collect() {
if (!this.observedQueues) {
this.observedQueues = (await this.options.queueDiscoverer.discoverQueues())
.filter((name) => !this.options.excludedQueues.includes(name))
.map((name) => new ObservableQueue(name, this.redis, this.metrics, this.logger))
this.observedQueues = await this.discoverQueues()
}

await PromisePool.for(this.observedQueues).process((queue) => {
await PromisePool.for(this.observedQueues).process((queue: ObservableQueue) => {
queue.collect()
})
}
Expand All @@ -79,6 +73,30 @@ export class MetricsCollector {
}
}

private async discoverQueues(): Promise<ObservableQueue[]> {
kjamrog marked this conversation as resolved.
Show resolved Hide resolved
const redisInstancesWithQueues = await Promise.all(
this.options.queueDiscoverers.map((discoverer) => discoverer.getRedisInstanceWithQueues()),
)

return redisInstancesWithQueues
.flatMap((redisWithQueues) =>
redisWithQueues.queues.map((queueName) => ({
redis: redisWithQueues.redisInstance,
queueName,
})),
)
.filter((redisWithQueue) => !this.options.excludedQueues.includes(redisWithQueue.queueName))
.map(
(redisWithQueue) =>
new ObservableQueue(
redisWithQueue.queueName,
redisWithQueue.redis,
this.metrics,
this.logger,
),
)
}

private registerMetrics(
registry: prometheus.Registry,
{ metricsPrefix, histogramBuckets }: MetricCollectorOptions,
Expand Down
20 changes: 20 additions & 0 deletions lib/plugins/bull-mq-metrics/queueDiscoverers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jo
import type { Redis } from 'ioredis'

export type QueueDiscoverer = {
getRedisInstanceWithQueues: () => Promise<RedisInstanceWithQueues>
kjamrog marked this conversation as resolved.
Show resolved Hide resolved
discoverQueues: () => Promise<string[]>
}

type RedisInstanceWithQueues = {
redisInstance: Redis
queues: string[]
}

export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
constructor(
private readonly redis: Redis,
private readonly queuesPrefix: string,
) {}

async getRedisInstanceWithQueues(): Promise<RedisInstanceWithQueues> {
return {
redisInstance: this.redis,
queues: await this.discoverQueues(),
}
}

async discoverQueues(): Promise<string[]> {
const scanStream = this.redis.scanStream({
match: `${this.queuesPrefix}:*:meta`,
Expand All @@ -32,6 +45,13 @@ export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer {
constructor(private readonly redis: Redis) {}

async getRedisInstanceWithQueues(): Promise<RedisInstanceWithQueues> {
return {
redisInstance: this.redis,
queues: await this.discoverQueues(),
}
}

async discoverQueues(): Promise<string[]> {
return await backgroundJobProcessorGetActiveQueueIds(this.redis)
}
Expand Down
39 changes: 38 additions & 1 deletion lib/plugins/bullMqMetricsPlugin.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ async function initAppWithBullMqMetrics(
})
}

const redisClients = Array.isArray(pluginOptions.redisClient)
? pluginOptions.redisClient
: [pluginOptions.redisClient]

await app.register(bullMqMetricsPlugin, {
queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisClient, 'bull'),
queueDiscoverers: redisClients.map((redis) => new RedisBasedQueueDiscoverer(redis, 'bull')),
collectionOptions: {
type: 'interval',
intervalInMs: 50,
Expand Down Expand Up @@ -138,4 +142,37 @@ describe('bullMqMetricsPlugin', () => {
'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 1',
)
})

it('works with multiple redis clients', async () => {
app = await initAppWithBullMqMetrics({
redisClient: [redis, redis],
collectionOptions: {
type: 'manual',
},
})

// exec collect to start listening for failed and completed events
await app.bullMqMetrics.collect()

const responseBefore = await getMetrics()
expect(responseBefore.result.body).not.toContain(
'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 1',
kjamrog marked this conversation as resolved.
Show resolved Hide resolved
)

await processor.schedule({
metadata: {
correlationId: 'test',
},
})

await setTimeout(100)

await app.bullMqMetrics.collect()

const responseAfter = await getMetrics()
expect(responseAfter.result.body).toContain(
// value is 2 since we are counting same redis client twice (only for tests)
'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 2',
)
})
})
14 changes: 11 additions & 3 deletions lib/plugins/bullMqMetricsPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ declare module 'fastify' {
}

export type BullMqMetricsPluginOptions = {
redisClient: Redis
redisClient: Redis | Redis[]
collectionOptions?:
| {
type: 'interval'
Expand All @@ -43,18 +43,26 @@ function plugin(
)
}

const redisInstances: Redis[] = Array.isArray(pluginOptions.redisClient)
? pluginOptions.redisClient
: [pluginOptions.redisClient]

const queueDiscoverers = redisInstances.map(
(redis) => new BackgroundJobsBasedQueueDiscoverer(redis),
)

const options = {
bullMqPrefix: 'bull',
metricsPrefix: 'bullmq',
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisClient),
queueDiscoverers,
excludedQueues: [],
histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000],
collectionOptions: {
type: 'interval',
intervalInMs: 5000,
},
...pluginOptions,
}
} satisfies MetricCollectorOptions

try {
const collector = new MetricsCollector(options, fastify.metrics.client.register, fastify.log)
Expand Down
15 changes: 2 additions & 13 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,9 @@
"type": "git",
"url": "git://github.com/lokalise/fastify-extras.git"
},
"keywords": [
"fastify",
"newrelic",
"bugsnag",
"request-context",
"request-id",
"split-io"
],
"keywords": ["fastify", "newrelic", "bugsnag", "request-context", "request-id", "split-io"],
"homepage": "https://github.com/lokalise/fastify-extras",
"files": [
"dist/**",
"LICENSE",
"README.md"
],
"files": ["dist/**", "LICENSE", "README.md"],
"main": "dist/index.js",
"types": "dist/index.d.ts",
"type": "commonjs",
Expand Down
Loading