diff --git a/src/core/server/services/comments/pipeline/phases/wordList/service.ts b/src/core/server/services/comments/pipeline/phases/wordList/service.ts index f8c45003a4..c9832da4b5 100644 --- a/src/core/server/services/comments/pipeline/phases/wordList/service.ts +++ b/src/core/server/services/comments/pipeline/phases/wordList/service.ts @@ -24,25 +24,18 @@ import { const WORKER_SCRIPT = "./dist/core/server/services/comments/pipeline/phases/wordList/worker.js"; -interface PromiseCallbacks { - resolve: (result: R) => void; - reject: (err: Error) => void; -} - export class WordListService { private worker: Worker; private onMessageDelegate: (event: MessageEvent) => void; - private readonly callbacks: Map< - string, - PromiseCallbacks - > = new Map(); + private results: Map; private logger: Logger; private sanitizer: Sanitize; - constructor(logger: Logger) { + constructor(logger: Logger, numWorkers = 3) { this.logger = logger; + this.results = new Map(); this.onMessageDelegate = this.onMessage.bind(this); this.worker = new Worker(WORKER_SCRIPT); @@ -59,37 +52,20 @@ export class WordListService { }); } + private sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, ms); + }); + } + private onMessage(result: WordListWorkerResult) { if (!result) { return; } - // Get the callbacks for this result. - const callbacks = this.callbacks.get(result.id); - if (!callbacks) { - throw new Error(`Invalid result id: ${result.id}`); - } - - // Delete the callbacks for this result. - this.callbacks.delete(result.id); - - // Resolve the promise. - if (result.ok) { - callbacks.resolve(result); - } else { - callbacks.reject(result.err!); - } - } - - private send(message: WordListWorkerMessage) { - // Create a new promise to wait for the worker to finish. - const promise = new Promise((resolve, reject) => { - this.callbacks.set(message.id, { resolve, reject }); - }); - - this.worker.postMessage(message); - - return promise; + this.results.set(result.id, result); } public async initialize( @@ -111,7 +87,31 @@ export class WordListService { data, }; - const result = await this.send(message); + const builder = async () => { + let hasResult = this.results.has(message.id); + while (!hasResult) { + await this.sleep(1); + hasResult = this.results.has(message.id); + } + + const result = this.results.get(message.id); + if (!result) { + this.results.delete(message.id); + return { + id: message.id, + tenantID, + ok: false, + err: new Error("result was undefined"), + }; + } + + this.results.delete(message.id); + return result; + }; + + this.worker.postMessage(message); + const result = await builder(); + if (!result.ok || result.err) { this.logger.error( { tenantID: result.tenantID, id: result.id }, @@ -143,8 +143,32 @@ export class WordListService { data, }; - // Send the message to the worker. - const result = await this.send(message); + this.worker.postMessage(message); + + const builder = async () => { + let hasResult = this.results.has(message.id); + while (!hasResult) { + await this.sleep(1); + hasResult = this.results.has(message.id); + } + + const result = this.results.get(message.id); + if (!result) { + this.results.delete(message.id); + return { + id: message.id, + tenantID, + ok: false, + err: new Error("result was undefined"), + }; + } + + this.results.delete(message.id); + return result; + }; + + const result = await builder(); + if (!result.ok || result.err) { return { isMatched: false,