Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIM-402 Add BullMQ metrics plugin #173

Merged
merged 23 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
REDIS_DB=0
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USERNAME=
REDIS_PASSWORD=sOmE_sEcUrE_pAsS
REDIS_USE_TLS=false
REDIS_CONNECT_TIMEOUT=
REDIS_COMMAND_TIMEOUT=
2 changes: 1 addition & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"ecmaVersion": 2022,
"sourceType": "module",
"tsconfigRootDir": "./",
"project": ["./tsconfig.json"]
"project": ["./tsconfig.lint.json"]
},
"rules": {
"@typescript-eslint/consistent-type-definitions": "off",
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:

strategy:
matrix:
node-version: [18.x, 20.x, 22.x]
node-version: [20.x, 22.4]
drdaemos marked this conversation as resolved.
Show resolved Hide resolved

steps:
- uses: actions/checkout@v4
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ typings/

# dotenv environment variables file
.env
.env.test

# parcel-bundler cache (https://parceljs.org/)
.cache
Expand Down
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Reusable plugins for Fastify.
- [Split IO Plugin](#split-io-plugin)
- [BugSnag Plugin](#bugsnag-plugin)
- [Metrics Plugin](#metrics-plugin)
- [Bull MQ Metrics Plugin](#bullmq-metrics-plugin)
- [NewRelic Transaction Manager Plugin](#newrelic-transaction-manager-plugin)
- [UnhandledException Plugin](#unhandledexception-plugin)

Expand All @@ -33,7 +34,8 @@ The following needs to be taken into consideration when adding new runtime depen
- `@fastify/jwt`;
- `fastify`;
- `newrelic`;
- `pino`.
- `pino`;
- `bullmq`;

## Plugins

Expand Down Expand Up @@ -132,6 +134,33 @@ Add the plugin to your Fastify instance by registering it with the following opt

The plugin exposes a `GET /metrics` route in your Fastify app to retrieve Prometheus metrics. If something goes wrong while starting the Prometheus metrics server, an `Error` is thrown. Otherwise, a success message is displayed when the plugin has been loaded.

### BullMQ Metrics Plugin

Plugin to auto-discover BullMQ queues which can regularly collect metrics for them and expose via `fastify-metrics` global Prometheus registry. If used together with `metricsPlugin`, it will show these metrics on `GET /metrics` route.

This plugin depends on the following peer-installed packages:

- `bullmq`
- `ioredis`

Add the plugin to your Fastify instance by registering it with the following possible options:

- `redisClient`, a Redis client instance which is used by the BullMQ: plugin uses it to discover the queues;
- `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis;
- `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus;
- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface;
- `excludedQueues` (optional, default: `[]`). An array of queue names to exclude from metrics collection;
- `histogramBuckets` (optional, default: `[20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000]`). Buckets for the histogram metrics (such as job completion or overall processing time).
- `collectionOptions` (optional, default: `{ type: 'interval', intervalInMs: 5000 }`). Allows to configure how metrics are collected. Supports the following properties:
- `type`. Can be either `interval` or `manual`.
- With `interval` type, plugin automatically loops and updates metrics at the specified interval.
- With `manual` type, you need to call `app.bullMqMetrics.collect()` to update the metrics; that allows you to build your own logic for scheduling the updates.
- `intervalInMs` (only for `type: 'interval'`). The interval in milliseconds at which the metrics are collected;

This plugin exposes `bullMqMetrics.collect()` method on the Fastify instance to manually trigger the metrics collection.

If something goes wrong while starting the BullMQ metrics plugin, an `Error` is thrown.

### NewRelic Transaction Manager Plugin

Plugin to create custom NewRelic spans for background jobs.
Expand Down
8 changes: 8 additions & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ export type {
HealthcheckMetricsPluginOptions,
} from './plugins/healthcheck/healthcheckMetricsPlugin'

export { bullMqMetricsPlugin } from './plugins/bullMqMetricsPlugin'
export type { BullMqMetricsPluginOptions } from './plugins/bullMqMetricsPlugin'
export {
RedisBasedQueueDiscoverer,
BackgroundJobsBasedQueueDiscoverer,
} from './plugins/bull-mq-metrics/queueDiscoverers'
export type { QueueDiscoverer } from './plugins/bull-mq-metrics/queueDiscoverers'

export { metricsPlugin } from './plugins/metricsPlugin'
export type { ErrorObjectResolver, MetricsPluginOptions } from './plugins/metricsPlugin'

Expand Down
28 changes: 28 additions & 0 deletions lib/plugins/bull-mq-metrics/CollectionScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { setTimeout } from 'node:timers/promises'

export type CollectionScheduler = {
start: () => void
stop: () => void
}

export class PromiseBasedCollectionScheduler implements CollectionScheduler {
private active = true
private readonly collectionIntervalInMs: number
private readonly collect: () => Promise<void>

constructor(collectionIntervalInMs: number, collect: () => Promise<void>) {
this.collectionIntervalInMs = collectionIntervalInMs
this.collect = collect
}

async start(): Promise<void> {
while (this.active) {
await this.collect()
await setTimeout(this.collectionIntervalInMs)
}
}

stop(): void {
this.active = false
}
}
133 changes: 133 additions & 0 deletions lib/plugins/bull-mq-metrics/MetricsCollector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { PromisePool } from '@supercharge/promise-pool'
import type { FastifyBaseLogger } from 'fastify'
import type { Redis } from 'ioredis'
import * as prometheus from 'prom-client'

import { ObservableQueue } from './ObservableQueue'
import type { QueueDiscoverer } from './queueDiscoverers'

export type Metrics = {
completedGauge: prometheus.Gauge<never>
activeGauge: prometheus.Gauge<never>
delayedGauge: prometheus.Gauge<never>
failedGauge: prometheus.Gauge<never>
waitingGauge: prometheus.Gauge<never>
completedDuration: prometheus.Histogram<never>
processedDuration: prometheus.Histogram<never>
}

export type MetricCollectorOptions = {
redisClient: Redis
bullMqPrefix: string
metricsPrefix: string
queueDiscoverer: QueueDiscoverer
excludedQueues: string[]
histogramBuckets: number[]
}

const getMetrics = (prefix: string, histogramBuckets: number[]): Metrics => ({
completedGauge: new prometheus.Gauge({
name: `${prefix}_jobs_completed`,
help: 'Total number of completed jobs',
labelNames: ['queue'],
}),
activeGauge: new prometheus.Gauge({
name: `${prefix}_jobs_active`,
help: 'Total number of active jobs (currently being processed)',
labelNames: ['queue'],
}),
failedGauge: new prometheus.Gauge({
name: `${prefix}_jobs_failed`,
help: 'Total number of failed jobs',
labelNames: ['queue'],
}),
delayedGauge: new prometheus.Gauge({
name: `${prefix}_jobs_delayed`,
help: 'Total number of jobs that will run in the future',
labelNames: ['queue'],
}),
waitingGauge: new prometheus.Gauge({
name: `${prefix}_jobs_waiting`,
help: 'Total number of jobs waiting to be processed',
labelNames: ['queue'],
}),
processedDuration: new prometheus.Histogram({
name: `${prefix}_jobs_processed_duration`,
help: 'Processing time for completed jobs (processing until completed)',
buckets: histogramBuckets,
labelNames: ['queue'],
}),
completedDuration: new prometheus.Histogram({
name: `${prefix}_jobs_completed_duration`,
help: 'Completion time for jobs (created until completed)',
buckets: histogramBuckets,
labelNames: ['queue'],
}),
})

export class MetricsCollector {
private readonly redis: Redis
private readonly metrics: Metrics
private observedQueues: ObservableQueue[] | undefined

constructor(
private readonly options: MetricCollectorOptions,
private readonly registry: prometheus.Registry,
private readonly logger: FastifyBaseLogger,
) {
this.redis = options.redisClient
this.metrics = this.registerMetrics(this.registry, this.options)
}

/**
* Updates metrics for all discovered queues
*/
async collect() {
if (!this.observedQueues) {
this.observedQueues = (await this.options.queueDiscoverer.discoverQueues())
.filter((name) => !this.options.excludedQueues.includes(name))
.map((name) => new ObservableQueue(name, this.redis, this.metrics, this.logger))
}

await PromisePool.for(this.observedQueues).process((queue) => {
void queue.collect()
})
}

/**
* Stops the metrics collection and cleans up resources
*/
async dispose() {
for (const queue of this.observedQueues ?? []) {
await queue.dispose()
}
}

private registerMetrics(
registry: prometheus.Registry,
{ metricsPrefix, histogramBuckets }: MetricCollectorOptions,
): Metrics {
const metrics = getMetrics(metricsPrefix, histogramBuckets)
const metricNames = Object.keys(metrics)

// If metrics are already registered, just return them to avoid triggering a Prometheus error
if (metricNames.length > 0 && registry.getSingleMetric(metricNames[0])) {
const retrievedMetrics = registry.getMetricsAsArray()
const returnValue: Record<string, prometheus.MetricObject> = {}

for (const metric of retrievedMetrics) {
if (metricNames.includes(metric.name)) {
returnValue[metric.name as keyof Metrics] = metric
}
}

return returnValue as unknown as Metrics
}

for (const metric of Object.values(metrics)) {
registry.registerMetric(metric)
}

return metrics
}
}
60 changes: 60 additions & 0 deletions lib/plugins/bull-mq-metrics/ObservableQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Queue, QueueEvents } from 'bullmq'
import type { FastifyBaseLogger } from 'fastify'
import type { Redis } from 'ioredis'

import type { Metrics } from './MetricsCollector'

export class ObservableQueue {
private readonly queue: Queue
private readonly events: QueueEvents

constructor(
readonly name: string,
private readonly redis: Redis,
private readonly metrics: Metrics,
private readonly logger: FastifyBaseLogger,
) {
this.queue = new Queue(name, { connection: redis })
this.events = new QueueEvents(name, { connection: redis })

this.events.on('completed', (completedJob: { jobId: string }) => {
this.queue
.getJob(completedJob.jobId)
.then((job) => {
if (!job) {
return
}

if (job.finishedOn) {
metrics.completedDuration
.labels({ queue: name })
.observe(job.finishedOn - job.timestamp)

if (job.processedOn) {
metrics.processedDuration
.labels({ queue: name })
.observe(job.finishedOn - job.processedOn)
}
}
})
.catch((err) => {
this.logger.warn(err)
})
})
}

async collect() {
const { completed, active, delayed, failed, waiting } = await this.queue.getJobCounts()

this.metrics.activeGauge.labels({ queue: this.name }).set(active)
this.metrics.completedGauge.labels({ queue: this.name }).set(completed)
this.metrics.delayedGauge.labels({ queue: this.name }).set(delayed)
this.metrics.failedGauge.labels({ queue: this.name }).set(failed)
this.metrics.waitingGauge.labels({ queue: this.name }).set(waiting)
}

async dispose() {
await this.events.close()
await this.queue.close()
}
}
39 changes: 39 additions & 0 deletions lib/plugins/bull-mq-metrics/queueDiscoverers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { AbstractBackgroundJobProcessor } from '@lokalise/background-jobs-common'
import type { Redis } from 'ioredis'

export type QueueDiscoverer = {
discoverQueues: () => Promise<string[]>
}

export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
constructor(
private readonly redis: Redis,
private readonly queuesPrefix: string,
) {}

async discoverQueues(): Promise<string[]> {
const scanStream = this.redis.scanStream({
match: `${this.queuesPrefix}:*:meta`,
})

const queues = new Set<string>()
for await (const chunk of scanStream) {
// ESLint doesn't want a `;` here but Prettier does
// eslint-disable-next-line no-extra-semi
drdaemos marked this conversation as resolved.
Show resolved Hide resolved
;(chunk as string[])
.map((key) => key.split(':')[1])
.filter((value) => !!value)
.forEach((queue) => queues.add(queue))
}

return Array.from(queues).sort()
}
}

export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer {
constructor(private readonly redis: Redis) {}

async discoverQueues(): Promise<string[]> {
return await AbstractBackgroundJobProcessor.getActiveQueueIds(this.redis)
}
}
Loading
Loading