Skip to content

Commit

Permalink
AP-4397 use PromisePool for queues discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
kjamrog committed Aug 22, 2024
1 parent 86c734f commit a61fbbd
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions lib/plugins/bull-mq-metrics/queueDiscoverers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { backgroundJobProcessorGetActiveQueueIds } from '@lokalise/background-jobs-common'
import { PromisePool } from '@supercharge/promise-pool'
import type { Redis } from 'ioredis'

export type QueueDiscoverer = {
Expand All @@ -10,19 +11,36 @@ type RedisQueue = {
queueName: string
}

export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
constructor(
private readonly redisInstances: Redis[],
private readonly queuesPrefix: string,
) {}
const QUEUE_DISCOVERY_CONCURRENCY = 3

export abstract class AbstractRedisBasedQueueDiscoverer implements QueueDiscoverer {
constructor(protected readonly redisInstances: Redis[]) {}

async discoverQueues(): Promise<RedisQueue[]> {
return Promise.all(
this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)),
).then((queues) => queues.flat())
const { results, errors } = await PromisePool.withConcurrency(QUEUE_DISCOVERY_CONCURRENCY)
.for(this.redisInstances)
.process((redisInstance) => this.discoverQueuesForInstance(redisInstance))

if (errors.length > 0) {
// Throwing first error that was encountered
throw errors[0]
}

return results.flat()
}

protected abstract discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]>
}

export class RedisBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
constructor(
redisInstances: Redis[],
private readonly queuesPrefix: string,
) {
super(redisInstances)
}

private async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
const scanStream = redisInstance.scanStream({
match: `${this.queuesPrefix}:*:meta`,
})
Expand All @@ -45,16 +63,8 @@ export class RedisBasedQueueDiscoverer implements QueueDiscoverer {
}
}

export class BackgroundJobsBasedQueueDiscoverer implements QueueDiscoverer {
constructor(private readonly redisInstances: Redis[]) {}

async discoverQueues(): Promise<RedisQueue[]> {
return Promise.all(
this.redisInstances.map((redisInstance) => this.discoverQueuesForInstance(redisInstance)),
).then((queues) => queues.flat())
}

private async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
export class BackgroundJobsBasedQueueDiscoverer extends AbstractRedisBasedQueueDiscoverer {
protected async discoverQueuesForInstance(redisInstance: Redis): Promise<RedisQueue[]> {
return backgroundJobProcessorGetActiveQueueIds(redisInstance).then((queueNames) =>
queueNames.map((queueName) => ({
redisInstance,
Expand Down

0 comments on commit a61fbbd

Please sign in to comment.