From d695be3ff1c365b638b35d0788f0bab1d87c6cca Mon Sep 17 00:00:00 2001
From: Samuel Bodin <1637651+bodinsamuel@users.noreply.github.com>
Date: Thu, 26 Sep 2024 17:47:59 +0200
Subject: [PATCH] fix(jobs): try to handle knex pool exhaustion (#2771)

## Describe your changes

- Handle unhandledRejection and uncaughtException (sketched below).
  This is something we need to do globally, but this is a good opportunity to start. I'm not entirely sure the error we are seeing comes from an uncaught error, but we'll see. We don't properly log errors anywhere because we don't have a working Sentry and our Datadog error logging is shaky.
- Add a proper health check (sketched below).
  It turns out /health is only called on the web service, so our current health check (which was not doing much) was useless anyway. The new check runs a trivial query and times out quickly.
- Remove the transaction from the Delete Syncs cron (sketched below).
  This was the only place where transactions were used in an exotic way. Honestly, I'm not convinced it was causing the issue, but since the transaction was useless I'm removing it.
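A simplified sketch of the process-level handlers (the real wiring is in `packages/jobs/lib/app.ts` below; `close()` here stands in for the service's graceful shutdown, which stops the processor, destroys the knex pool and closes the HTTP server):

```ts
// Sketch only: the real close() lives in app.ts and shuts the service down cleanly.
async function close(): Promise<void> {
    process.exit(1);
}

process.on('unhandledRejection', (reason) => {
    console.error('Received unhandledRejection...', reason);
    process.exitCode = 1;
    void close();
});

process.on('uncaughtException', (e) => {
    console.error('Received uncaughtException...', e);
    // not closing on purpose
});
```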
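The health check is a self-check loop rather than an HTTP route: run a trivial query with a short timeout and shut the service down if the database stops answering. A minimal sketch of the idea (the re-arm delay and the `process.exit` call are illustrative; `db.knex` is the shared knex instance from `@nangohq/database`):

```ts
import db from '@nangohq/database';

// setTimeout is re-armed only after a check finishes, so checks can never
// pile up the way a setInterval could if the DB is slow or down.
let healthCheck: NodeJS.Timeout | undefined;

const check = async () => {
    try {
        await db.knex.raw('SELECT 1').timeout(1000); // fail fast
        healthCheck = setTimeout(check, 1000); // illustrative delay
    } catch (err) {
        console.error('HealthCheck failed...', err);
        clearTimeout(healthCheck);
        process.exit(1); // simplified; the real code goes through close()
    }
};

void check();
```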
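For the Delete Syncs cron, the lock that keeps two instances from running the cleanup at the same time stays, but it is now taken with a plain query instead of being wrapped in a transaction. A sketch of the lock check (the helper name is made up for this example; the key 123456789 is the one the cron uses):

```ts
import db from '@nangohq/database';

// Returns true when this process obtained the advisory lock, false when
// another instance of the cron already holds it and this run should skip.
export async function tryAcquireDeleteSyncsLock(): Promise<boolean> {
    const { rows } = await db.knex.raw<{ rows: { delete_syncs: boolean }[] }>(
        `SELECT pg_try_advisory_xact_lock(?) as delete_syncs`,
        [123456789]
    );
    return Boolean(rows?.[0]?.delete_syncs);
}
```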
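The pool settings that give the PR its title are in `packages/database/lib/getConfig.ts` below: the pool now starts empty and gives up quickly instead of hanging when no connection can be acquired or created. As a standalone sketch (connection details are placeholders, the pool values mirror the patch):

```ts
import knex from 'knex';

// Keep the pool empty when idle and fail fast instead of hanging when
// no connection can be acquired or created.
const db = knex({
    client: 'pg',
    connection: { connectionString: process.env['NANGO_DATABASE_URL'] }, // placeholder
    pool: {
        min: parseInt(process.env['NANGO_DB_POOL_MIN'] || '0'),
        max: parseInt(process.env['NANGO_DB_POOL_MAX'] || '30'),
        acquireTimeoutMillis: 20000, // stop waiting for a free connection after 20s
        createTimeoutMillis: 10000 // stop trying to open a new connection after 10s
    }
});

export default db;
```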
---
 packages/database/lib/getConfig.ts            |  6 +-
 packages/jobs/lib/app.ts                      | 52 +++++++++++---
 packages/jobs/lib/crons/deleteSyncsData.ts    | 72 +++++++++----------
 packages/jobs/lib/routes/getHealth.ts         |  5 +-
 packages/jobs/lib/tracer.ts                   |  3 +
 .../shared/lib/services/sync/job.service.ts   |  2 +-
 packages/utils/lib/telemetry/metrics.ts       |  2 +-
 7 files changed, 91 insertions(+), 51 deletions(-)

diff --git a/packages/database/lib/getConfig.ts b/packages/database/lib/getConfig.ts
index eb533b8fc2c..0542ea368bb 100644
--- a/packages/database/lib/getConfig.ts
+++ b/packages/database/lib/getConfig.ts
@@ -18,8 +18,10 @@ export function getDbConfig({ timeoutMs }: { timeoutMs: number }): Knex.Config {
             statement_timeout: timeoutMs
         },
         pool: {
-            min: parseInt(process.env['NANGO_DB_POOL_MIN'] || '2'),
-            max: parseInt(process.env['NANGO_DB_POOL_MAX'] || '30')
+            min: parseInt(process.env['NANGO_DB_POOL_MIN'] || '0'),
+            max: parseInt(process.env['NANGO_DB_POOL_MAX'] || '30'),
+            acquireTimeoutMillis: 20000,
+            createTimeoutMillis: 10000
         },
         // SearchPath needs the current db and public because extension can only be installed once per DB
         searchPath: [defaultSchema, 'public', ...additionalSchemas]
diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts
index e29580214bb..b9d453d521a 100644
--- a/packages/jobs/lib/app.ts
+++ b/packages/jobs/lib/app.ts
@@ -6,6 +6,7 @@ import { deleteSyncsData } from './crons/deleteSyncsData.js';
 import { getLogger, stringifyError } from '@nangohq/utils';
 import { timeoutLogsOperations } from './crons/timeoutLogsOperations.js';
 import { envs } from './env.js';
+import db from '@nangohq/database';
 
 const logger = getLogger('Jobs');
 
@@ -16,20 +17,55 @@ try {
     logger.info(`🚀 service ready at http://localhost:${port}`);
 
     const processor = new Processor(orchestratorUrl);
 
+    // We are using a setTimeout because we don't want overlapping setInterval if the DB is down
+    let healthCheck: NodeJS.Timeout | undefined;
+    const check = async () => {
+        try {
+            await db.knex.raw('SELECT 1').timeout(1000);
+            healthCheck = setTimeout(check);
+        } catch (err) {
+            logger.error('HealthCheck failed...', err);
+            void close();
+        }
+    };
+    void check();
+
+    const close = async () => {
+        clearTimeout(healthCheck);
+        processor.stop();
+        await db.knex.destroy();
+        srv.close(() => {
+            process.exit();
+        });
+    };
+
+    process.on('SIGINT', () => {
+        logger.info('Received SIGINT...');
+        void close();
+    });
+
+    process.on('SIGTERM', () => {
+        logger.info('Received SIGTERM...');
+        void close();
+    });
+
+    process.on('unhandledRejection', (reason) => {
+        logger.error('Received unhandledRejection...', reason);
+        process.exitCode = 1;
+        void close();
+    });
+
+    process.on('uncaughtException', (e) => {
+        logger.error('Received uncaughtException...', e);
+        // not closing on purpose
+    });
+
     processor.start();
 
     // Register recurring tasks
     cronAutoIdleDemo();
     deleteSyncsData();
     timeoutLogsOperations();
-
-    // handle SIGTERM
-    process.on('SIGTERM', () => {
-        processor.stop();
-        srv.close(() => {
-            process.exit(0);
-        });
-    });
 } catch (err) {
     logger.error(stringifyError(err));
     process.exit(1);
diff --git a/packages/jobs/lib/crons/deleteSyncsData.ts b/packages/jobs/lib/crons/deleteSyncsData.ts
index b412e3ed070..133fe9a5145 100644
--- a/packages/jobs/lib/crons/deleteSyncsData.ts
+++ b/packages/jobs/lib/crons/deleteSyncsData.ts
@@ -20,6 +20,8 @@ export function deleteSyncsData(): void {
         const start = Date.now();
         try {
             await exec();
+
+            logger.info('[deleteSyncs] ✅ done');
         } catch (err: unknown) {
             const e = new Error('failed_to_hard_delete_syncs_data', { cause: err instanceof Error ? err.message : err });
             errorManager.report(e, { source: ErrorSourceEnum.PLATFORM }, tracer);
@@ -31,49 +33,43 @@ export function deleteSyncsData(): void {
 
 export async function exec(): Promise<void> {
     logger.info('[deleteSyncs] starting');
 
-    await db.knex.transaction(async (trx) => {
-        // Because it's slow and create deadlocks
-        // we need to acquire a Lock that prevents any other duplicate cron to execute the same thing
-        const { rows } = await trx.raw<{ rows: { delete_syncs: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as delete_syncs`, [123456789]);
-        if (!rows || rows.length <= 0 || !rows[0]!.delete_syncs) {
-            logger.info(`[deleteSyncs] could not acquire lock, skipping`);
-            return;
-        }
-
-        // NB: we are not using trx again, we only care about the lock
+    // Because it's slow and create deadlocks
+    // we need to acquire a Lock that prevents any other duplicate cron to execute the same thing
+    const { rows } = await db.knex.raw<{ rows: { delete_syncs: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?) as delete_syncs`, [123456789]);
+    if (!rows || rows.length <= 0 || !rows[0]!.delete_syncs) {
+        logger.info(`[deleteSyncs] could not acquire lock, skipping`);
+        return;
+    }
 
-        const syncs = await findRecentlyDeletedSync();
+    const syncs = await findRecentlyDeletedSync();
 
-        const orchestrator = new Orchestrator(orchestratorClient);
+    const orchestrator = new Orchestrator(orchestratorClient);
 
-        for (const sync of syncs) {
-            logger.info(`[deleteSyncs] deleting syncId: ${sync.id}`);
+    for (const sync of syncs) {
+        logger.info(`[deleteSyncs] deleting syncId: ${sync.id}`);
 
-            // Soft delete jobs
-            let countJobs = 0;
-            do {
-                countJobs = await softDeleteJobs({ syncId: sync.id, limit: limitJobs });
-                logger.info(`[deleteSyncs] soft deleted ${countJobs} jobs`);
-                metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_JOBS, countJobs);
-            } while (countJobs >= limitJobs);
+        // Soft delete jobs
+        let countJobs = 0;
+        do {
+            countJobs = await softDeleteJobs({ syncId: sync.id, limit: limitJobs });
+            logger.info(`[deleteSyncs] soft deleted ${countJobs} jobs`);
+            metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_JOBS, countJobs);
+        } while (countJobs >= limitJobs);
 
-            // -----
-            // Soft delete schedules
-            const resSchedule = await orchestrator.deleteSync({ syncId: sync.id, environmentId: sync.environmentId });
-            const deletedScheduleCount = resSchedule.isErr() ? 1 : 0;
-            logger.info(`[deleteSyncs] soft deleted ${deletedScheduleCount} schedules`);
-            metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_SCHEDULES, deletedScheduleCount);
+        // -----
+        // Soft delete schedules
+        const resSchedule = await orchestrator.deleteSync({ syncId: sync.id, environmentId: sync.environmentId });
+        const deletedScheduleCount = resSchedule.isErr() ? 1 : 0;
+        logger.info(`[deleteSyncs] soft deleted ${deletedScheduleCount} schedules`);
+        metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_SCHEDULES, deletedScheduleCount);
 
-            // ----
-            // hard delete records
-            let deletedRecords = 0;
-            for (const model of sync.models) {
-                const res = await records.deleteRecordsBySyncId({ connectionId: sync.connectionId, model, syncId: sync.id, limit: limitRecords });
-                deletedRecords += res.totalDeletedRecords;
-            }
-            metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, deletedRecords);
+        // ----
+        // hard delete records
+        let deletedRecords = 0;
+        for (const model of sync.models) {
+            const res = await records.deleteRecordsBySyncId({ connectionId: sync.connectionId, model, syncId: sync.id, limit: limitRecords });
+            deletedRecords += res.totalDeletedRecords;
         }
-    });
-
-    logger.info('[deleteSyncs] ✅ done');
+        metrics.increment(metrics.Types.JOBS_DELETE_SYNCS_DATA_RECORDS, deletedRecords);
+    }
 }
diff --git a/packages/jobs/lib/routes/getHealth.ts b/packages/jobs/lib/routes/getHealth.ts
index a179c9bf2b8..b4722e15a4f 100644
--- a/packages/jobs/lib/routes/getHealth.ts
+++ b/packages/jobs/lib/routes/getHealth.ts
@@ -14,5 +14,8 @@ export const routeHandler: RouteHandler = {
     path,
     method,
     validate: (_req, _res, next) => next(), // No extra validation needed
-    handler: (_req, res) => res.status(200).json({ status: 'ok' })
+    handler: (_req, res) => {
+        // Not used in Render
+        res.status(200).json({ status: 'ok' });
+    }
 };
diff --git a/packages/jobs/lib/tracer.ts b/packages/jobs/lib/tracer.ts
index c74e6eb3503..623efaab554 100644
--- a/packages/jobs/lib/tracer.ts
+++ b/packages/jobs/lib/tracer.ts
@@ -6,6 +6,9 @@ tracer.init({
 tracer.use('pg', {
     service: (params: { database: string }) => `postgres-${params.database}`
 });
+tracer.use('express', {
+    enabled: true
+});
 tracer.use('elasticsearch', {
     service: 'nango-elasticsearch'
 });
diff --git a/packages/shared/lib/services/sync/job.service.ts b/packages/shared/lib/services/sync/job.service.ts
index e4928e61247..36f4e1d5b5f 100644
--- a/packages/shared/lib/services/sync/job.service.ts
+++ b/packages/shared/lib/services/sync/job.service.ts
@@ -100,7 +100,7 @@ export const updateLatestJobSyncStatus = async (sync_id: string, status: SyncSta
  * @desc grab any existing results and add them to the current
  */
 export const updateSyncJobResult = async (id: number, result: SyncResultByModel, model: string): Promise => {
-    return db.knex.transaction(async (trx) => {
+    return await db.knex.transaction(async (trx) => {
         const { result: existingResult } = await trx.from(SYNC_JOB_TABLE).select('result').forUpdate().where({ id }).first();
 
         if (!existingResult || Object.keys(existingResult).length === 0) {
diff --git a/packages/utils/lib/telemetry/metrics.ts b/packages/utils/lib/telemetry/metrics.ts
index d6f21275f76..0d91d0c1954 100644
--- a/packages/utils/lib/telemetry/metrics.ts
+++ b/packages/utils/lib/telemetry/metrics.ts
@@ -82,7 +82,7 @@ export function time
     };
 
     // This function should handle both async/sync function
-    // So it's try/catch regular execution and use .then() for async
+    // So it try/catch regular execution and use .then() for async
     // @ts-expect-error can't fix this
     return function wrapped(...args: any) {
         const start = process.hrtime();