Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Omnichannel queue starting multiple times due to race condition #34062

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
5 changes: 5 additions & 0 deletions .changeset/green-shirts-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@rocket.chat/meteor": patch
---

Fixes condition causing Omnichannel queue to start more than once.
43 changes: 40 additions & 3 deletions apps/meteor/server/services/omnichannel/queue.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ServiceStarter } from '@rocket.chat/core-services';
import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings';
import { License } from '@rocket.chat/license';
import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models';
Expand All @@ -11,6 +12,17 @@ import { settings } from '../../../app/settings/server';
const DEFAULT_RACE_TIMEOUT = 5000;

export class OmnichannelQueue implements IOmnichannelQueue {
private serviceStarter: ServiceStarter;

private timeoutHandler: ReturnType<typeof setTimeout> | null = null;

constructor() {
this.serviceStarter = new ServiceStarter(
() => this._start(),
() => this._stop(),
);
}

private running = false;

private queues: (string | undefined)[] = [];
Expand All @@ -24,7 +36,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return this.running;
}

async start() {
private async _start() {
if (this.running) {
return;
}
Expand All @@ -37,17 +49,31 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return this.execute();
}

async stop() {
private async _stop() {
if (!this.running) {
return;
}

await LivechatInquiry.unlockAll();

this.running = false;

if (this.timeoutHandler !== null) {
clearTimeout(this.timeoutHandler);
this.timeoutHandler = null;
}

queueLogger.info('Service stopped');
}

async start() {
return this.serviceStarter.start();
}

async stop() {
return this.serviceStarter.stop();
}

private async getActiveQueues() {
// undefined = public queue(without department)
return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({}));
Expand Down Expand Up @@ -118,10 +144,21 @@ export class OmnichannelQueue implements IOmnichannelQueue {
err: e,
});
} finally {
setTimeout(this.execute.bind(this), this.delay());
this.scheduleExecution();
}
}

private scheduleExecution(): void {
if (this.timeoutHandler !== null) {
return;
}

this.timeoutHandler = setTimeout(() => {
this.timeoutHandler = null;
return this.execute();
}, this.delay());
}

async shouldStart() {
if (!settings.get('Livechat_enabled')) {
void this.stop();
Expand Down
6 changes: 1 addition & 5 deletions apps/meteor/server/services/omnichannel/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha
}

async started() {
settings.watch<boolean>('Livechat_enabled', (enabled) => {
void (enabled && RoutingManager.isMethodSet() ? this.queueWorker.shouldStart() : this.queueWorker.stop());
});

settings.watch<string>('Livechat_Routing_Method', async () => {
settings.watchMultiple(['Livechat_enabled', 'Livechat_Routing_Method'], () => {
this.queueWorker.shouldStart();
});

Expand Down
1 change: 1 addition & 0 deletions packages/core-services/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export {
} from './types/IOmnichannelAnalyticsService';

export { getConnection, getTrashCollection } from './lib/mongo';
export { ServiceStarter } from './lib/ServiceStarter';

export {
AutoUpdateRecord,
Expand Down
68 changes: 68 additions & 0 deletions packages/core-services/src/lib/ServiceStarter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// This class is used to manage calls to a service's .start and .stop functions
// Specifically for cases where the start function has different conditions that may cause the service to actually start or not,
// or when the start process can take a while to complete
// Using this class, you ensure that calls to .start and .stop will be chained, so you avoid race conditions
// At the same time, it prevents those functions from running more times than necessary if there are several calls to them (for example when loading setting values)
export class ServiceStarter {
private lock = Promise.resolve();

private currentCall?: 'start' | 'stop';

private nextCall?: 'start' | 'stop';

private starterFn: () => Promise<void>;

private stopperFn?: () => Promise<void>;

constructor(starterFn: () => Promise<void>, stopperFn?: () => Promise<void>) {
this.starterFn = starterFn;
this.stopperFn = stopperFn;
}

private async checkStatus(): Promise<void> {
if (this.nextCall === 'start') {
return this.doCall('start');
}

if (this.nextCall === 'stop') {
return this.doCall('stop');
}
}

private async doCall(call: 'start' | 'stop'): Promise<void> {
this.nextCall = undefined;
this.currentCall = call;
try {
if (call === 'start') {
await this.starterFn();
} else if (this.stopperFn) {
await this.stopperFn();
}
} finally {
this.currentCall = undefined;
await this.checkStatus();
}
}

private async call(call: 'start' | 'stop'): Promise<void> {
// If something is already chained to run after the current call, it's okay to replace it with the new call
this.nextCall = call;
if (this.currentCall) {
return this.lock;
}
this.lock = this.lock.then(() => this.checkStatus());
return this.lock;
}

async start(): Promise<void> {
return this.call('start');
}

async stop(): Promise<void> {
return this.call('stop');
}

async wait(): Promise<void> {
return this.lock;
}
}
105 changes: 105 additions & 0 deletions packages/core-services/tests/ServiceStarter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { ServiceStarter } from '../src/lib/ServiceStarter';

const wait = (time: number) => {
return new Promise((resolve) => {
setTimeout(() => resolve(undefined), time);
});
};

describe('ServiceStarter', () => {
it('should call the starterFn and stopperFn when calling .start and .stop', async () => {
const start = jest.fn();
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

expect(start).not.toHaveBeenCalled();
expect(stop).not.toHaveBeenCalled();

await instance.start();

expect(start).toHaveBeenCalled();
expect(stop).not.toHaveBeenCalled();

start.mockReset();

await instance.stop();

expect(start).not.toHaveBeenCalled();
expect(stop).toHaveBeenCalled();
});

it('should only call .start for the second time after the initial call has finished running', async () => {
let running = false;
const start = jest.fn(async () => {
expect(running).toBe(false);

running = true;
await wait(100);
running = false;
});
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

void instance.start();
setImmediate(() => {
void instance.start();
});

await instance.wait();

expect(start).toHaveBeenCalledTimes(2);
expect(stop).not.toHaveBeenCalled();
});

it('should chain up to two calls to .start', async () => {
const start = jest.fn(async () => {
await wait(100);
});
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

void instance.start();
setImmediate(() => {
void instance.start();
setImmediate(() => {
void instance.start();
setImmediate(() => {
void instance.start();
});
});
});

await instance.wait();

expect(start).toHaveBeenCalledTimes(2);
expect(stop).not.toHaveBeenCalled();
});

it('should skip the chained calls to .start if .stop is called', async () => {
const start = jest.fn(async () => {
await wait(100);
});
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

void instance.start();
setImmediate(() => {
void instance.start();
setImmediate(() => {
void instance.start();
setImmediate(() => {
void instance.stop();
});
});
});

await instance.wait();

expect(start).toHaveBeenCalledTimes(1);
expect(stop).toHaveBeenCalledTimes(1);
});
});
Loading