Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP-4397 support for multiple redis clients in BullMQ metrics #186

Merged
merged 5 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ This plugin depends on the following peer-installed packages:

Add the plugin to your Fastify instance by registering it with the following possible options:

- `redisClient`, a Redis client instance which is used by the BullMQ: plugin uses it to discover the queues;
- `redisClients`, a Redis client instances which are used by the BullMQ: plugin uses it to discover the queues.
- `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis;
- `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus;
- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface;
Expand Down
13 changes: 6 additions & 7 deletions lib/plugins/bull-mq-metrics/MetricsCollector.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { PromisePool } from '@supercharge/promise-pool'
import type { FastifyBaseLogger } from 'fastify'
import type { Redis } from 'ioredis'
import * as prometheus from 'prom-client'

import { ObservableQueue } from './ObservableQueue'
Expand All @@ -13,7 +12,6 @@ export type Metrics = {
}

export type MetricCollectorOptions = {
redisClient: Redis
bullMqPrefix: string
metricsPrefix: string
queueDiscoverer: QueueDiscoverer
Expand Down Expand Up @@ -42,7 +40,6 @@ const getMetrics = (prefix: string, histogramBuckets: number[]): Metrics => ({
})

export class MetricsCollector {
private readonly redis: Redis
private readonly metrics: Metrics
private observedQueues: ObservableQueue[] | undefined

Expand All @@ -51,7 +48,6 @@ export class MetricsCollector {
private readonly registry: prometheus.Registry,
private readonly logger: FastifyBaseLogger,
) {
this.redis = options.redisClient
this.metrics = this.registerMetrics(this.registry, this.options)
}

Expand All @@ -61,11 +57,14 @@ export class MetricsCollector {
async collect() {
if (!this.observedQueues) {
this.observedQueues = (await this.options.queueDiscoverer.discoverQueues())
.filter((name) => !this.options.excludedQueues.includes(name))
.map((name) => new ObservableQueue(name, this.redis, this.metrics, this.logger))
.filter((queue) => !this.options.excludedQueues.includes(queue.queueName))
.map(
(queue) =>
new ObservableQueue(queue.queueName, queue.redisInstance, this.metrics, this.logger),
)
}

await PromisePool.for(this.observedQueues).process((queue) => {
await PromisePool.for(this.observedQueues).process((queue: ObservableQueue) => {
queue.collect()
})
}
Expand Down
61 changes: 49 additions & 12 deletions lib/plugins/bull-mq-metrics/queueDiscoverers.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,47 @@
import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jobs-common'
import { PromisePool } from '@supercharge/promise-pool'
import type { Redis } from 'ioredis'

export type QueueDiscoverer = {
discoverQueues: () => Promise<string[]>
discoverQueues: () => Promise<RedisQueue[]>
}

export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
type RedisQueue = {
redisInstance: Redis
queueName: string
}

const QUEUE_DISCOVERY_CONCURRENCY = 3

export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscoverer {
constructor(protected readonly redisInstances: Redis[]) {}

async discoverQueues(): Promise<RedisQueue[]> {
const { results, errors } = await PromisePool.withConcurrency(QUEUE_DISCOVERY_CONCURRENCY)
.for(this.redisInstances)
.process((redisInstance) => this.discoverQueuesForInstance(redisInstance))

if (errors.length > 0) {
// Throwing first error that was encountered
throw errors[0]
}

return results.flat()
}

protected abstract discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]>
}

export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
constructor(
private readonly redis: Redis,
redisInstances: Redis[],
private readonly queuesPrefix: string,
) {}
) {
super(redisInstances)
}

async discoverQueues(): Promise<string[]> {
const scanStream = this.redis.scanStream({
protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
const scanStream = redisInstance.scanStream({
match: `${this.queuesPrefix}:*:meta`,
})

Expand All @@ -25,14 +54,22 @@ export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
.forEach((queue) => queues.add(queue))
}

return Array.from(queues).sort()
return Array.from(queues)
.sort()
.map((queueName) => ({
redisInstance: redisInstance,
queueName,
}))
}
}

export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer {
constructor(private readonly redis: Redis) {}

async discoverQueues(): Promise<string[]> {
return await backgroundJobProcessorGetActiveQueueIds(this.redis)
export class BackgroundJobsBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
return backgroundJobProcessorGetActiveQueueIds(redisInstance).then((queueNames) =>
queueNames.map((queueName) => ({
redisInstance,
queueName,
})),
)
}
}
39 changes: 36 additions & 3 deletions lib/plugins/bullMqMetricsPlugin.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async function initAppWithBullMqMetrics(
}

await app.register(bullMqMetricsPlugin, {
queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisClient, 'bull'),
queueDiscoverer: new RedisBasedQueueDiscoverer(pluginOptions.redisClients, 'bull'),
collectionOptions: {
type: 'interval',
intervalInMs: 50,
Expand Down Expand Up @@ -96,7 +96,7 @@ describe('bullMqMetricsPlugin', () => {
await expect(() => {
return initAppWithBullMqMetrics(
{
redisClient: redis,
redisClients: [redis],
},
{
enableMetricsPlugin: false,
Expand All @@ -109,7 +109,7 @@ describe('bullMqMetricsPlugin', () => {

it('exposes metrics collect() function', async () => {
app = await initAppWithBullMqMetrics({
redisClient: redis,
redisClients: [redis],
collectionOptions: {
type: 'manual',
},
Expand Down Expand Up @@ -138,4 +138,37 @@ describe('bullMqMetricsPlugin', () => {
'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 1',
)
})

it('works with multiple redis clients', async () => {
app = await initAppWithBullMqMetrics({
redisClients: [redis, redis],
collectionOptions: {
type: 'manual',
},
})

// exec collect to start listening for failed and completed events
await app.bullMqMetrics.collect()

const responseBefore = await getMetrics()
expect(responseBefore.result.body).not.toContain(
'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"}',
)

await processor.schedule({
metadata: {
correlationId: 'test',
},
})

await setTimeout(100)

await app.bullMqMetrics.collect()

const responseAfter = await getMetrics()
expect(responseAfter.result.body).toContain(
// value is 2 since we are counting same redis client twice (only for tests)
'bullmq_jobs_finished_duration_count{status="completed",queue="test_job"} 2',
)
})
})
6 changes: 3 additions & 3 deletions lib/plugins/bullMqMetricsPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ declare module 'fastify' {
}

export type BullMqMetricsPluginOptions = {
redisClient: Redis
redisClients: Redis[]
collectionOptions?:
| {
type: 'interval'
Expand All @@ -46,15 +46,15 @@ function plugin(
const options = {
bullMqPrefix: 'bull',
metricsPrefix: 'bullmq',
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisClient),
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(pluginOptions.redisClients),
excludedQueues: [],
histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000],
collectionOptions: {
type: 'interval',
intervalInMs: 5000,
},
...pluginOptions,
}
} satisfies MetricCollectorOptions

try {
const collector = new MetricsCollector(options, fastify.metrics.client.register, fastify.log)
Expand Down
15 changes: 2 additions & 13 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,9 @@
"type": "git",
"url": "git://github.com/lokalise/fastify-extras.git"
},
"keywords": [
"fastify",
"newrelic",
"bugsnag",
"request-context",
"request-id",
"split-io"
],
"keywords": ["fastify", "newrelic", "bugsnag", "request-context", "request-id", "split-io"],
"homepage": "https://github.com/lokalise/fastify-extras",
"files": [
"dist/**",
"LICENSE",
"README.md"
],
"files": ["dist/**", "LICENSE", "README.md"],
"main": "dist/index.js",
"types": "dist/index.d.ts",
"type": "commonjs",
Expand Down
Loading