diff --git a/packages/deployment/src/queue/BullQueue.ts b/packages/deployment/src/queue/BullQueue.ts index bd884670..7b8188aa 100644 --- a/packages/deployment/src/queue/BullQueue.ts +++ b/packages/deployment/src/queue/BullQueue.ts @@ -5,9 +5,11 @@ import { Closeable, InstantiatedQueue, TaskQueue, - SequencerModule, + AbstractTaskQueue, } from "@proto-kit/sequencer"; +import { InstantiatedBullQueue } from "./InstantiatedBullQueue"; + export interface BullQueueConfig { redis: { host: string; @@ -23,7 +25,7 @@ export interface BullQueueConfig { * TaskQueue implementation for BullMQ */ export class BullQueue - extends SequencerModule + extends AbstractTaskQueue implements TaskQueue { private activePromise?: Promise; @@ -40,6 +42,8 @@ export class BullQueue // This is by far not optimal - since it still picks up 1 task per queue but waits until // computing them, so that leads to bad performance over multiple workers. // For that we need to restructure tasks to be flowing through a single queue however + + // TODO Use worker.pause() while (this.activePromise !== undefined) { // eslint-disable-next-line no-await-in-loop await this.activePromise; @@ -80,50 +84,18 @@ export class BullQueue } public async getQueue(queueName: string): Promise { - const { retryAttempts, redis } = this.config; - - const queue = new Queue(queueName, { - connection: redis, - }); - const events = new QueueEvents(queueName, { connection: redis }); + return this.createOrGetQueue(queueName, (name) => { + log.debug(`Creating bull queue ${queueName}`); - await queue.drain(); + const { redis } = this.config; - return { - name: queueName, - - async addTask(payload: TaskPayload): Promise<{ taskId: string }> { - log.debug("Adding task: ", payload); - const job = await queue.add(queueName, payload, { - attempts: retryAttempts ?? 2, - }); - return { taskId: job.id! }; - }, + const queue = new Queue(queueName, { + connection: redis, + }); + const events = new QueueEvents(queueName, { connection: redis }); - async onCompleted(listener: (payload: TaskPayload) => Promise) { - events.on("completed", async (result) => { - log.debug("Completed task: ", result); - try { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - await listener(result.returnvalue as unknown as TaskPayload); - } catch (e) { - // Catch error explicitly since this promise is dangling, - // therefore any error will be voided as well - log.error(e); - } - }); - events.on("error", async (error) => { - log.error("Error in worker", error); - }); - await events.waitUntilReady(); - }, - - async close(): Promise { - await events.close(); - await queue.drain(); - await queue.close(); - }, - }; + return new InstantiatedBullQueue(name, queue, events, this.config); + }); } public async start() { diff --git a/packages/deployment/src/queue/InstantiatedBullQueue.ts b/packages/deployment/src/queue/InstantiatedBullQueue.ts new file mode 100644 index 00000000..65a31c73 --- /dev/null +++ b/packages/deployment/src/queue/InstantiatedBullQueue.ts @@ -0,0 +1,70 @@ +import { + InstantiatedQueue, + ListenerList, + TaskPayload, +} from "@proto-kit/sequencer"; +import { log } from "@proto-kit/common"; +import { Queue, QueueEvents } from "bullmq"; + +export class InstantiatedBullQueue implements InstantiatedQueue { + public constructor( + public readonly name: string, + private readonly queue: Queue, + private readonly events: QueueEvents, + private readonly options: { + retryAttempts?: number; + } + ) {} + + initialized = false; + + listeners = new ListenerList(); + + public async initialize() { + await this.queue.drain(); + } + + public async addTask(payload: TaskPayload): Promise<{ taskId: string }> { + log.debug("Adding task: ", payload); + const job = await this.queue.add(this.name, payload, { + attempts: this.options.retryAttempts ?? 2, + }); + return { taskId: job.id! }; + } + + async onCompleted(listener: (payload: TaskPayload) => Promise) { + if (!this.initialized) { + await this.events.waitUntilReady(); + + this.events.on("completed", async (result) => { + log.debug("Completed task: ", result); + try { + await this.listeners.executeListeners( + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + result.returnvalue as unknown as TaskPayload + ); + } catch (e) { + // Catch error explicitly since this promise is dangling, + // therefore any error will be voided as well + log.error(e); + } + }); + this.events.on("error", async (error) => { + log.error("Error in worker", error); + }); + this.initialized = true; + } + + return this.listeners.pushListener(listener); + } + + async offCompleted(listenerId: number) { + this.listeners.removeListener(listenerId); + } + + async close(): Promise { + await this.events.close(); + await this.queue.drain(); + await this.queue.close(); + } +} diff --git a/packages/sequencer/src/index.ts b/packages/sequencer/src/index.ts index 99d76db2..3e03702a 100644 --- a/packages/sequencer/src/index.ts +++ b/packages/sequencer/src/index.ts @@ -12,6 +12,8 @@ export * from "./worker/flow/JSONTaskSerializer"; // export * from "./worker/queue/BullQueue"; export * from "./worker/queue/TaskQueue"; export * from "./worker/queue/LocalTaskQueue"; +export * from "./worker/queue/ListenerList"; +export * from "./worker/queue/AbstractTaskQueue"; export * from "./worker/worker/FlowTaskWorker"; export * from "./worker/worker/LocalTaskWorkerModule"; export * from "./worker/worker/TaskWorkerModule"; diff --git a/packages/sequencer/src/worker/flow/Flow.ts b/packages/sequencer/src/worker/flow/Flow.ts index f299c0ef..985fc2df 100644 --- a/packages/sequencer/src/worker/flow/Flow.ts +++ b/packages/sequencer/src/worker/flow/Flow.ts @@ -1,5 +1,5 @@ -import { inject, injectable, Lifecycle, scoped } from "tsyringe"; -import { log } from "@proto-kit/common"; +import { inject, injectable } from "tsyringe"; +import { log, mapSequential } from "@proto-kit/common"; import { Closeable, InstantiatedQueue, TaskQueue } from "../queue/TaskQueue"; @@ -12,68 +12,6 @@ const errors = { ), }; -@injectable() -// ResolutionScoped => We want a new instance every time we resolve it -@scoped(Lifecycle.ResolutionScoped) -export class ConnectionHolder implements Closeable { - private queues: { - [key: string]: InstantiatedQueue; - } = {}; - - private listeners: { - [key: string]: { - [key: string]: (payload: TaskPayload) => Promise; - }; - } = {}; - - public constructor( - @inject("TaskQueue") private readonly queueImpl: TaskQueue - ) {} - - public registerListener( - flowId: string, - queue: string, - listener: (payload: TaskPayload) => Promise - ) { - if (this.listeners[queue] === undefined) { - this.listeners[queue] = {}; - } - this.listeners[queue][flowId] = listener; - } - - public unregisterListener(flowId: string, queue: string) { - delete this.listeners[queue][flowId]; - } - - private async openQueue(name: string): Promise { - const queue = await this.queueImpl.getQueue(name); - await queue.onCompleted(async (payload) => { - await this.onCompleted(name, payload); - }); - return queue; - } - - private async onCompleted(name: string, payload: TaskPayload) { - const listener = this.listeners[name]?.[payload.flowId]; - if (listener !== undefined) { - await listener(payload); - } - } - - public async getQueue(name: string) { - if (this.queues[name] !== undefined) { - return this.queues[name]; - } - const queue = await this.openQueue(name); - this.queues[name] = queue; - return queue; - } - - async close() { - // TODO - } -} - interface CompletedCallback { (result: Result, originalInput: Input): Promise; } @@ -83,7 +21,10 @@ export class Flow implements Closeable { // therefore cancelled private erroredOut = false; - private readonly registeredListeners: string[] = []; + private readonly registeredListeners: { + queueName: string; + listenerId: number; + }[] = []; private resultsPending: { [key: string]: (payload: TaskPayload) => Promise; @@ -98,28 +39,28 @@ export class Flow implements Closeable { public tasksInProgress = 0; public constructor( - private readonly connectionHolder: ConnectionHolder, + private readonly queueImpl: TaskQueue, public readonly flowId: string, public state: State ) {} - private waitForResult( - queue: string, + private async waitForResult( + queue: InstantiatedQueue, taskId: string, callback: (payload: TaskPayload) => Promise ) { this.resultsPending[taskId] = callback; - if (!this.registeredListeners.includes(queue)) { - // Open Listener onto Connectionhandler - this.connectionHolder.registerListener( - this.flowId, - queue, - async (payload) => { + if (!this.registeredListeners.find((l) => l.queueName === queue.name)) { + const listenerId = await queue.onCompleted(async (payload) => { + if (payload.flowId === this.flowId) { await this.resolveResponse(payload); } - ); - this.registeredListeners.push(queue); + }); + this.registeredListeners.push({ + queueName: queue.name, + listenerId, + }); } } @@ -167,7 +108,7 @@ export class Flow implements Closeable { ): Promise { const queueName = task.name; const taskName = overrides?.taskName ?? task.name; - const queue = await this.connectionHolder.getQueue(queueName); + const queue = await this.queueImpl.getQueue(queueName); const payload = await task.inputSerializer().toJSON(input); @@ -197,7 +138,7 @@ export class Flow implements Closeable { this.tasksInProgress -= 1; return await completed?.(decoded, input); }; - this.waitForResult(queueName, taskId, callback); + await this.waitForResult(queue, taskId, callback); } public async forEach( @@ -222,17 +163,23 @@ export class Flow implements Closeable { } public async close() { - this.registeredListeners.forEach((queue) => { - this.connectionHolder.unregisterListener(this.flowId, queue); - }); + await mapSequential( + this.registeredListeners, + async ({ queueName, listenerId }) => { + const queue = await this.queueImpl.getQueue(queueName); + queue.offCompleted(listenerId); + } + ); } } @injectable() export class FlowCreator { - public constructor(private readonly connectionHolder: ConnectionHolder) {} + public constructor( + @inject("TaskQueue") private readonly queueImpl: TaskQueue + ) {} public createFlow(flowId: string, state: State): Flow { - return new Flow(this.connectionHolder, flowId, state); + return new Flow(this.queueImpl, flowId, state); } } diff --git a/packages/sequencer/src/worker/queue/AbstractTaskQueue.ts b/packages/sequencer/src/worker/queue/AbstractTaskQueue.ts new file mode 100644 index 00000000..d2816998 --- /dev/null +++ b/packages/sequencer/src/worker/queue/AbstractTaskQueue.ts @@ -0,0 +1,19 @@ +import { SequencerModule } from "../../sequencer/builder/SequencerModule"; + +import type { InstantiatedQueue } from "./TaskQueue"; + +export abstract class AbstractTaskQueue< + Config, +> extends SequencerModule { + protected queues: Record = {}; + + protected createOrGetQueue( + name: string, + creator: (name: string) => InstantiatedQueue + ): InstantiatedQueue { + if (this.queues[name] === undefined) { + this.queues[name] = creator(name); + } + return this.queues[name]; + } +} diff --git a/packages/sequencer/src/worker/queue/ListenerList.ts b/packages/sequencer/src/worker/queue/ListenerList.ts new file mode 100644 index 00000000..3b497a7e --- /dev/null +++ b/packages/sequencer/src/worker/queue/ListenerList.ts @@ -0,0 +1,37 @@ +import { mapSequential } from "@proto-kit/common"; + +export class ListenerList { + private listenerId: number = 0; + + private listeners: { + listener: (payload: T) => Promise; + id: number; + }[] = []; + + public getListeners() { + return this.listeners.slice(); + } + + public async executeListeners(payload: T) { + await mapSequential( + this.getListeners(), + async (listener) => await listener.listener(payload) + ); + } + + public pushListener(listener: (payload: T) => Promise) { + // eslint-disable-next-line no-plusplus + const id = this.listenerId++; + + this.listeners.push({ + listener, + id, + }); + + return id; + } + + public removeListener(listenerId: number) { + this.listeners = this.listeners.filter(({ id }) => id !== listenerId); + } +} diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index d27bee5a..85e11db8 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -1,12 +1,11 @@ import { log, mapSequential, noop } from "@proto-kit/common"; -import { - sequencerModule, - SequencerModule, -} from "../../sequencer/builder/SequencerModule"; +import { sequencerModule } from "../../sequencer/builder/SequencerModule"; import { TaskPayload } from "../flow/Task"; import { Closeable, InstantiatedQueue, TaskQueue } from "./TaskQueue"; +import { ListenerList } from "./ListenerList"; +import { AbstractTaskQueue } from "./AbstractTaskQueue"; async function sleep(ms: number) { await new Promise((resolve) => { @@ -23,12 +22,59 @@ export interface LocalTaskQueueConfig { simulatedDuration?: number; } +class InMemoryInstantiatedQueue implements InstantiatedQueue { + public constructor( + public readonly name: string, + public taskQueue: LocalTaskQueue + ) {} + + private id = 0; + + private instantiated = false; + + private listeners = new ListenerList(); + + async addTask( + payload: TaskPayload, + taskId?: string + ): Promise<{ taskId: string }> { + this.id += 1; + const nextId = taskId ?? String(this.id).toString(); + this.taskQueue.queuedTasks[this.name].push({ payload, taskId: nextId }); + + void this.taskQueue.workNextTasks(); + + return { taskId: nextId }; + } + + async onCompleted( + listener: (payload: TaskPayload) => Promise + ): Promise { + if (!this.instantiated) { + (this.taskQueue.listeners[this.name] ??= []).push(async (result) => { + await this.listeners.executeListeners(result); + }); + + this.instantiated = false; + } + return this.listeners.pushListener(listener); + } + + async offCompleted(listenerId: number) { + this.listeners.removeListener(listenerId); + } + + async close() { + noop(); + } +} + @sequencerModule() export class LocalTaskQueue - extends SequencerModule + extends AbstractTaskQueue implements TaskQueue { - public queues: { + public queuedTasks: { [key: string]: { payload: TaskPayload; taskId: string }[]; } = {}; @@ -55,7 +101,7 @@ export class LocalTaskQueue this.taskInProgress = true; // Collect all tasks - const tasksToExecute = Object.entries(this.queues).flatMap( + const tasksToExecute = Object.entries(this.queuedTasks).flatMap( ([queueName, tasks]) => { if (tasks.length > 0 && this.workers[queueName]) { const functions = tasks.map((task) => async () => { @@ -77,7 +123,7 @@ export class LocalTaskQueue ); void Promise.all(listenerPromises || []); }); - this.queues[queueName] = []; + this.queuedTasks[queueName] = []; return functions; } @@ -137,36 +183,10 @@ export class LocalTaskQueue } public async getQueue(queueName: string): Promise { - this.queues[queueName] = []; - - let id = 0; - - return { - name: queueName, - - addTask: async ( - payload: TaskPayload, - taskId?: string - ): Promise<{ taskId: string }> => { - id += 1; - const nextId = taskId ?? String(id).toString(); - this.queues[queueName].push({ payload, taskId: nextId }); - - void this.workNextTasks(); - - return { taskId: nextId }; - }, - - onCompleted: async ( - listener: (payload: TaskPayload) => Promise - ): Promise => { - (this.listeners[queueName] ??= []).push(listener); - }, - - close: async () => { - noop(); - }, - }; + return this.createOrGetQueue(queueName, (name) => { + this.queuedTasks[name] = []; + return new InMemoryInstantiatedQueue(name, this); + }); } public async start(): Promise { diff --git a/packages/sequencer/src/worker/queue/TaskQueue.ts b/packages/sequencer/src/worker/queue/TaskQueue.ts index af2cf289..c3040b3a 100644 --- a/packages/sequencer/src/worker/queue/TaskQueue.ts +++ b/packages/sequencer/src/worker/queue/TaskQueue.ts @@ -37,5 +37,7 @@ export interface InstantiatedQueue extends Closeable { */ onCompleted: ( listener: (payload: TaskPayload) => Promise - ) => Promise; + ) => Promise; + + offCompleted: (listenerId: number) => void; }