Skip to content

Commit

Permalink
fix: address feedback and fix noticed issued on move contact
Browse files Browse the repository at this point in the history
  • Loading branch information
paulpascal committed Nov 20, 2024
1 parent 786acef commit f3457b7
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 31 deletions.
6 changes: 6 additions & 0 deletions src/config/config-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ export const WorkerConfig = {
port: Number(environment.REDIS_PORT),
},
moveContactQueue: 'MOVE_CONTACT_QUEUE',
defaultJobOptions: {
attempts: 3, // Max retries for a failed job
backoff: {
type: 'custom',
},
}
};

const assertRedisConfig = () => {
Expand Down
9 changes: 5 additions & 4 deletions src/lib/queues.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { v4 } from 'uuid';
import { JobsOptions, Queue, ConnectionOptions } from 'bullmq';
import { JobsOptions, Queue, ConnectionOptions, DefaultJobOptions } from 'bullmq';
import { WorkerConfig } from '../config/config-worker';

export interface IQueue {
Expand All @@ -17,9 +17,9 @@ export class BullQueue implements IQueue {
public readonly name: string;
public readonly bullQueue: Queue;

constructor(queueName: string, connection: ConnectionOptions) {
constructor(queueName: string, connection: ConnectionOptions, defaultJobOptions?: DefaultJobOptions) {
this.name = queueName;
this.bullQueue = new Queue(queueName, { connection });
this.bullQueue = new Queue(queueName, { connection, defaultJobOptions });
}

public async add(jobParams: JobParams): Promise<string> {
Expand All @@ -37,5 +37,6 @@ export class BullQueue implements IQueue {

export const getMoveContactQueue = () => new BullQueue(
WorkerConfig.moveContactQueue,
WorkerConfig.redisConnection
WorkerConfig.redisConnection,
WorkerConfig.defaultJobOptions
);
7 changes: 6 additions & 1 deletion src/liquid/place/move_form.html
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ <h1 class="subtitle">To</h1>
<script type="text/javascript">
function handleResponse(event) {
const response = event?.detail?.xhr?.responseText;
if (response && response.includes('data-success="true"')) {
const parser = new DOMParser();
const doc = parser.parseFromString(response, 'text/html');

// Find the element that indicates success
const successElement = doc.querySelector('[data-success="true"]');
if (successElement) {
bulmaToast.toast({
duration: 5000,
dismissible: true,
Expand Down
77 changes: 53 additions & 24 deletions src/worker/move-contact-worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import axios from 'axios';
import { spawn } from 'child_process';
import { Worker, Job, DelayedError, ConnectionOptions } from 'bullmq';
import { Worker, Job, DelayedError, ConnectionOptions, MinimalJob } from 'bullmq';
import { DateTime } from 'luxon';

import Auth from '../lib/authentication';
Expand All @@ -25,7 +25,13 @@ export class MoveContactWorker {
this.worker = new Worker(
queueName,
this.handleJob,
{ connection, concurrency: this.MAX_CONCURRENCY }
{
connection,
concurrency: this.MAX_CONCURRENCY,
settings: {
backoffStrategy: this.handleRetryBackoff,
}
}
);
}

Expand All @@ -40,40 +46,56 @@ export class MoveContactWorker {
const jobData: MoveContactData = job.data;

// Ensure server availability
if (await this.shouldPostpone(jobData)) {
await this.postpone(job, processingToken);
const { shouldPostpone, reason } = await this.shouldPostpone(jobData);
if (shouldPostpone) {
await this.postpone(job, reason, processingToken);
throw new DelayedError();
}

const result = await this.moveContact(jobData);
const result = await this.moveContact(job);
if (!result.success) {
job.log(`[${new Date().toISOString()}]: ${result.message}`);
const errorMessage = `Job ${job.id} failed with the following error: ${result.message}`;
console.error(errorMessage);
this.jobLogWithTimestamp(job, errorMessage);
throw new Error(errorMessage);
}

console.log(`Job completed successfully: ${job.id}`);
return true;
};

private static async shouldPostpone(jobData: MoveContactData): Promise<boolean> {
private static handleRetryBackoff = (
attemptsMade: number, type: string | undefined, err: Error | undefined, job: MinimalJob | undefined
): number => {
const {retryTimeFormatted} = this.computeRetryTime();
const fullMessage = `Job ${job?.id} will be retried ${attemptsMade + 1} time at ${retryTimeFormatted}. Due to failure: ${type}: ${err?.message}`;
this.jobLogWithTimestamp(job, fullMessage);
return this.DELAY_IN_MILLIS;
};

private static async shouldPostpone(jobData: MoveContactData): Promise<{ shouldPostpone: boolean; reason: string }> {
try {
const { instanceUrl } = jobData;
const response = await axios.get(`${instanceUrl}/api/v2/monitoring`);
const sentinelBacklog = response.data.sentinel?.backlog;
console.log(`Sentinel backlog at ${sentinelBacklog} of ${this.MAX_SENTINEL_BACKLOG}`);
return sentinelBacklog > this.MAX_SENTINEL_BACKLOG;
return { shouldPostpone: sentinelBacklog > this.MAX_SENTINEL_BACKLOG, reason: `Sentinel backlog too high at ${sentinelBacklog}` };
} catch (err: any) {
const errorMessage = err.response?.data?.error?.message || err.response?.error || err?.message;
console.error('Error fetching monitoring data:', errorMessage);
return true;

// Handle server unavailability (HTTP 500 errors)
if (err.response?.status === 500) {
console.log('Server error encountered, postponing job...');
return { shouldPostpone: true, reason: `Server error encountered: ${errorMessage}` };
}
return { shouldPostpone: false, reason: '' };
}
}

private static async moveContact(jobData: MoveContactData): Promise<JobResult> {
private static async moveContact(job: Job): Promise<JobResult> {
try {
const { contactId, parentId, instanceUrl, sessionToken } = jobData;
const { contactId, parentId, instanceUrl, sessionToken } = job.data as MoveContactData;

if (!sessionToken) {
return { success: false, message: 'Missing session token' };
Expand All @@ -86,7 +108,7 @@ export class MoveContactWorker {
const args = this.buildCommandArgs(instanceUrl, token, contactId, parentId);

this.logCommand(command, args);
await this.executeCommand(command, args);
await this.executeCommand(command, args, job);

return { success: true, message: `Job processing completed.` };
} catch (error) {
Expand All @@ -112,7 +134,7 @@ export class MoveContactWorker {
console.log('Executing command:', `${command} ${maskedArgs.join(' ')}`);
}

private static async executeCommand(command: string, args: string[]): Promise<void> {
private static async executeCommand(command: string, args: string[], job: Job): Promise<void> {
return new Promise((resolve, reject) => {
const chtProcess = spawn(command, args);
let lastOutput = '';
Expand All @@ -123,12 +145,13 @@ export class MoveContactWorker {
}, this.MAX_TIMEOUT_IN_MILLIS);

chtProcess.stdout.on('data', data => {
console.log(`cht-conf: ${data}`);
lastOutput = data.toString();
this.jobLogWithTimestamp(job, `cht-conf output: ${data.toString()}`);
});

chtProcess.stderr.on('data', error => {
console.error(`cht-conf error: ${error}`);
lastOutput = error.toString();
this.jobLogWithTimestamp(job, `cht-conf error: ${error.toString()}`);
});

chtProcess.on('close', code => {
Expand All @@ -141,22 +164,28 @@ export class MoveContactWorker {

chtProcess.on('error', error => {
clearTimeout(timeout);
console.log(error);
this.jobLogWithTimestamp(job, `cht-conf process error: ${error.toString()}`);
reject(error);
});
});
}

private static async postpone(job: Job, processingToken?: string): Promise<void> {
// Calculate the retry time using luxon
private static async postpone(job: Job, retryMessage: string, processingToken?: string): Promise<void> {
const { retryTimeFormatted, retryTime } = this.computeRetryTime();
this.jobLogWithTimestamp(job, `Job ${job.id} postponed until ${retryTimeFormatted}. Reason: ${retryMessage}.`);
await job.moveToDelayed(retryTime.toMillis(), processingToken);
}

private static computeRetryTime(): { retryTime: DateTime; retryTimeFormatted: string } {
const retryTime = DateTime.now().plus({ milliseconds: this.DELAY_IN_MILLIS });
const retryTimeFormatted = retryTime.toLocaleString(DateTime.TIME_SIMPLE);

// Delayed this job by DELAY_IN_MILLIS, using the current worker processing token
await job.moveToDelayed(retryTime.toMillis(), processingToken);
return { retryTime, retryTimeFormatted };
}

const retryMessage = `Job ${job.id} postponed until ${retryTimeFormatted}. Reason was sentinel backlog.`;
job.log(`[${new Date().toISOString()}]: ${retryMessage}`);
console.log(retryMessage);
private static jobLogWithTimestamp(job: Job|MinimalJob|undefined, message: string): void {
const timestamp = new Date().toISOString();
const fullMessage = `[${timestamp}] ${message}`;
job?.log(fullMessage);
console.log(fullMessage);
}
}
5 changes: 3 additions & 2 deletions test/integration/move-contact.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe('integration/move-contact', function () {

const queueName = 'move_contact_queue';
const connection = { host: '127.0.0.1', port: 6363 };
const defaultJobOptions = { attempts: 3, backoff: { type: 'custom' } };

let sandbox: sinon.SinonSandbox;
let addStub: sinon.SinonStub;
Expand All @@ -32,7 +33,7 @@ describe('integration/move-contact', function () {

beforeEach(async () => {
sandbox = sinon.createSandbox();
moveContactQueue = new BullQueue(queueName, connection);
moveContactQueue = new BullQueue(queueName, connection, defaultJobOptions);
addStub = sandbox.stub(moveContactQueue, 'add');

handleJobStub = sandbox.stub(MoveContactWorker as any, 'handleJob');
Expand Down Expand Up @@ -116,7 +117,7 @@ describe('integration/move-contact', function () {
await new Promise(resolve => setTimeout(resolve, 1000));

// Check if the job has failed
const job = await moveContactQueue['bullQueue'].getJob(jobId) as Job;
const job = await moveContactQueue['bullQueue'].getJob(jobId) as unknown as Job;
expect(await job.getState()).to.equal('failed');
});
});

0 comments on commit f3457b7

Please sign in to comment.