From a9ffb5e13ad2089204ba38e6ddf5fa848c27c2e1 Mon Sep 17 00:00:00 2001 From: ashish-egov Date: Thu, 4 Jul 2024 10:35:55 +0530 Subject: [PATCH] Shut down if kafka error --- utilities/project-factory/src/server/kafka/Listener.ts | 3 ++- utilities/project-factory/src/server/kafka/Producer.ts | 2 ++ .../project-factory/src/server/utils/genericUtils.ts | 9 ++++++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/utilities/project-factory/src/server/kafka/Listener.ts b/utilities/project-factory/src/server/kafka/Listener.ts index bfe4eb4529b..bf7bd8d4a86 100644 --- a/utilities/project-factory/src/server/kafka/Listener.ts +++ b/utilities/project-factory/src/server/kafka/Listener.ts @@ -4,7 +4,7 @@ import { getFormattedStringForDebug, logger } from '../utils/logger'; // Importi import { producer } from './Producer'; // Importing producer from the Producer module import { processCampaignMapping } from '../utils/campaignMappingUtils'; import { enrichAndPersistCampaignWithError } from '../utils/campaignUtils'; -import { throwError } from '../utils/genericUtils'; +import { shutdownGracefully, throwError } from '../utils/genericUtils'; @@ -58,6 +58,7 @@ export function listener() { // Set up error event handlers consumerGroup.on('error', (err) => { console.error(`Consumer Error: ${err}`); + shutdownGracefully(); }); consumerGroup.on('offsetOutOfRange', (err) => { diff --git a/utilities/project-factory/src/server/kafka/Producer.ts b/utilities/project-factory/src/server/kafka/Producer.ts index b728b48a2ca..4c1e56dcd71 100644 --- a/utilities/project-factory/src/server/kafka/Producer.ts +++ b/utilities/project-factory/src/server/kafka/Producer.ts @@ -1,6 +1,7 @@ import config from '../config'; // Importing configuration settings import { Producer, KafkaClient } from 'kafka-node'; // Importing Producer and KafkaClient from 'kafka-node' library import { logger } from "../utils/logger"; +import { shutdownGracefully } from '../utils/genericUtils'; // Creating a new Kafka client instance using the configured Kafka broker host const kafkaClient = new KafkaClient({ @@ -20,6 +21,7 @@ producer.on('ready', () => { producer.on('error', (err) => { logger.error('Producer is in error state'); // Log message indicating producer is in error state console.error(err.stack || err); // Log the error stack or message + shutdownGracefully(); }); export { producer }; // Exporting the producer instance for external use diff --git a/utilities/project-factory/src/server/utils/genericUtils.ts b/utilities/project-factory/src/server/utils/genericUtils.ts index cd95367ae73..769f4e0199f 100644 --- a/utilities/project-factory/src/server/utils/genericUtils.ts +++ b/utilities/project-factory/src/server/utils/genericUtils.ts @@ -51,6 +51,12 @@ const throwErrorViaRequest = (message: any = "Internal Server Error") => { } }; +function shutdownGracefully() { + logger.info('Shutting down gracefully...'); + // Perform any cleanup tasks here, like closing database connections + process.exit(1); // Exit with a non-zero code to indicate an error +} + function capitalizeFirstLetter(str: string | undefined) { if (!str) return str; return str.charAt(0).toUpperCase() + str.slice(1); @@ -1136,7 +1142,8 @@ export { changeFirstRowColumnColour, getConfigurableColumnHeadersFromSchemaForTargetSheet, createBoundaryDataMainSheet, - getMdmsDataBasedOnCampaignType + getMdmsDataBasedOnCampaignType, + shutdownGracefully };