Skip to content

Commit

Permalink
Merge branch 'main' into 162-merge-contacts
Browse files Browse the repository at this point in the history
  • Loading branch information
kennsippell committed Dec 9, 2024
2 parents 48f67ad + 846b8bf commit fc738e5
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ dist
src/package.json
.eslintcache
.DS_Store
upload-docs*
2 changes: 1 addition & 1 deletion src/config/config-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const WorkerConfig = {
host: environment.REDIS_HOST,
port: Number(environment.REDIS_PORT),
},
queueName: 'MOVE_CONTACT_QUEUE',
moveContactQueue: 'MOVE_CONTACT_QUEUE',
defaultJobOptions: {
attempts: 3, // Max retries for a failed job
backoff: {
Expand Down
4 changes: 2 additions & 2 deletions src/lib/authentication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import process from 'process';
import jwt from 'jsonwebtoken';
import ChtSession from './cht-session';

const LOGIN_EXPIRES_AFTER_MS = 2 * 24 * 60 * 60 * 1000;
const QUEUE_SESSION_EXPIRATION = '48h';
const LOGIN_EXPIRES_AFTER_MS = 4 * 24 * 60 * 60 * 1000;
const QUEUE_SESSION_EXPIRATION = '96h';
const { COOKIE_PRIVATE_KEY, WORKER_PRIVATE_KEY } = process.env;
const PRIVATE_KEY_SALT = '_'; // change to logout all users
const COOKIE_SIGNING_KEY = COOKIE_PRIVATE_KEY + PRIVATE_KEY_SALT;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class BullQueue implements IQueue {
}

export const getChtConfQueue = () => new BullQueue(
WorkerConfig.queueName,
WorkerConfig.moveContactQueue,
WorkerConfig.redisConnection,
WorkerConfig.defaultJobOptions
);
36 changes: 23 additions & 13 deletions src/worker/cht-conf-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ export interface ChtConfJobData {
instanceUrl: string;
}

export interface PostponeReason {
reason: string;
}

export type JobResult = { success: boolean; message: string };

export class ChtConfWorker {
Expand Down Expand Up @@ -58,7 +62,7 @@ export class ChtConfWorker {
if (!result.success) {
const errorMessage = `Job ${job.id} failed with the following error: ${result.message}`;
console.error(errorMessage);
this.jobLogWithTimestamp(job, errorMessage);
this.logWithTimestamp(job, errorMessage);
throw new Error(errorMessage);
}

Expand All @@ -70,28 +74,34 @@ export class ChtConfWorker {
attemptsMade: number, type: string | undefined, err: Error | undefined, job: MinimalJob | undefined
): number => {
const {retryTimeFormatted} = this.computeRetryTime();
const fullMessage = `Job ${job?.id} will be retried ${attemptsMade + 1} time at ${retryTimeFormatted}. Due to failure: ${type}: ${err?.message}`;
this.jobLogWithTimestamp(job, fullMessage);

const fullMessage = `Job ${job?.id} will retry at ${retryTimeFormatted}.\
Attempt Number: ${attemptsMade + 1}. Due to failure: ${type}: ${err?.message}`;

this.logWithTimestamp(job, fullMessage);
return this.DELAY_IN_MILLIS;
};

private static async shouldPostpone(jobData: ChtConfJobData): Promise<{ shouldPostpone: boolean; reason: string }> {
private static async shouldPostpone(jobData: ChtConfJobData): Promise<PostponeReason | undefined> {
try {
const { instanceUrl } = jobData;
const response = await axios.get(`${instanceUrl}/api/v2/monitoring`);
const sentinelBacklog = response.data.sentinel?.backlog;
console.log(`Sentinel backlog at ${sentinelBacklog} of ${this.MAX_SENTINEL_BACKLOG}`);
return { shouldPostpone: sentinelBacklog > this.MAX_SENTINEL_BACKLOG, reason: `Sentinel backlog too high at ${sentinelBacklog}` };

return sentinelBacklog > this.MAX_SENTINEL_BACKLOG
? { reason: `Sentinel backlog too high at ${sentinelBacklog}` }
: undefined;
} catch (err: any) {
const errorMessage = err.response?.data?.error?.message || err.response?.error || err?.message;
console.error('Error fetching monitoring data:', errorMessage);

// Handle server unavailability (HTTP 500 errors)
if (err.response?.status === 500) {
console.log('Server error encountered, postponing job...');
return { shouldPostpone: true, reason: `Server error encountered: ${errorMessage}` };
return { reason: `Server error encountered: ${errorMessage}` };
}
return { shouldPostpone: false, reason: '' };
return undefined;
}
}

Expand Down Expand Up @@ -169,12 +179,12 @@ export class ChtConfWorker {

chtProcess.stdout.on('data', data => {
lastOutput = data.toString();
this.jobLogWithTimestamp(job, `cht-conf output: ${data.toString()}`);
this.logWithTimestamp(job, `cht-conf output: ${data.toString()}`);
});

chtProcess.stderr.on('data', error => {
lastOutput = error.toString();
this.jobLogWithTimestamp(job, `cht-conf error: ${error.toString()}`);
this.logWithTimestamp(job, `cht-conf error: ${error.toString()}`);
});

chtProcess.on('close', code => {
Expand All @@ -187,15 +197,15 @@ export class ChtConfWorker {

chtProcess.on('error', error => {
clearTimeout(timeout);
this.jobLogWithTimestamp(job, `cht-conf process error: ${error.toString()}`);
this.logWithTimestamp(job, `cht-conf process error: ${error.toString()}`);
reject(error);
});
});
}

private static async postpone(job: Job, retryMessage: string, processingToken?: string): Promise<void> {
const { retryTimeFormatted, retryTime } = this.computeRetryTime();
this.jobLogWithTimestamp(job, `Job ${job.id} postponed until ${retryTimeFormatted}. Reason: ${retryMessage}.`);
this.logWithTimestamp(job, `Job ${job.id} postponed until ${retryTimeFormatted}. Reason: ${retryMessage}.`);
await job.moveToDelayed(retryTime.toMillis(), processingToken);
}

Expand All @@ -205,8 +215,8 @@ export class ChtConfWorker {
return { retryTime, retryTimeFormatted };
}

private static jobLogWithTimestamp(job: Job|MinimalJob|undefined, message: string): void {
const timestamp = new Date().toISOString();
private static logWithTimestamp(job: Job|MinimalJob|undefined, message: string): void {
const timestamp = DateTime.now().toISO();
const fullMessage = `[${timestamp}] ${message}`;
job?.log(fullMessage);
console.log(fullMessage);
Expand Down

0 comments on commit fc738e5

Please sign in to comment.