Skip to content

Commit

Permalink
Prepare to give exporter its own queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonh committed Nov 1, 2024
1 parent cd0ac4b commit a8d4f7a
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 1 deletion.
4 changes: 3 additions & 1 deletion packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
"build": "tsc",
"dev": "ts-node-dev --respawn --transpile-only src/server.ts",
"dev_qp": "ts-node-dev --respawn --transpile-only src/queue-processor.ts",
"dev_ep": "ts-node-dev --respawn --transpile-only src/export-processor.ts",
"start": "node dist/src/server.js",
"start_queue_processor": "node dist/src/queue-processor.js",
"start_export_processor": "node dist/src/export-processor.js",
"lint": "eslint src --ext ts,js,tsx,jsx",
"lint:fix": "eslint src --fix --ext ts,js,tsx,jsx",
"test:typecheck": "tsc --noEmit",
Expand Down Expand Up @@ -174,4 +176,4 @@
"volta": {
"extends": "../../package.json"
}
}
}
187 changes: 187 additions & 0 deletions packages/api/src/export-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/* eslint-disable @typescript-eslint/no-floating-promises */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/require-await */
/* eslint-disable @typescript-eslint/no-misused-promises */
import {
ConnectionOptions,
Job,
JobState,
JobType,
Queue,
QueueEvents,
Worker,
} from 'bullmq'
import express, { Express } from 'express'
import client from 'prom-client'
import { appDataSource } from './data_source'
import { env } from './env'
import { TaskState } from './generated/graphql'
import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize'
import { createDigest, CREATE_DIGEST_JOB } from './jobs/ai/create_digest'
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook'
import {
confirmEmailJob,
CONFIRM_EMAIL_JOB,
forwardEmailJob,
FORWARD_EMAIL_JOB,
saveAttachmentJob,
saveNewsletterJob,
SAVE_ATTACHMENT_JOB,
SAVE_NEWSLETTER_JOB,
} from './jobs/email/inbound_emails'
import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email'
import {
expireFoldersJob,
EXPIRE_FOLDERS_JOB_NAME,
} from './jobs/expire_folders'
import { exportJob, EXPORT_JOB_NAME } from './jobs/export'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import {
generatePreviewContent,
GENERATE_PREVIEW_CONTENT_JOB,
} from './jobs/generate_preview_content'
import {
exportAllItems,
EXPORT_ALL_ITEMS_JOB_NAME,
} from './jobs/integration/export_all_items'
import {
exportItem,
EXPORT_ITEM_JOB_NAME,
} from './jobs/integration/export_item'
import {
processYouTubeTranscript,
processYouTubeVideo,
PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME,
PROCESS_YOUTUBE_VIDEO_JOB_NAME,
} from './jobs/process-youtube-video'
import { pruneTrashJob, PRUNE_TRASH_JOB } from './jobs/prune_trash'
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
import { refreshFeed } from './jobs/rss/refreshFeed'
import { savePageJob } from './jobs/save_page'
import {
scoreLibraryItem,
SCORE_LIBRARY_ITEM_JOB,
} from './jobs/score_library_item'
import {
syncReadPositionsJob,
SYNC_READ_POSITIONS_JOB_NAME,
} from './jobs/sync_read_positions'

import { redisDataSource } from './redis_data_source'
import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position'
import { logger } from './utils/logger'
import { getQueue } from './queue-processor'

/**
 * Name of the BullMQ queue consumed by the worker that `createWorker` builds.
 * Exported so other modules can refer to the same queue name.
 */
export const EXPORT_QUEUE_NAME = 'omnivore-export-queue'

/**
 * Builds the BullMQ worker that consumes jobs from the export queue.
 *
 * Only jobs named `EXPORT_JOB_NAME` are handled; any other job name is logged
 * as unhandled and the job completes without doing any work. The value
 * produced by `exportJob` is awaited but not returned as the job result.
 *
 * @param connection - connection options handed straight to the BullMQ Worker
 * @returns the newly constructed worker
 */
export const createWorker = (connection: ConnectionOptions) => {
  // Dispatch a single job based on its name.
  const processJob = async (job: Job): Promise<void> => {
    if (job.name === EXPORT_JOB_NAME) {
      await exportJob(job.data)
      return
    }

    logger.warning(`[export-processor] unhandled job: ${job.name}`)
  }

  return new Worker(EXPORT_QUEUE_NAME, processJob, {
    connection,
    autorun: true, // start processing jobs immediately
    lockDuration: 60_000, // 1 minute
    concurrency: 2,
  })
}

/**
 * Entry point of the export queue processor.
 *
 * Starts an Express server exposing a health check and a prestop lifecycle
 * hook, initializes the database and redis data sources, creates the export
 * worker, and installs signal / process-level error handlers.
 *
 * @throws Error when the redis clients are not available after initialization
 */
const main = async () => {
  console.log('[export-processor]: starting export queue processor')

  const app: Express = express()
  const port = process.env.PORT || 3003

  // Assigned once the data sources are initialized. Declared up front so the
  // HTTP handlers registered below never touch an uninitialized binding if
  // they are invoked while startup is still in progress.
  let worker: Worker | undefined

  redisDataSource.setOptions({
    cache: env.redis.cache,
    mq: env.redis.mq,
  })

  // respond healthy to auto-scaler.
  app.get('/_ah/health', (req, res) => res.sendStatus(200))

  app.get('/lifecycle/prestop', async (req, res) => {
    logger.info('prestop lifecycle hook called.')
    try {
      // The worker may not exist yet if the hook fires during startup.
      await worker?.close()
      res.sendStatus(200)
    } catch (error) {
      // Always answer the request; a rejected async handler would otherwise
      // leave it hanging.
      logger.error('[export-processor]: error closing worker on prestop', error)
      res.sendStatus(500)
    }
  })

  const server = app.listen(port, () => {
    console.log(`[export-processor]: started`)
  })

  // This is done after all the setup so it can access the
  // environment that was loaded from GCP
  await appDataSource.initialize()
  await redisDataSource.initialize()

  const redisClient = redisDataSource.redisClient
  const workerRedisClient = redisDataSource.workerRedisClient
  if (!workerRedisClient || !redisClient) {
    // Throw a real Error (not a string) so the failure carries a stack trace.
    throw new Error('[export-processor] error redis is not initialized')
  }

  worker = createWorker(workerRedisClient)

  workerRedisClient.on('error', (error) => {
    console.trace('[export-processor]: redis worker error', { error })
  })

  redisClient.on('error', (error) => {
    console.trace('[export-processor]: redis error', { error })
  })

  // Tear everything down in order: HTTP server, worker, redis, database.
  const gracefulShutdown = async (signal: string) => {
    console.log(`[export-processor]: Received ${signal}, closing server...`)
    await new Promise<void>((resolve) => {
      server.close((err) => {
        console.log('[export-processor]: Express server closed')
        if (err) {
          console.log('[export-processor]: error stopping server', { err })
        }

        resolve()
      })
    })
    await worker?.close()
    console.log('[export-processor]: Worker closed')

    await redisDataSource.shutdown()
    console.log('[export-processor]: Redis connection closed')

    await appDataSource.destroy()
    console.log('[export-processor]: DB connection closed')

    process.exit(0)
  }

  process.on('SIGINT', () => gracefulShutdown('SIGINT'))
  process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))

  process.on('uncaughtException', function (err) {
    // Handle the error safely
    logger.error('Uncaught exception', err)
  })

  process.on('unhandledRejection', (reason, promise) => {
    // Handle the error safely
    logger.error('Unhandled Rejection at: Promise', { promise, reason })
  })
}

// Boot the processor only when this file is the program's entry module;
// when it is merely required by another module, nothing is started.
const isEntryModule = require.main === module
if (isEntryModule) {
  main().catch((startupError) => {
    console.error(startupError)
  })
}
2 changes: 2 additions & 0 deletions packages/api/src/utils/createTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import { CreateTaskError } from './errors'
import { stringToHash } from './helpers'
import { logError, logger } from './logger'
import View = google.cloud.tasks.v2.Task.View
import { EXPORT_QUEUE_NAME } from '../export-processor'

// Instantiates a client.
const client = new CloudTasksClient()
Expand Down Expand Up @@ -1075,6 +1076,7 @@ export const enqueueExpireFoldersJob = async () => {

export const queueExportJob = async (userId: string, exportId: string) => {
const queue = await getQueue()
// const queue = await getQueue(EXPORT_QUEUE_NAME)
if (!queue) {
return undefined
}
Expand Down

0 comments on commit a8d4f7a

Please sign in to comment.