AIM-402 Extract queue discovery logic, add background-jobs-common discoverer, check for registered metrics before attempting to register them.
drdaemos committed Jul 17, 2024
1 parent b740385 commit 99b1b6a
Showing 6 changed files with 125 additions and 60 deletions.
24 changes: 23 additions & 1 deletion README.md
@@ -33,7 +33,8 @@ The following needs to be taken into consideration when adding new runtime depen
- `@fastify/jwt`;
- `fastify`;
- `newrelic`;
- `pino`.
- `pino`;
- `bullmq`.

## Plugins

@@ -132,6 +133,27 @@ Add the plugin to your Fastify instance by registering it with the following opt
The plugin exposes a `GET /metrics` route in your Fastify app to retrieve Prometheus metrics. If something goes wrong while starting the Prometheus metrics server, an `Error` is thrown. Otherwise, a success message is displayed when the plugin has been loaded.
### BullMQ Metrics Plugin
Plugin that auto-discovers BullMQ queues, periodically collects metrics for them, and exposes them via the `fastify-metrics` global Prometheus registry. If used together with `metricsPlugin`, these metrics are served on the `GET /metrics` route.
This plugin depends on the following peer-installed packages:
- `bullmq`
- `ioredis`
Add the plugin to your Fastify instance by registering it with the following options:
- `redisClient`, a Redis client instance used by BullMQ; the plugin uses it to discover the queues;
- `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis;
- `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus;
- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common`, where queue names are registered by the background job processors. If you are not using `@lokalise/background-jobs-common`, you can supply your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing the `QueueDiscoverer` interface;
- `excludedQueues` (optional, default: `[]`). An array of queue names to exclude from metrics collection;
- `collectionIntervalInMs` (optional, default: `5000`). The interval in milliseconds at which the metrics are collected;
- `histogramBuckets` (optional, default: `[20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000]`). Buckets for the histogram metrics (such as job completion or overall processing time).
If something goes wrong while starting the BullMQ metrics plugin, an `Error` is thrown.
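The documented defaults can be pictured as a plain object that caller-supplied options are spread over, which is how `MetricsCollector` in this commit builds its options. The sketch below is a simplified, self-contained illustration: `CollectorOptions`, `DEFAULTS`, and `resolveOptions` are hypothetical names, and `redisClient` and `queueDiscoverer` are left out so the example does not depend on Redis.

```typescript
// Hypothetical, trimmed-down version of the plugin options (no Redis-dependent fields).
type CollectorOptions = {
  bullMqPrefix?: string
  metricsPrefix?: string
  excludedQueues?: string[]
  collectionIntervalInMs?: number
  histogramBuckets?: number[]
}

// Defaults as listed in the README section above.
const DEFAULTS: Required<CollectorOptions> = {
  bullMqPrefix: 'bull',
  metricsPrefix: 'bullmq',
  excludedQueues: [],
  collectionIntervalInMs: 5000,
  histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000],
}

// Later spreads win, so any option the caller sets replaces the default.
function resolveOptions(options: CollectorOptions): Required<CollectorOptions> {
  return { ...DEFAULTS, ...options }
}

const resolved = resolveOptions({ metricsPrefix: 'jobs', excludedQueues: ['internal'] })
// resolved.metricsPrefix is 'jobs'; resolved.collectionIntervalInMs stays 5000
```

One consequence of this pattern: an option passed explicitly as `undefined` also replaces its default, because object spread copies own properties regardless of their value.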
### NewRelic Transaction Manager Plugin
Plugin to create custom NewRelic spans for background jobs.
5 changes: 5 additions & 0 deletions lib/index.ts
@@ -34,6 +34,11 @@ export type {
HealthcheckMetricsPluginOptions,
} from './plugins/healthcheck/healthcheckMetricsPlugin'

export { bullMqMetricsPlugin } from './plugins/bullMqMetricsPlugin'
export type { BullMqMetricsPluginOptions } from './plugins/bullMqMetricsPlugin'
export { RedisBasedQueueDiscoverer, BackgroundJobsBasedQueueDiscoverer } from './plugins/bull-mq-metrics/queueDiscoverers'
export type { QueueDiscoverer } from './plugins/bull-mq-metrics/queueDiscoverers'

export { metricsPlugin } from './plugins/metricsPlugin'
export type { ErrorObjectResolver, MetricsPluginOptions } from './plugins/metricsPlugin'

119 changes: 60 additions & 59 deletions lib/plugins/bull-mq-metrics/MetricsCollector.ts
@@ -1,8 +1,11 @@
import { setTimeout } from 'node:timers/promises'

import { Queue, QueueEvents } from 'bullmq'
import type { Redis } from 'ioredis'
import * as prometheus from 'prom-client'

import type { BullMqMetricsPluginOptions } from '../bullMqMetricsPlugin'

Check warning on line 7 in lib/plugins/bull-mq-metrics/MetricsCollector.ts (GitHub Actions / build 18.x, 20.x, 22.x): There should be at least one empty line between import groups
import { BackgroundJobsBasedQueueDiscoverer } from './queueDiscoverers'

type Metrics = {
completedGauge: prometheus.Gauge<never>
@@ -16,6 +19,46 @@ type Metrics = {

type MetricCollectorOptions = Required<Omit<BullMqMetricsPluginOptions, 'redisClient'>>

const getMetrics = (prefix: string, histogramBuckets: number[]): Metrics => ({
  completedGauge: new prometheus.Gauge({
    name: `${prefix}_jobs_completed`,
    help: 'Total number of completed jobs',
    labelNames: ['queue'],
  }),
  activeGauge: new prometheus.Gauge({
    name: `${prefix}_jobs_active`,
    help: 'Total number of active jobs (currently being processed)',
    labelNames: ['queue'],
  }),
  failedGauge: new prometheus.Gauge({
    name: `${prefix}_jobs_failed`,
    help: 'Total number of failed jobs',
    labelNames: ['queue'],
  }),
  delayedGauge: new prometheus.Gauge({
    name: `${prefix}_jobs_delayed`,
    help: 'Total number of jobs that will run in the future',
    labelNames: ['queue'],
  }),
  waitingGauge: new prometheus.Gauge({
    name: `${prefix}_jobs_waiting`,
    help: 'Total number of jobs waiting to be processed',
    labelNames: ['queue'],
  }),
  processedDuration: new prometheus.Histogram({
    name: `${prefix}_jobs_processed_duration`,
    help: 'Processing time for completed jobs (processing until completed)',
    buckets: histogramBuckets,
    labelNames: ['queue'],
  }),
  completedDuration: new prometheus.Histogram({
    name: `${prefix}_jobs_completed_duration`,
    help: 'Completion time for jobs (created until completed)',
    buckets: histogramBuckets,
    labelNames: ['queue'],
  }),
})
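The `buckets` passed to the two histograms are upper bounds, and Prometheus histogram buckets are cumulative: each observation counts toward every bucket whose bound is greater than or equal to it. A small sketch of that counting rule, independent of prom-client; `cumulativeBucketCounts` is a hypothetical helper and the sample durations are made up:

```typescript
// Hypothetical helper: for each upper bound, count how many observations are
// less than or equal to it (the cumulative rule Prometheus histograms follow).
function cumulativeBucketCounts(buckets: number[], observations: number[]): Map<number, number> {
  const counts = new Map<number, number>()
  for (const upperBound of buckets) {
    counts.set(upperBound, observations.filter((value) => value <= upperBound).length)
  }
  return counts
}

const defaultBuckets = [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000]
// Made-up durations, for illustration only.
const bucketCounts = cumulativeBucketCounts(defaultBuckets, [15, 45, 45, 900, 70000])
// bucketCounts.get(20) is 1, bucketCounts.get(50) is 3, bucketCounts.get(1000) is 4
```

Observations above the largest configured bound would only show up in the implicit `+Inf` bucket, which is worth keeping in mind when choosing custom `histogramBuckets`.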

export class MetricsCollector {
private readonly options: MetricCollectorOptions
private readonly metrics: Metrics
@@ -32,14 +75,15 @@ export class MetricsCollector {
excludedQueues: [],
collectionIntervalInMs: 5000,
histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000],
queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(this.redis),
...options,
}

this.metrics = this.registerMetrics(this.registry, this.options)
}

async start() {
const queueNames = await this.findQueues(this.redis, this.options)
const queueNames = await this.options.queueDiscoverer.discoverQueues()
await Promise.all(
queueNames.map((name) => this.observeQueue(name, this.redis, this.metrics, this.options)),
)
@@ -53,44 +97,20 @@ export class MetricsCollector {
registry: prometheus.Registry,
{ metricsPrefix, histogramBuckets }: MetricCollectorOptions,
): Metrics {
const metrics: Metrics = {
completedGauge: new prometheus.Gauge({
name: `${metricsPrefix}_jobs_completed`,
help: 'Total number of completed jobs',
labelNames: ['queue'],
}),
activeGauge: new prometheus.Gauge({
name: `${metricsPrefix}_jobs_active`,
help: 'Total number of active jobs (currently being processed)',
labelNames: ['queue'],
}),
failedGauge: new prometheus.Gauge({
name: `${metricsPrefix}_jobs_failed`,
help: 'Total number of failed jobs',
labelNames: ['queue'],
}),
delayedGauge: new prometheus.Gauge({
name: `${metricsPrefix}_jobs_delayed`,
help: 'Total number of jobs that will run in the future',
labelNames: ['queue'],
}),
waitingGauge: new prometheus.Gauge({
name: `${metricsPrefix}_jobs_waiting`,
help: 'Total number of jobs waiting to be processed',
labelNames: ['queue'],
}),
processedDuration: new prometheus.Histogram({
name: `${metricsPrefix}_jobs_processed_duration`,
help: 'Processing time for completed jobs (processing until completed)',
buckets: histogramBuckets,
labelNames: ['queue'],
}),
completedDuration: new prometheus.Histogram({
name: `${metricsPrefix}_jobs_completed_duration`,
help: 'Completion time for jobs (created until completed)',
buckets: histogramBuckets,
labelNames: ['queue'],
}),
const metrics = getMetrics(metricsPrefix, histogramBuckets)

// If metrics are already registered, just return them to avoid triggering a Prometheus error
if (registry.getSingleMetric(Object.keys(metrics).pop()!)) {

Check warning on line 103 in lib/plugins/bull-mq-metrics/MetricsCollector.ts (GitHub Actions / build 18.x, 20.x, 22.x): Forbidden non-null assertion
const retrievedMetrics = registry.getMetricsAsArray()
const returnValue: Record<string, prometheus.MetricObject> = {}

for (const metric of retrievedMetrics) {
if (Object.keys(metrics).includes(metric.name)) {
returnValue[metric.name as keyof Metrics] = metric
}
}

return returnValue as unknown as Metrics
}

for (const metric of Object.values(metrics)) {
@@ -100,25 +120,6 @@ export class MetricsCollector {
return metrics
}
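One detail worth checking in the hunk above: `Object.keys(metrics)` yields the property names of the `Metrics` object (for example `completedDuration`), whereas `getSingleMetric` and `metric.name` work with Prometheus metric names (for example `bullmq_jobs_completed_duration`). A lookup keyed on each metric's `name` may be closer to the intent. The sketch below illustrates that idea only; `NamedMetric`, `RegistryLike`, `InMemoryRegistry`, and `reuseOrRegister` are hypothetical stand-ins, not prom-client types, and the stand-in registry models just the two calls used here.

```typescript
// Stand-in for a Prometheus metric: only the name matters for this sketch.
type NamedMetric = { name: string }

// Stand-in for the two registry calls the hunk relies on (assumed shape).
type RegistryLike = {
  getSingleMetric(name: string): NamedMetric | undefined
  registerMetric(metric: NamedMetric): void
}

// Hypothetical helper: for every entry, reuse the metric already registered
// under the same metric name, otherwise register the freshly created one.
function reuseOrRegister(
  registry: RegistryLike,
  fresh: Record<string, NamedMetric>,
): Record<string, NamedMetric> {
  const result: Record<string, NamedMetric> = {}
  for (const [key, metric] of Object.entries(fresh)) {
    const existing = registry.getSingleMetric(metric.name)
    if (existing) {
      result[key] = existing
    } else {
      registry.registerMetric(metric)
      result[key] = metric
    }
  }
  return result
}

// Tiny in-memory registry used only to exercise the helper.
class InMemoryRegistry implements RegistryLike {
  private readonly metrics = new Map<string, NamedMetric>()

  getSingleMetric(name: string): NamedMetric | undefined {
    return this.metrics.get(name)
  }

  registerMetric(metric: NamedMetric): void {
    if (this.metrics.has(metric.name)) {
      throw new Error(`Metric ${metric.name} is already registered`)
    }
    this.metrics.set(metric.name, metric)
  }
}

const registry = new InMemoryRegistry()
const first = reuseOrRegister(registry, { completedGauge: { name: 'bullmq_jobs_completed' } })
// A second collector creates new objects with the same metric names...
const second = reuseOrRegister(registry, { completedGauge: { name: 'bullmq_jobs_completed' } })
// ...but gets back the instance registered the first time, and nothing throws.
```

Keeping the result keyed by property name while looking up by metric name also avoids the `as unknown as Metrics` cast, since each key maps to exactly one metric.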

private async findQueues(
redis: Redis,
{ bullMqPrefix }: MetricCollectorOptions,
): Promise<string[]> {
const scanStream = redis.scanStream({
match: `${bullMqPrefix}:*:meta`,
})

const queues = new Set<string>()
for await (const chunk of scanStream) {
;(chunk as string[])
.map((key) => key.split(':')[1])
.filter((value) => !!value)
.forEach((queue) => queues.add(queue))
}

return Array.from(queues)
}

private async observeQueue(
name: string,
redis: Redis,
@@ -155,7 +156,7 @@ export class MetricsCollector {
metrics.failedGauge.labels({ queue: name }).set(failed)
metrics.waitingGauge.labels({ queue: name }).set(waiting)

await new Promise((resolve) => setTimeout(resolve, collectionIntervalInMs))
await setTimeout(collectionIntervalInMs)
}

await events.close()
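The last change in this file swaps a hand-rolled `new Promise(...)` delay for the promise-based `setTimeout` from `node:timers/promises`, which resolves after the given number of milliseconds. A minimal sketch of that polling pattern follows; `pollWhile` and its parameters are hypothetical names, not part of this commit.

```typescript
import { setTimeout as sleep } from 'node:timers/promises'

// Hypothetical helper: run `collect` repeatedly, pausing `intervalInMs`
// between runs, until `shouldContinue` returns false.
async function pollWhile(
  shouldContinue: () => boolean,
  collect: () => Promise<void>,
  intervalInMs: number,
): Promise<number> {
  let iterations = 0
  while (shouldContinue()) {
    await collect()
    iterations += 1
    // Resolves after the delay; no manual Promise wrapper is needed.
    await sleep(intervalInMs)
  }
  return iterations
}
```

The import is aliased to `sleep` here so it does not shadow the global callback-style `setTimeout`; the commit imports it under its original name, which works as long as the callback form is not needed in the same module.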
34 changes: 34 additions & 0 deletions lib/plugins/bull-mq-metrics/queueDiscoverers.ts
@@ -0,0 +1,34 @@
import { Redis } from 'ioredis'

Check warning on line 1 in lib/plugins/bull-mq-metrics/queueDiscoverers.ts (GitHub Actions / build 18.x, 20.x, 22.x): All imports in the declaration are only used as types. Use `import type`
import { AbstractBackgroundJobProcessor } from '@lokalise/background-jobs-common'

Check warning on line 2 in lib/plugins/bull-mq-metrics/queueDiscoverers.ts (GitHub Actions / build 18.x, 20.x, 22.x): `@lokalise/background-jobs-common` import should occur before import of `ioredis`

export type QueueDiscoverer = {
  discoverQueues: () => Promise<string[]>
}

export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
  constructor(private readonly redis: Redis, private readonly queuesPrefix: string) {}

  async discoverQueues(): Promise<string[]> {
    const scanStream = this.redis.scanStream({
      match: `${this.queuesPrefix}:*:meta`,
    })

    const queues = new Set<string>()
    for await (const chunk of scanStream) {
      (chunk as string[])
        .map((key) => key.split(':')[1])
        .filter((value) => !!value)
        .forEach((queue) => queues.add(queue))
    }

    return Array.from(queues).sort()
  }
}
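The match pattern above implies keys shaped like `<prefix>:<queue name>:meta`, so the queue name is taken as the second colon-separated segment of each scanned key. The parsing step in isolation looks roughly like this, with `queueNamesFromKeys` as a hypothetical helper name and made-up sample keys:

```typescript
// Hypothetical pure helper mirroring the map/filter/Set/sort steps of
// `RedisBasedQueueDiscoverer.discoverQueues`, without touching Redis.
function queueNamesFromKeys(keys: string[]): string[] {
  const queues = new Set<string>()
  for (const key of keys) {
    const name = key.split(':')[1]
    if (name) {
      queues.add(name)
    }
  }
  return Array.from(queues).sort()
}

// Sample keys are illustrative only.
const sampleNames = queueNamesFromKeys(['bull:emails:meta', 'bull:reports:meta', 'bull:emails:meta'])
// sampleNames is ['emails', 'reports']
```

Because the split is on every colon, a prefix or queue name that itself contains a colon would not be extracted correctly by this approach.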

export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer {
  constructor(private readonly redis: Redis) {}

  async discoverQueues(): Promise<string[]> {
    return await AbstractBackgroundJobProcessor.getActiveQueueIds(this.redis)
  }
}
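Since `QueueDiscoverer` is just an object with an async `discoverQueues` method, a custom implementation can be very small. A sketch, assuming a fixed list of queue names is acceptable for the use case; `StaticQueueDiscoverer` is a hypothetical class that is not part of this commit, and the `QueueDiscoverer` type is repeated locally to keep the example self-contained:

```typescript
// Copied from the diff above so the example compiles on its own.
type QueueDiscoverer = {
  discoverQueues: () => Promise<string[]>
}

// Hypothetical discoverer that returns a fixed, de-duplicated, sorted list.
class StaticQueueDiscoverer implements QueueDiscoverer {
  private readonly queueNames: string[]

  constructor(queueNames: string[]) {
    this.queueNames = Array.from(new Set(queueNames)).sort()
  }

  async discoverQueues(): Promise<string[]> {
    // Return a copy so callers cannot mutate the internal list.
    return [...this.queueNames]
  }
}

const discoverer: QueueDiscoverer = new StaticQueueDiscoverer(['reports', 'emails', 'emails'])
```

An instance like this could then be passed as the `queueDiscoverer` plugin option in place of the default.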
2 changes: 2 additions & 0 deletions lib/plugins/bullMqMetricsPlugin.ts
@@ -4,11 +4,13 @@ import fp from 'fastify-plugin'
import type { Redis } from 'ioredis'

import { MetricsCollector } from './bull-mq-metrics/MetricsCollector'
import { QueueDiscoverer } from './bull-mq-metrics/queueDiscoverers'

Check warning on line 7 in lib/plugins/bullMqMetricsPlugin.ts (GitHub Actions / build 18.x, 20.x, 22.x): All imports in the declaration are only used as types. Use `import type`

export type BullMqMetricsPluginOptions = {
redisClient: Redis
bullMqPrefix?: string
metricsPrefix?: string
queueDiscoverer?: QueueDiscoverer
excludedQueues?: string[]
collectionIntervalInMs?: number
histogramBuckets?: number[]
1 change: 1 addition & 0 deletions package.json
@@ -43,6 +43,7 @@
"dependencies": {
"@bugsnag/js": "^7.22.7",
"@lokalise/error-utils": "^1.4.0",
"@lokalise/background-jobs-common": "^4.0.1",
"@splitsoftware/splitio": "^10.25.2",
"@amplitude/analytics-node": "^1.3.5",
"fastify-metrics": "^11.0.0",