Skip to content

Commit

Permalink
Merge pull request #4326 from coralproject/fix/revert-wordlist-worker…
Browse files Browse the repository at this point in the history
…-callbacks

Revert "Merge pull request #4312 from coralproject/wyattjoh/promise-worker-threads
  • Loading branch information
tessalt authored Aug 24, 2023
2 parents 88ddf0a + 0bf9bf2 commit 18414e9
Showing 1 changed file with 63 additions and 39 deletions.
102 changes: 63 additions & 39 deletions src/core/server/services/comments/pipeline/phases/wordList/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,18 @@ import {
const WORKER_SCRIPT =
"./dist/core/server/services/comments/pipeline/phases/wordList/worker.js";

interface PromiseCallbacks<R> {
resolve: (result: R) => void;
reject: (err: Error) => void;
}

export class WordListService {
private worker: Worker;

private onMessageDelegate: (event: MessageEvent) => void;
private readonly callbacks: Map<
string,
PromiseCallbacks<WordListWorkerResult>
> = new Map();
private results: Map<string, WordListWorkerResult>;
private logger: Logger;
private sanitizer: Sanitize;

constructor(logger: Logger) {
constructor(logger: Logger, numWorkers = 3) {
this.logger = logger;

this.results = new Map<string, WordListWorkerResult>();
this.onMessageDelegate = this.onMessage.bind(this);

this.worker = new Worker(WORKER_SCRIPT);
Expand All @@ -59,37 +52,20 @@ export class WordListService {
});
}

private sleep(ms: number) {
return new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, ms);
});
}

private onMessage(result: WordListWorkerResult) {
if (!result) {
return;
}

// Get the callbacks for this result.
const callbacks = this.callbacks.get(result.id);
if (!callbacks) {
throw new Error(`Invalid result id: ${result.id}`);
}

// Delete the callbacks for this result.
this.callbacks.delete(result.id);

// Resolve the promise.
if (result.ok) {
callbacks.resolve(result);
} else {
callbacks.reject(result.err!);
}
}

private send(message: WordListWorkerMessage) {
// Create a new promise to wait for the worker to finish.
const promise = new Promise<WordListWorkerResult>((resolve, reject) => {
this.callbacks.set(message.id, { resolve, reject });
});

this.worker.postMessage(message);

return promise;
this.results.set(result.id, result);
}

public async initialize(
Expand All @@ -111,7 +87,31 @@ export class WordListService {
data,
};

const result = await this.send(message);
const builder = async () => {
let hasResult = this.results.has(message.id);
while (!hasResult) {
await this.sleep(1);
hasResult = this.results.has(message.id);
}

const result = this.results.get(message.id);
if (!result) {
this.results.delete(message.id);
return {
id: message.id,
tenantID,
ok: false,
err: new Error("result was undefined"),
};
}

this.results.delete(message.id);
return result;
};

this.worker.postMessage(message);
const result = await builder();

if (!result.ok || result.err) {
this.logger.error(
{ tenantID: result.tenantID, id: result.id },
Expand Down Expand Up @@ -143,8 +143,32 @@ export class WordListService {
data,
};

// Send the message to the worker.
const result = await this.send(message);
this.worker.postMessage(message);

const builder = async () => {
let hasResult = this.results.has(message.id);
while (!hasResult) {
await this.sleep(1);
hasResult = this.results.has(message.id);
}

const result = this.results.get(message.id);
if (!result) {
this.results.delete(message.id);
return {
id: message.id,
tenantID,
ok: false,
err: new Error("result was undefined"),
};
}

this.results.delete(message.id);
return result;
};

const result = await builder();

if (!result.ok || result.err) {
return {
isMatched: false,
Expand Down

0 comments on commit 18414e9

Please sign in to comment.