From a8d4f7acd5e1e9324dfc0625f942b9f2268d97cb Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 1 Nov 2024 23:33:46 +0800 Subject: [PATCH] Prepare to give exporter its own queue --- packages/api/package.json | 4 +- packages/api/src/export-processor.ts | 187 +++++++++++++++++++++++++++ packages/api/src/utils/createTask.ts | 2 + 3 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 packages/api/src/export-processor.ts diff --git a/packages/api/package.json b/packages/api/package.json index dfb233d4c9..96d3056300 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -6,8 +6,10 @@ "build": "tsc", "dev": "ts-node-dev --respawn --transpile-only src/server.ts", "dev_qp": "ts-node-dev --respawn --transpile-only src/queue-processor.ts", + "dev_ep": "ts-node-dev --respawn --transpile-only src/export-processor.ts", "start": "node dist/src/server.js", "start_queue_processor": "node dist/src/queue-processor.js", + "start_export_processor": "node dist/src/export-processor.js", "lint": "eslint src --ext ts,js,tsx,jsx", "lint:fix": "eslint src --fix --ext ts,js,tsx,jsx", "test:typecheck": "tsc --noEmit", @@ -174,4 +176,4 @@ "volta": { "extends": "../../package.json" } -} +} \ No newline at end of file diff --git a/packages/api/src/export-processor.ts b/packages/api/src/export-processor.ts new file mode 100644 index 0000000000..92b712cd1e --- /dev/null +++ b/packages/api/src/export-processor.ts @@ -0,0 +1,187 @@ +/* eslint-disable @typescript-eslint/no-floating-promises */ +/* eslint-disable @typescript-eslint/restrict-template-expressions */ +/* eslint-disable @typescript-eslint/require-await */ +/* eslint-disable @typescript-eslint/no-misused-promises */ +import { + ConnectionOptions, + Job, + JobState, + JobType, + Queue, + QueueEvents, + Worker, +} from 'bullmq' +import express, { Express } from 'express' +import client from 'prom-client' +import { appDataSource } from './data_source' +import { env } from './env' +import { TaskState } from './generated/graphql' +import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize' +import { createDigest, CREATE_DIGEST_JOB } from './jobs/ai/create_digest' +import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' +import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' +import { + confirmEmailJob, + CONFIRM_EMAIL_JOB, + forwardEmailJob, + FORWARD_EMAIL_JOB, + saveAttachmentJob, + saveNewsletterJob, + SAVE_ATTACHMENT_JOB, + SAVE_NEWSLETTER_JOB, +} from './jobs/email/inbound_emails' +import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email' +import { + expireFoldersJob, + EXPIRE_FOLDERS_JOB_NAME, +} from './jobs/expire_folders' +import { exportJob, EXPORT_JOB_NAME } from './jobs/export' +import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' +import { + generatePreviewContent, + GENERATE_PREVIEW_CONTENT_JOB, +} from './jobs/generate_preview_content' +import { + exportAllItems, + EXPORT_ALL_ITEMS_JOB_NAME, +} from './jobs/integration/export_all_items' +import { + exportItem, + EXPORT_ITEM_JOB_NAME, +} from './jobs/integration/export_item' +import { + processYouTubeTranscript, + processYouTubeVideo, + PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME, + PROCESS_YOUTUBE_VIDEO_JOB_NAME, +} from './jobs/process-youtube-video' +import { pruneTrashJob, PRUNE_TRASH_JOB } from './jobs/prune_trash' +import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' +import { refreshFeed } from './jobs/rss/refreshFeed' +import { savePageJob } from './jobs/save_page' +import { + scoreLibraryItem, + SCORE_LIBRARY_ITEM_JOB, +} from './jobs/score_library_item' +import { + syncReadPositionsJob, + SYNC_READ_POSITIONS_JOB_NAME, +} from './jobs/sync_read_positions' + +import { redisDataSource } from './redis_data_source' +import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position' +import { logger } from './utils/logger' +import { getQueue } from './queue-processor' + +export const EXPORT_QUEUE_NAME = 'omnivore-export-queue' + +export const createWorker = (connection: ConnectionOptions) => + new Worker( + EXPORT_QUEUE_NAME, + async (job: Job) => { + const executeJob = async (job: Job) => { + switch (job.name) { + case EXPORT_JOB_NAME: + return exportJob(job.data) + default: + logger.warning(`[export-processor] unhandled job: ${job.name}`) + } + } + + await executeJob(job) + }, + { + connection, + autorun: true, // start processing jobs immediately + lockDuration: 60_000, // 1 minute + concurrency: 2, + } + ) + +const main = async () => { + console.log('[export-processor]: starting export queue processor') + + const app: Express = express() + const port = process.env.PORT || 3003 + + redisDataSource.setOptions({ + cache: env.redis.cache, + mq: env.redis.mq, + }) + + // respond healthy to auto-scaler. + app.get('/_ah/health', (req, res) => res.sendStatus(200)) + + app.get('/lifecycle/prestop', async (req, res) => { + logger.info('prestop lifecycle hook called.') + await worker.close() + res.sendStatus(200) + }) + + const server = app.listen(port, () => { + console.log(`[export-processor]: started`) + }) + + // This is done after all the setup so it can access the + // environment that was loaded from GCP + await appDataSource.initialize() + await redisDataSource.initialize() + + const redisClient = redisDataSource.redisClient + const workerRedisClient = redisDataSource.workerRedisClient + if (!workerRedisClient || !redisClient) { + throw '[export-processor] error redis is not initialized' + } + + const worker = createWorker(workerRedisClient) + + workerRedisClient.on('error', (error) => { + console.trace('[export-processor]: redis worker error', { error }) + }) + + redisClient.on('error', (error) => { + console.trace('[export-processor]: redis error', { error }) + }) + + const gracefulShutdown = async (signal: string) => { + console.log(`[export-processor]: Received ${signal}, closing server...`) + await new Promise((resolve) => { + server.close((err) => { + console.log('[export-processor]: Express server closed') + if (err) { + console.log('[export-processor]: error stopping server', { err }) + } + + resolve() + }) + }) + await worker.close() + console.log('[export-processor]: Worker closed') + + await redisDataSource.shutdown() + console.log('[export-processor]: Redis connection closed') + + await appDataSource.destroy() + console.log('[export-processor]: DB connection closed') + + process.exit(0) + } + + process.on('SIGINT', () => gracefulShutdown('SIGINT')) + process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) + + process.on('uncaughtException', function (err) { + // Handle the error safely + logger.error('Uncaught exception', err) + }) + + process.on('unhandledRejection', (reason, promise) => { + // Handle the error safely + logger.error('Unhandled Rejection at: Promise', { promise, reason }) + }) +} + +// only call main if the file was called from the CLI and wasn't required from another module +if (require.main === module) { + main().catch((e) => console.error(e)) +} diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index af1d9894c4..72054ac0d2 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -75,6 +75,7 @@ import { CreateTaskError } from './errors' import { stringToHash } from './helpers' import { logError, logger } from './logger' import View = google.cloud.tasks.v2.Task.View +import { EXPORT_QUEUE_NAME } from '../export-processor' // Instantiates a client. const client = new CloudTasksClient() @@ -1075,6 +1076,7 @@ export const enqueueExpireFoldersJob = async () => { export const queueExportJob = async (userId: string, exportId: string) => { const queue = await getQueue() + // const queue = await getQueue(EXPORT_QUEUE_NAME) if (!queue) { return undefined }