diff --git a/apps/trench/src/appCluster.service.ts b/apps/trench/src/appCluster.service.ts index cc8c230..bd71bff 100644 --- a/apps/trench/src/appCluster.service.ts +++ b/apps/trench/src/appCluster.service.ts @@ -1,6 +1,6 @@ import * as _cluster from 'cluster' import * as os from 'os' -import { Injectable } from '@nestjs/common' +import { Injectable, Logger } from '@nestjs/common' const cluster = _cluster as unknown as _cluster.Cluster // typings fix @@ -8,19 +8,20 @@ const numCPUs = os.cpus().length @Injectable() export class AppClusterService { + private static readonly logger = new Logger(AppClusterService.name) static clusterize(callback: Function): void { if (cluster.isPrimary) { - console.log(`Primary server started on ${process.pid} (using ${numCPUs} processes).`) + this.logger.log(`Primary server started on ${process.pid} (using ${numCPUs} processes).`) for (let i = 0; i < numCPUs; i++) { cluster.fork() } cluster.on('exit', (worker, code, signal) => { - console.log(`Worker ${worker.process.pid} died. Restarting`) + this.logger.log(`Worker ${worker.process.pid} died. Restarting`) cluster.fork() }) } else { const nodeNumber = cluster.worker.id - console.log(`Cluster server started on ${process.pid}`) + this.logger.log(`Cluster server started on ${process.pid}`) callback(nodeNumber) } } diff --git a/apps/trench/src/main.ts b/apps/trench/src/main.ts index ca9b492..93a1e70 100644 --- a/apps/trench/src/main.ts +++ b/apps/trench/src/main.ts @@ -8,6 +8,10 @@ import { AppClusterService } from './appCluster.service' import { KafkaService } from './services/data/kafka/kafka.service' import { ClickHouseService } from './services/data/click-house/click-house.service' import { BootstrapService } from './services/data/bootstrap/bootstrap.service' +import { Logger } from '@nestjs/common' +import * as os from 'os' + +const logger = new Logger('Main') const CORS_OPTIONS = { origin: '*', @@ -25,21 +29,21 @@ const CORS_OPTIONS = { } async function bootstrap(nodeNumber: number) { - console.log(`Starting node ${nodeNumber}`) + logger.log(`Starting node ${nodeNumber}`) let httpsOptions if (process.env.API_HTTPS === 'true') { - console.log('Using https') + logger.log('Using https') httpsOptions = { key: fs.readFileSync('/app/certs/server.key'), cert: fs.readFileSync('/app/certs/server.crt'), } } else { - console.log('Using http') + logger.log('Using http') } - const fastifyAdapter = new FastifyAdapter({ https: httpsOptions }) + const fastifyAdapter = new FastifyAdapter({ https: httpsOptions, logger: true }) fastifyAdapter.enableCors(CORS_OPTIONS) const app = await NestFactory.create(AppModule, fastifyAdapter) @@ -58,14 +62,15 @@ async function bootstrap(nodeNumber: number) { await bootstrapService.bootstrap() } - console.log('Listening on port', port) + logger.log(`Listening on port ${port}`) await app.listen(port, '0.0.0.0') } if (process.env.NODE_ENV !== 'production' && process.env.FORCE_CLUSTER_MODE !== 'true') { - console.log('Running in single instance dev mode') + logger.log('Running in single instance dev mode') bootstrap(1) } else { - console.log('Running in cluster mode') + logger.log('Running in cluster mode with ' + os.cpus().length + ' processes') AppClusterService.clusterize(bootstrap) } + diff --git a/apps/trench/src/services/data/bootstrap/bootstrap.service.ts b/apps/trench/src/services/data/bootstrap/bootstrap.service.ts index a8dacf5..0bc93fa 100644 --- a/apps/trench/src/services/data/bootstrap/bootstrap.service.ts +++ b/apps/trench/src/services/data/bootstrap/bootstrap.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common' +import { Injectable, Logger } from '@nestjs/common' import { ClickHouseService } from '../click-house/click-house.service' import { KafkaService } from '../kafka/kafka.service' import { DEFAULT_WORKSPACE_ID } from '../../../common/constants' @@ -9,6 +9,7 @@ import { Workspace } from '../../../workspaces/workspaces.interface' @Injectable() export class BootstrapService { + private readonly logger = new Logger(BootstrapService.name) constructor( private readonly clickhouseService: ClickHouseService, private readonly kafkaService: KafkaService @@ -34,12 +35,12 @@ export class BootstrapService { } async bootstrapWorkspace(workspace: Workspace) { - console.log(`Creating topics and running migrations for workspace ${workspace.name}`) + this.logger.log(`Creating topics and running migrations for workspace ${workspace.name}`) const kafkaTopicName = await this.kafkaService.createTopicIfNotExists( getKafkaTopicFromWorkspace(workspace) ) await this.clickhouseService.runMigrations(workspace.databaseName, kafkaTopicName) - console.log( + this.logger.log( `Successfully finished creating topics and running migrations for workspace ${workspace.name}` ) } diff --git a/apps/trench/src/services/data/click-house/click-house.service.ts b/apps/trench/src/services/data/click-house/click-house.service.ts index 63b5347..87ae1bb 100644 --- a/apps/trench/src/services/data/click-house/click-house.service.ts +++ b/apps/trench/src/services/data/click-house/click-house.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common' +import { Injectable, Logger } from '@nestjs/common' import { createClient, ClickHouseClient } from '@clickhouse/client' import * as fs from 'fs' import * as path from 'path' @@ -12,6 +12,7 @@ import { @Injectable() export class ClickHouseService { + private readonly logger = new Logger(ClickHouseService.name) private clientMap: Map = new Map() getClient(databaseName?: string): ClickHouseClient { @@ -83,11 +84,11 @@ export class ClickHouseService { for (const file of files) { if (executedFiles.has(file)) { - console.log(`Skipping migration ${file}, already executed `) + this.logger.log(`Skipping migration ${file}, already executed `) continue } - console.log(`Executing migration ${file}`) + this.logger.log(`Executing migration ${file}`) const filePath = path.join(migrationsDir, file) const query = this.applySubstitutions(fs.readFileSync(filePath, 'utf8'), kafkaTopicName) @@ -105,7 +106,7 @@ export class ClickHouseService { if (String(error).includes('already exists')) { continue } - console.error(`Error executing migration ${file} with query ${query}: ${error}`) + this.logger.error(`Error executing migration ${file} with query ${query}: ${error}`, error.stack) throw error } } @@ -116,7 +117,7 @@ export class ClickHouseService { }, ]) - console.log(`Migration ${file} executed successfully`) + this.logger.log(`Migration ${file} executed successfully`) } } diff --git a/apps/trench/src/services/data/kafka/kafka.service.ts b/apps/trench/src/services/data/kafka/kafka.service.ts index 518a80f..681e14f 100644 --- a/apps/trench/src/services/data/kafka/kafka.service.ts +++ b/apps/trench/src/services/data/kafka/kafka.service.ts @@ -1,10 +1,11 @@ -import { Injectable } from '@nestjs/common' +import { Injectable, Logger } from '@nestjs/common' import { Consumer, Kafka, Producer } from 'kafkajs' import { KafkaEventWithUUID } from './kafka.interface' import { DEFAULT_KAFKA_CLIENT_ID, DEFAULT_KAFKA_PARTITIONS } from '../../../common/constants' @Injectable() export class KafkaService { + private readonly logger = new Logger(KafkaService.name) private hasConnectedToProducer = false private kafka: Kafka private producer: Producer @@ -28,13 +29,13 @@ export class KafkaService { process.env.KAFKA_PARTITIONS ? Number(process.env.KAFKA_PARTITIONS) : DEFAULT_KAFKA_PARTITIONS - ).then(() => console.log(`Created topic ${topic}`)) + ).then(() => this.logger.log(`Created topic ${topic}`)) if (process.env.NODE_ENV !== 'development') { await topicPromise } } catch (e) { - console.log(`Skipping topic creation, topic ${process.env.KAFKA_TOPIC} already exists.`) + this.logger.log(`Skipping topic creation, topic ${process.env.KAFKA_TOPIC} already exists.`) } return topic @@ -99,7 +100,7 @@ export class KafkaService { partitionsConsumedConcurrently: 4, }) } catch (e) { - console.log(`Error initiating consumer for groupId ${groupId}.`, e) + this.logger.log(`Error initiating consumer for groupId ${groupId}.`, e) } } } diff --git a/apps/trench/src/webhooks/webhooks.service.ts b/apps/trench/src/webhooks/webhooks.service.ts index 14c7d61..b17c5cf 100644 --- a/apps/trench/src/webhooks/webhooks.service.ts +++ b/apps/trench/src/webhooks/webhooks.service.ts @@ -1,4 +1,4 @@ -import { Injectable, OnModuleInit } from '@nestjs/common' +import { Injectable, Logger, OnModuleInit } from '@nestjs/common' import { KafkaService } from '../services/data/kafka/kafka.service' import { WebhooksDao } from './webhooks.dao' import { DEFAULT_KAFKA_TOPIC } from '../common/constants' @@ -15,6 +15,7 @@ import { shouldProcessEvent } from './webhooks.util' import { Consumer } from 'kafkajs' @Injectable() export class WebhooksService implements OnModuleInit { + private readonly logger = new Logger(WebhooksService.name) constructor( private readonly webhooksDao: WebhooksDao, private readonly kafkaService: KafkaService, @@ -23,18 +24,18 @@ export class WebhooksService implements OnModuleInit { ) {} async onModuleInit() { - console.log('Starting Kafka consumers... this might take a while...') + this.logger.log('Starting Kafka consumers... this might take a while...') const workspaces = await this.workspacesService.getWorkspaces() for (const workspace of workspaces) { const webhooks = await this.webhooksDao.getWebhooks(workspace) for (const webhook of webhooks) { - console.log('Initiating consumer for webhook:', webhook.uuid, webhook.url) + this.logger.log('Initiating consumer for webhook:', webhook.uuid, webhook.url) this.initiateConsumer(webhook, workspace) .then(() => { - console.log(`Consumer for webhook ${webhook.uuid} has been initiated.`) + this.logger.log(`Consumer for webhook ${webhook.uuid} has been initiated.`) }) .catch((e) => { - console.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e) + this.logger.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e.message, e.stack) }) } } @@ -63,7 +64,7 @@ export class WebhooksService implements OnModuleInit { const thisWebhook = webhooks.find((webhook) => webhook.uuid === webhookUUID) if (!thisWebhook) { - console.error( + this.logger.error( `Webhook not found. Skipping processing for ${webhookUUID} and disconnecting consumer.` ) await consumer.stop() @@ -99,7 +100,7 @@ export class WebhooksService implements OnModuleInit { } if (eventsFound.length < numberOfEventsToFind) { - console.error( + this.logger.error( `Error: Not all events found after ${maxRetries} retries for webhook ${webhookUUID}.` ) } @@ -122,10 +123,10 @@ export class WebhooksService implements OnModuleInit { body: JSON.stringify(webhook.flatten ? flatten(payload) : payload), }) if (!response.ok) { - console.error('Error sending webhook:', webhook.url, response.statusText) + this.logger.error('Error sending webhook:', webhook.url, response.statusText) } } catch (error) { - console.error('Error sending webhook:', webhook.url, error.message) + this.logger.error('Error sending webhook:', webhook.url, error.message, error.stack) } }