Skip to content

Commit

Permalink
Transition to nestjs/logger (#38)
Browse files Browse the repository at this point in the history
* replace console.logs with nests/logger

* refactor: inline CPU count in cluster mode logging
  • Loading branch information
tuminzee authored Dec 11, 2024
1 parent d79758a commit d3c530e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 32 deletions.
9 changes: 5 additions & 4 deletions apps/trench/src/appCluster.service.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import * as _cluster from 'cluster'
import * as os from 'os'
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'

const cluster = _cluster as unknown as _cluster.Cluster // typings fix

const numCPUs = os.cpus().length

@Injectable()
export class AppClusterService {
private static readonly logger = new Logger(AppClusterService.name)
static clusterize(callback: Function): void {
if (cluster.isPrimary) {
console.log(`Primary server started on ${process.pid} (using ${numCPUs} processes).`)
this.logger.log(`Primary server started on ${process.pid} (using ${numCPUs} processes).`)
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died. Restarting`)
this.logger.log(`Worker ${worker.process.pid} died. Restarting`)
cluster.fork()
})
} else {
const nodeNumber = cluster.worker.id
console.log(`Cluster server started on ${process.pid}`)
this.logger.log(`Cluster server started on ${process.pid}`)
callback(nodeNumber)
}
}
Expand Down
19 changes: 12 additions & 7 deletions apps/trench/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import { AppClusterService } from './appCluster.service'
import { KafkaService } from './services/data/kafka/kafka.service'
import { ClickHouseService } from './services/data/click-house/click-house.service'
import { BootstrapService } from './services/data/bootstrap/bootstrap.service'
import { Logger } from '@nestjs/common'
import * as os from 'os'

const logger = new Logger('Main')

const CORS_OPTIONS = {
origin: '*',
Expand All @@ -25,21 +29,21 @@ const CORS_OPTIONS = {
}

async function bootstrap(nodeNumber: number) {
console.log(`Starting node ${nodeNumber}`)
logger.log(`Starting node ${nodeNumber}`)

let httpsOptions

if (process.env.API_HTTPS === 'true') {
console.log('Using https')
logger.log('Using https')
httpsOptions = {
key: fs.readFileSync('/app/certs/server.key'),
cert: fs.readFileSync('/app/certs/server.crt'),
}
} else {
console.log('Using http')
logger.log('Using http')
}

const fastifyAdapter = new FastifyAdapter({ https: httpsOptions })
const fastifyAdapter = new FastifyAdapter({ https: httpsOptions, logger: true })
fastifyAdapter.enableCors(CORS_OPTIONS)
const app = await NestFactory.create<NestFastifyApplication>(AppModule, fastifyAdapter)

Expand All @@ -58,14 +62,15 @@ async function bootstrap(nodeNumber: number) {
await bootstrapService.bootstrap()
}

console.log('Listening on port', port)
logger.log(`Listening on port ${port}`)
await app.listen(port, '0.0.0.0')
}

if (process.env.NODE_ENV !== 'production' && process.env.FORCE_CLUSTER_MODE !== 'true') {
console.log('Running in single instance dev mode')
logger.log('Running in single instance dev mode')
bootstrap(1)
} else {
console.log('Running in cluster mode')
logger.log('Running in cluster mode with ' + os.cpus().length + ' processes')
AppClusterService.clusterize(bootstrap)
}

7 changes: 4 additions & 3 deletions apps/trench/src/services/data/bootstrap/bootstrap.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'
import { ClickHouseService } from '../click-house/click-house.service'
import { KafkaService } from '../kafka/kafka.service'
import { DEFAULT_WORKSPACE_ID } from '../../../common/constants'
Expand All @@ -9,6 +9,7 @@ import { Workspace } from '../../../workspaces/workspaces.interface'

@Injectable()
export class BootstrapService {
private readonly logger = new Logger(BootstrapService.name)
constructor(
private readonly clickhouseService: ClickHouseService,
private readonly kafkaService: KafkaService
Expand All @@ -34,12 +35,12 @@ export class BootstrapService {
}

async bootstrapWorkspace(workspace: Workspace) {
console.log(`Creating topics and running migrations for workspace ${workspace.name}`)
this.logger.log(`Creating topics and running migrations for workspace ${workspace.name}`)
const kafkaTopicName = await this.kafkaService.createTopicIfNotExists(
getKafkaTopicFromWorkspace(workspace)
)
await this.clickhouseService.runMigrations(workspace.databaseName, kafkaTopicName)
console.log(
this.logger.log(
`Successfully finished creating topics and running migrations for workspace ${workspace.name}`
)
}
Expand Down
11 changes: 6 additions & 5 deletions apps/trench/src/services/data/click-house/click-house.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'
import { createClient, ClickHouseClient } from '@clickhouse/client'
import * as fs from 'fs'
import * as path from 'path'
Expand All @@ -12,6 +12,7 @@ import {

@Injectable()
export class ClickHouseService {
private readonly logger = new Logger(ClickHouseService.name)
private clientMap: Map<string, ClickHouseClient> = new Map()

getClient(databaseName?: string): ClickHouseClient {
Expand Down Expand Up @@ -83,11 +84,11 @@ export class ClickHouseService {

for (const file of files) {
if (executedFiles.has(file)) {
console.log(`Skipping migration ${file}, already executed `)
this.logger.log(`Skipping migration ${file}, already executed `)
continue
}

console.log(`Executing migration ${file}`)
this.logger.log(`Executing migration ${file}`)

const filePath = path.join(migrationsDir, file)
const query = this.applySubstitutions(fs.readFileSync(filePath, 'utf8'), kafkaTopicName)
Expand All @@ -105,7 +106,7 @@ export class ClickHouseService {
if (String(error).includes('already exists')) {
continue
}
console.error(`Error executing migration ${file} with query ${query}: ${error}`)
this.logger.error(`Error executing migration ${file} with query ${query}: ${error}`, error.stack)
throw error
}
}
Expand All @@ -116,7 +117,7 @@ export class ClickHouseService {
},
])

console.log(`Migration ${file} executed successfully`)
this.logger.log(`Migration ${file} executed successfully`)
}
}

Expand Down
9 changes: 5 additions & 4 deletions apps/trench/src/services/data/kafka/kafka.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'
import { Consumer, Kafka, Producer } from 'kafkajs'
import { KafkaEventWithUUID } from './kafka.interface'
import { DEFAULT_KAFKA_CLIENT_ID, DEFAULT_KAFKA_PARTITIONS } from '../../../common/constants'

@Injectable()
export class KafkaService {
private readonly logger = new Logger(KafkaService.name)
private hasConnectedToProducer = false
private kafka: Kafka
private producer: Producer
Expand All @@ -28,13 +29,13 @@ export class KafkaService {
process.env.KAFKA_PARTITIONS
? Number(process.env.KAFKA_PARTITIONS)
: DEFAULT_KAFKA_PARTITIONS
).then(() => console.log(`Created topic ${topic}`))
).then(() => this.logger.log(`Created topic ${topic}`))

if (process.env.NODE_ENV !== 'development') {
await topicPromise
}
} catch (e) {
console.log(`Skipping topic creation, topic ${process.env.KAFKA_TOPIC} already exists.`)
this.logger.log(`Skipping topic creation, topic ${process.env.KAFKA_TOPIC} already exists.`)
}

return topic
Expand Down Expand Up @@ -99,7 +100,7 @@ export class KafkaService {
partitionsConsumedConcurrently: 4,
})
} catch (e) {
console.log(`Error initiating consumer for groupId ${groupId}.`, e)
this.logger.log(`Error initiating consumer for groupId ${groupId}.`, e)
}
}
}
19 changes: 10 additions & 9 deletions apps/trench/src/webhooks/webhooks.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, OnModuleInit } from '@nestjs/common'
import { Injectable, Logger, OnModuleInit } from '@nestjs/common'
import { KafkaService } from '../services/data/kafka/kafka.service'
import { WebhooksDao } from './webhooks.dao'
import { DEFAULT_KAFKA_TOPIC } from '../common/constants'
Expand All @@ -15,6 +15,7 @@ import { shouldProcessEvent } from './webhooks.util'
import { Consumer } from 'kafkajs'
@Injectable()
export class WebhooksService implements OnModuleInit {
private readonly logger = new Logger(WebhooksService.name)
constructor(
private readonly webhooksDao: WebhooksDao,
private readonly kafkaService: KafkaService,
Expand All @@ -23,18 +24,18 @@ export class WebhooksService implements OnModuleInit {
) {}

async onModuleInit() {
console.log('Starting Kafka consumers... this might take a while...')
this.logger.log('Starting Kafka consumers... this might take a while...')
const workspaces = await this.workspacesService.getWorkspaces()
for (const workspace of workspaces) {
const webhooks = await this.webhooksDao.getWebhooks(workspace)
for (const webhook of webhooks) {
console.log('Initiating consumer for webhook:', webhook.uuid, webhook.url)
this.logger.log('Initiating consumer for webhook:', webhook.uuid, webhook.url)
this.initiateConsumer(webhook, workspace)
.then(() => {
console.log(`Consumer for webhook ${webhook.uuid} has been initiated.`)
this.logger.log(`Consumer for webhook ${webhook.uuid} has been initiated.`)
})
.catch((e) => {
console.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e)
this.logger.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e.message, e.stack)
})
}
}
Expand Down Expand Up @@ -63,7 +64,7 @@ export class WebhooksService implements OnModuleInit {
const thisWebhook = webhooks.find((webhook) => webhook.uuid === webhookUUID)

if (!thisWebhook) {
console.error(
this.logger.error(
`Webhook not found. Skipping processing for ${webhookUUID} and disconnecting consumer.`
)
await consumer.stop()
Expand Down Expand Up @@ -99,7 +100,7 @@ export class WebhooksService implements OnModuleInit {
}

if (eventsFound.length < numberOfEventsToFind) {
console.error(
this.logger.error(
`Error: Not all events found after ${maxRetries} retries for webhook ${webhookUUID}.`
)
}
Expand All @@ -122,10 +123,10 @@ export class WebhooksService implements OnModuleInit {
body: JSON.stringify(webhook.flatten ? flatten(payload) : payload),
})
if (!response.ok) {
console.error('Error sending webhook:', webhook.url, response.statusText)
this.logger.error('Error sending webhook:', webhook.url, response.statusText)
}
} catch (error) {
console.error('Error sending webhook:', webhook.url, error.message)
this.logger.error('Error sending webhook:', webhook.url, error.message, error.stack)
}
}

Expand Down

0 comments on commit d3c530e

Please sign in to comment.