From 99b1b6a337fe8a7439531595ad7b34e89274a485 Mon Sep 17 00:00:00 2001 From: Eugene Dementjev Date: Wed, 17 Jul 2024 15:13:39 +0400 Subject: [PATCH] AIM-402 Extract queue discovery logic, add background-jobs-common discoverer, check for registered metrics before attempting to register them. --- README.md | 24 +++- lib/index.ts | 5 + .../bull-mq-metrics/MetricsCollector.ts | 119 +++++++++--------- .../bull-mq-metrics/queueDiscoverers.ts | 34 +++++ lib/plugins/bullMqMetricsPlugin.ts | 2 + package.json | 1 + 6 files changed, 125 insertions(+), 60 deletions(-) create mode 100644 lib/plugins/bull-mq-metrics/queueDiscoverers.ts diff --git a/README.md b/README.md index 6a386b1..41e4e77 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,8 @@ The following needs to be taken into consideration when adding new runtime depen - `@fastify/jwt`; - `fastify`; - `newrelic`; -- `pino`. +- `pino`; +- `bullmq`; ## Plugins @@ -132,6 +133,27 @@ Add the plugin to your Fastify instance by registering it with the following opt The plugin exposes a `GET /metrics` route in your Fastify app to retrieve Prometheus metrics. If something goes wrong while starting the Prometheus metrics server, an `Error` is thrown. Otherwise, a success message is displayed when the plugin has been loaded. +### BullMQ Netrics Plugin + +Plugin to auto-discover BullMQ queues which regularly collects metrics for them and exposes them via `fastify-metrics` global Prometheus registry. If used together with `metricsPlugin`, it will show these metrics on `GET /metrics` route. + +This plugin depends on the following peer-installed packages: + +- `bullmq` +- `ioredis` + +Add the plugin to your Fastify instance by registering it with the following options: + +- `redisClient`, a Redis client instance which is used by the BullMQ: plugin uses it to discover the queues; +- `bullMqPrefix` (optional, default: `bull`). The prefix used by BullMQ to store the queues in Redis; +- `metricsPrefix` (optional, default: `bullmq`). The prefix for the metrics in Prometheus; +- `queueDiscoverer` (optional, default: `BackgroundJobsBasedQueueDiscoverer`). The queue discoverer to use. The default one relies on the logic implemented by `@lokalise/background-jobs-common` where queue names are registered by the background job processors; If you are not using `@lokalise/background-jobs-common`, you can use your own queue discoverer by instantiating a `RedisBasedQueueDiscoverer` or implementing a `QueueDiscoverer` interface; +- `excludedQueues` (optional, default: `[]`). An array of queue names to exclude from metrics collection; +- `collectionIntervalInMs` (optional, default: `5000`). The interval in milliseconds at which the metrics are collected; +- `histogramBuckets` (optional, default: `[20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000]`). Buckets for the histogram metrics (such as job completion or overall processing time). + +If something goes wrong while starting the BullMQ metrics plugin, an `Error` is thrown. + ### NewRelic Transaction Manager Plugin Plugin to create custom NewRelic spans for background jobs. diff --git a/lib/index.ts b/lib/index.ts index feedf70..f81b8a9 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -34,6 +34,11 @@ export type { HealthcheckMetricsPluginOptions, } from './plugins/healthcheck/healthcheckMetricsPlugin' +export { bullMqMetricsPlugin } from './plugins/bullMqMetricsPlugin' +export type { BullMqMetricsPluginOptions } from './plugins/bullMqMetricsPlugin' +export { RedisBasedQueueDiscoverer, BackgroundJobsBasedQueueDiscoverer } from './plugins/bull-mq-metrics/queueDiscoverers' +export type { QueueDiscoverer } from './plugins/bull-mq-metrics/queueDiscoverers' + export { metricsPlugin } from './plugins/metricsPlugin' export type { ErrorObjectResolver, MetricsPluginOptions } from './plugins/metricsPlugin' diff --git a/lib/plugins/bull-mq-metrics/MetricsCollector.ts b/lib/plugins/bull-mq-metrics/MetricsCollector.ts index 04fc235..fb4ba31 100644 --- a/lib/plugins/bull-mq-metrics/MetricsCollector.ts +++ b/lib/plugins/bull-mq-metrics/MetricsCollector.ts @@ -1,8 +1,11 @@ +import { setTimeout } from 'node:timers/promises' + import { Queue, QueueEvents } from 'bullmq' import type { Redis } from 'ioredis' import * as prometheus from 'prom-client' import type { BullMqMetricsPluginOptions } from '../bullMqMetricsPlugin' +import { BackgroundJobsBasedQueueDiscoverer } from './queueDiscoverers' type Metrics = { completedGauge: prometheus.Gauge @@ -16,6 +19,46 @@ type Metrics = { type MetricCollectorOptions = Required> +const getMetrics = (prefix: string, histogramBuckets: number[]): Metrics => ({ + completedGauge: new prometheus.Gauge({ + name: `${prefix}_jobs_completed`, + help: 'Total number of completed jobs', + labelNames: ['queue'], + }), + activeGauge: new prometheus.Gauge({ + name: `${prefix}_jobs_active`, + help: 'Total number of active jobs (currently being processed)', + labelNames: ['queue'], + }), + failedGauge: new prometheus.Gauge({ + name: `${prefix}_jobs_failed`, + help: 'Total number of failed jobs', + labelNames: ['queue'], + }), + delayedGauge: new prometheus.Gauge({ + name: `${prefix}_jobs_delayed`, + help: 'Total number of jobs that will run in the future', + labelNames: ['queue'], + }), + waitingGauge: new prometheus.Gauge({ + name: `${prefix}_jobs_waiting`, + help: 'Total number of jobs waiting to be processed', + labelNames: ['queue'], + }), + processedDuration: new prometheus.Histogram({ + name: `${prefix}_jobs_processed_duration`, + help: 'Processing time for completed jobs (processing until completed)', + buckets: histogramBuckets, + labelNames: ['queue'], + }), + completedDuration: new prometheus.Histogram({ + name: `${prefix}_jobs_completed_duration`, + help: 'Completion time for jobs (created until completed)', + buckets: histogramBuckets, + labelNames: ['queue'], + }), +}) + export class MetricsCollector { private readonly options: MetricCollectorOptions private readonly metrics: Metrics @@ -32,6 +75,7 @@ export class MetricsCollector { excludedQueues: [], collectionIntervalInMs: 5000, histogramBuckets: [20, 50, 150, 400, 1000, 3000, 8000, 22000, 60000, 150000], + queueDiscoverer: new BackgroundJobsBasedQueueDiscoverer(this.redis), ...options, } @@ -39,7 +83,7 @@ export class MetricsCollector { } async start() { - const queueNames = await this.findQueues(this.redis, this.options) + const queueNames = await this.options.queueDiscoverer.discoverQueues() await Promise.all( queueNames.map((name) => this.observeQueue(name, this.redis, this.metrics, this.options)), ) @@ -53,44 +97,20 @@ export class MetricsCollector { registry: prometheus.Registry, { metricsPrefix, histogramBuckets }: MetricCollectorOptions, ): Metrics { - const metrics: Metrics = { - completedGauge: new prometheus.Gauge({ - name: `${metricsPrefix}_jobs_completed`, - help: 'Total number of completed jobs', - labelNames: ['queue'], - }), - activeGauge: new prometheus.Gauge({ - name: `${metricsPrefix}_jobs_active`, - help: 'Total number of active jobs (currently being processed)', - labelNames: ['queue'], - }), - failedGauge: new prometheus.Gauge({ - name: `${metricsPrefix}_jobs_failed`, - help: 'Total number of failed jobs', - labelNames: ['queue'], - }), - delayedGauge: new prometheus.Gauge({ - name: `${metricsPrefix}_jobs_delayed`, - help: 'Total number of jobs that will run in the future', - labelNames: ['queue'], - }), - waitingGauge: new prometheus.Gauge({ - name: `${metricsPrefix}_jobs_waiting`, - help: 'Total number of jobs waiting to be processed', - labelNames: ['queue'], - }), - processedDuration: new prometheus.Histogram({ - name: `${metricsPrefix}_jobs_processed_duration`, - help: 'Processing time for completed jobs (processing until completed)', - buckets: histogramBuckets, - labelNames: ['queue'], - }), - completedDuration: new prometheus.Histogram({ - name: `${metricsPrefix}_jobs_completed_duration`, - help: 'Completion time for jobs (created until completed)', - buckets: histogramBuckets, - labelNames: ['queue'], - }), + const metrics = getMetrics(metricsPrefix, histogramBuckets) + + // If metrics are already registered, just return them to avoid triggering a Prometheus error + if (registry.getSingleMetric(Object.keys(metrics).pop()!)) { + const retrievedMetrics = registry.getMetricsAsArray() + const returnValue: Record = {} + + for (const metric of retrievedMetrics) { + if (Object.keys(metrics).includes(metric.name)) { + returnValue[metric.name as keyof Metrics] = metric + } + } + + return returnValue as unknown as Metrics } for (const metric of Object.values(metrics)) { @@ -100,25 +120,6 @@ export class MetricsCollector { return metrics } - private async findQueues( - redis: Redis, - { bullMqPrefix }: MetricCollectorOptions, - ): Promise { - const scanStream = redis.scanStream({ - match: `${bullMqPrefix}:*:meta`, - }) - - const queues = new Set() - for await (const chunk of scanStream) { - ;(chunk as string[]) - .map((key) => key.split(':')[1]) - .filter((value) => !!value) - .forEach((queue) => queues.add(queue)) - } - - return Array.from(queues) - } - private async observeQueue( name: string, redis: Redis, @@ -155,7 +156,7 @@ export class MetricsCollector { metrics.failedGauge.labels({ queue: name }).set(failed) metrics.waitingGauge.labels({ queue: name }).set(waiting) - await new Promise((resolve) => setTimeout(resolve, collectionIntervalInMs)) + await setTimeout(collectionIntervalInMs) } await events.close() diff --git a/lib/plugins/bull-mq-metrics/queueDiscoverers.ts b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts new file mode 100644 index 0000000..8c5fb36 --- /dev/null +++ b/lib/plugins/bull-mq-metrics/queueDiscoverers.ts @@ -0,0 +1,34 @@ +import { Redis } from 'ioredis' +import { AbstractBackgroundJobProcessor } from '@lokalise/background-jobs-common' + +export type QueueDiscoverer = { + discoverQueues: () => Promise +} + +export class RedisBasedQueueDiscoverer implements QueueDiscoverer { + constructor(private readonly redis: Redis, private readonly queuesPrefix: string) {} + + async discoverQueues(): Promise { + const scanStream = this.redis.scanStream({ + match: `${this.queuesPrefix}:*:meta`, + }) + + const queues = new Set() + for await (const chunk of scanStream) { + (chunk as string[]) + .map((key) => key.split(':')[1]) + .filter((value) => !!value) + .forEach((queue) => queues.add(queue)) + } + + return Array.from(queues).sort() + } +} + +export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer { + constructor(private readonly redis: Redis) {} + + async discoverQueues(): Promise { + return await AbstractBackgroundJobProcessor.getActiveQueueIds(this.redis) + } +} \ No newline at end of file diff --git a/lib/plugins/bullMqMetricsPlugin.ts b/lib/plugins/bullMqMetricsPlugin.ts index 2ab0d9b..283a457 100644 --- a/lib/plugins/bullMqMetricsPlugin.ts +++ b/lib/plugins/bullMqMetricsPlugin.ts @@ -4,11 +4,13 @@ import fp from 'fastify-plugin' import type { Redis } from 'ioredis' import { MetricsCollector } from './bull-mq-metrics/MetricsCollector' +import { QueueDiscoverer } from './bull-mq-metrics/queueDiscoverers' export type BullMqMetricsPluginOptions = { redisClient: Redis bullMqPrefix?: string metricsPrefix?: string + queueDiscoverer?: QueueDiscoverer excludedQueues?: string[] collectionIntervalInMs?: number histogramBuckets?: number[] diff --git a/package.json b/package.json index ea2804f..8b0f6aa 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "dependencies": { "@bugsnag/js": "^7.22.7", "@lokalise/error-utils": "^1.4.0", + "@lokalise/background-jobs-common": "^4.0.1", "@splitsoftware/splitio": "^10.25.2", "@amplitude/analytics-node": "^1.3.5", "fastify-metrics": "^11.0.0",