From 739de532c4392b0fc99b71ba53195dc541b85aa3 Mon Sep 17 00:00:00 2001 From: Abhinav Gautam Date: Tue, 9 Apr 2024 14:37:17 +0530 Subject: [PATCH] added additional logging --- src/routes/queue-route.ts | 1 + src/utils/logstash-client.ts | 7 ++++++- src/utils/logstash/index.ts | 3 ++- src/utils/logstash/types.d.ts | 1 + src/utils/queue-batch-processor.ts | 11 ++++++----- src/utils/queue-client.ts | 11 ++++++++++- 6 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/routes/queue-route.ts b/src/routes/queue-route.ts index 44c554a..d1c307c 100644 --- a/src/routes/queue-route.ts +++ b/src/routes/queue-route.ts @@ -33,6 +33,7 @@ router.post("/log", async (req: Request, res: Response) => { data: log, })) ); + console.log("Received %d Logs", logs.length); res.json({ success: true, }); diff --git a/src/utils/logstash-client.ts b/src/utils/logstash-client.ts index 79c7fe5..e40f89c 100644 --- a/src/utils/logstash-client.ts +++ b/src/utils/logstash-client.ts @@ -1,11 +1,16 @@ import Logstash from "./logstash"; let errorLogger = (error: Error) => { - console.log(error); + console.error(error); +}; + +let connectedLogger = () => { + console.log("Connection Established With Logstash"); }; export default new Logstash({ error_logger: errorLogger, + connected_logger: connectedLogger, host: process.env.LOGSTASH_HOST, port: process.env.LOGSTASH_PORT ? parseInt(process.env.LOGSTASH_PORT) diff --git a/src/utils/logstash/index.ts b/src/utils/logstash/index.ts index a2257b1..5f51e2e 100644 --- a/src/utils/logstash/index.ts +++ b/src/utils/logstash/index.ts @@ -12,6 +12,7 @@ export default class LogstashTransport { ? new SecureConnection(options) : new PlainConnection(options); this.manager = new Manager(options, this.connection); + this.manager.on("connected", options.connected_logger.bind(this)); this.manager.on("error", options.error_logger.bind(this)); this.manager.start(); } @@ -23,4 +24,4 @@ export default class LogstashTransport { close() { this.manager.close(); } -}; +} diff --git a/src/utils/logstash/types.d.ts b/src/utils/logstash/types.d.ts index c521443..2e8bef9 100644 --- a/src/utils/logstash/types.d.ts +++ b/src/utils/logstash/types.d.ts @@ -14,6 +14,7 @@ interface LogstashTransportSSLOptions { interface LogstashTransportOptions extends LogstashTransportSSLOptions { error_logger: (...args: any[]) => void; + connected_logger: () => void; host?: string; port?: number; node_name?: string; diff --git a/src/utils/queue-batch-processor.ts b/src/utils/queue-batch-processor.ts index 00fee70..97040b2 100644 --- a/src/utils/queue-batch-processor.ts +++ b/src/utils/queue-batch-processor.ts @@ -7,11 +7,12 @@ export default async (queue: Queue, logstash: Logstash) => { for (let job of jobs) { jobData.push(job.data); } - if(jobData.length>0) { + if (jobData.length > 0) { logstash.log(jobData, async () => { - for (let job of jobs) { - await job.remove(); - } - }); + for (let job of jobs) { + await job.remove(); + } + console.log("Pushed %d Logs To Logstash", jobData.length); + }); } }; diff --git a/src/utils/queue-client.ts b/src/utils/queue-client.ts index df9944b..21ecb5c 100644 --- a/src/utils/queue-client.ts +++ b/src/utils/queue-client.ts @@ -1,5 +1,5 @@ import { Queue } from "bullmq"; -export default new Queue("logs", { +const queue = new Queue("logs", { connection: { host: process.env.REDIS_HOST, username: process.env.REDIS_USERNAME, @@ -11,3 +11,12 @@ export default new Queue("logs", { }, prefix: process.env.QUEUE_PREFIX, }); + +async function checkQueueHealth(queue: Queue) { + const jobCounts = await queue.getJobCounts(); + console.log("Connection Established With Redis Queue", jobCounts); +} + +checkQueueHealth(queue); + +export default queue;