Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #36 transition to nestjs/logger #37

Closed
wants to merge 11 commits into from
10 changes: 6 additions & 4 deletions apps/trench/src/appCluster.service.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
import * as _cluster from 'cluster'
import * as os from 'os'
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'

const cluster = _cluster as unknown as _cluster.Cluster // typings fix

const numCPUs = os.cpus().length

@Injectable()
export class AppClusterService {
private static readonly logger = new Logger(AppClusterService.name)

static clusterize(callback: Function): void {
if (cluster.isPrimary) {
console.log(`Primary server started on ${process.pid} (using ${numCPUs} processes).`)
this.logger.log(`Primary server started on ${process.pid} (using ${numCPUs} processes).`)
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died. Restarting`)
this.logger.log(`Worker ${worker.process.pid} died. Restarting`)
cluster.fork()
})
} else {
const nodeNumber = cluster.worker.id
console.log(`Cluster server started on ${process.pid}`)
this.logger.log(`Cluster server started on ${process.pid}`)
callback(nodeNumber)
}
}
Expand Down
31 changes: 22 additions & 9 deletions apps/trench/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { AppClusterService } from './appCluster.service'
import { KafkaService } from './services/data/kafka/kafka.service'
import { ClickHouseService } from './services/data/click-house/click-house.service'
import { BootstrapService } from './services/data/bootstrap/bootstrap.service'
import { Logger } from '@nestjs/common'

const logger = new Logger('Bootstrap')

const CORS_OPTIONS = {
origin: '*',
Expand All @@ -25,23 +28,27 @@ const CORS_OPTIONS = {
}

async function bootstrap(nodeNumber: number) {
console.log(`Starting node ${nodeNumber}`)

logger.log(`Starting node ${nodeNumber}`)

let httpsOptions

if (process.env.API_HTTPS === 'true') {
console.log('Using https')
logger.log('Using https')
httpsOptions = {
key: fs.readFileSync('/app/certs/server.key'),
cert: fs.readFileSync('/app/certs/server.crt'),
}
} else {
console.log('Using http')
logger.log('Using http')
}

const fastifyAdapter = new FastifyAdapter({ https: httpsOptions })
const fastifyAdapter = new FastifyAdapter({ https: httpsOptions, logger: true })
fastifyAdapter.enableCors(CORS_OPTIONS)
const app = await NestFactory.create<NestFastifyApplication>(AppModule, fastifyAdapter)
const app = await NestFactory.create<NestFastifyApplication>(
AppModule,
fastifyAdapter,
)

if (process.env.NODE_ENV === 'development') {
const options = new DocumentBuilder().setTitle('trench API').setVersion('1.0').build()
Expand All @@ -58,14 +65,20 @@ async function bootstrap(nodeNumber: number) {
await bootstrapService.bootstrap()
}

console.log('Listening on port', port)
await app.listen(port, '0.0.0.0')
try {
logger.log(`Listening on port ${port}`)
await app.listen(port, '0.0.0.0')
} catch (error) {
logger.error('Failed to start application', error.stack)
throw error
}
}

if (process.env.NODE_ENV !== 'production' && process.env.FORCE_CLUSTER_MODE !== 'true') {
console.log('Running in single instance dev mode')
logger.log('Running in single instance dev mode')
bootstrap(1)
} else {
console.log('Running in cluster mode')
logger.log('Running in cluster mode')
AppClusterService.clusterize(bootstrap)
}

7 changes: 4 additions & 3 deletions apps/trench/src/services/data/bootstrap/bootstrap.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'
import { ClickHouseService } from '../click-house/click-house.service'
import { KafkaService } from '../kafka/kafka.service'
import { DEFAULT_WORKSPACE_ID } from '../../../common/constants'
Expand All @@ -9,6 +9,7 @@ import { Workspace } from '../../../workspaces/workspaces.interface'

@Injectable()
export class BootstrapService {
private readonly logger = new Logger(BootstrapService.name)
constructor(
private readonly clickhouseService: ClickHouseService,
private readonly kafkaService: KafkaService
Expand All @@ -34,12 +35,12 @@ export class BootstrapService {
}

async bootstrapWorkspace(workspace: Workspace) {
console.log(`Creating topics and running migrations for workspace ${workspace.name}`)
this.logger.log(`Creating topics and running migrations for workspace ${workspace.name}`)
const kafkaTopicName = await this.kafkaService.createTopicIfNotExists(
getKafkaTopicFromWorkspace(workspace)
)
await this.clickhouseService.runMigrations(workspace.databaseName, kafkaTopicName)
console.log(
this.logger.log(
`Successfully finished creating topics and running migrations for workspace ${workspace.name}`
)
}
Expand Down
11 changes: 6 additions & 5 deletions apps/trench/src/services/data/click-house/click-house.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'
import { createClient, ClickHouseClient } from '@clickhouse/client'
import * as fs from 'fs'
import * as path from 'path'
Expand All @@ -12,6 +12,7 @@ import {

@Injectable()
export class ClickHouseService {
private readonly logger = new Logger(ClickHouseService.name)
private clientMap: Map<string, ClickHouseClient> = new Map()

getClient(databaseName?: string): ClickHouseClient {
Expand Down Expand Up @@ -83,11 +84,11 @@ export class ClickHouseService {

for (const file of files) {
if (executedFiles.has(file)) {
console.log(`Skipping migration ${file}, already executed `)
this.logger.log(`Skipping migration ${file}, already executed `)
continue
}

console.log(`Executing migration ${file}`)
this.logger.log(`Executing migration ${file}`)

const filePath = path.join(migrationsDir, file)
const query = this.applySubstitutions(fs.readFileSync(filePath, 'utf8'), kafkaTopicName)
Expand All @@ -105,7 +106,7 @@ export class ClickHouseService {
if (String(error).includes('already exists')) {
continue
}
console.error(`Error executing migration ${file} with query ${query}: ${error}`)
this.logger.error(`Error executing migration ${file} with query ${query}: ${error}`)
throw error
}
}
Expand All @@ -116,7 +117,7 @@ export class ClickHouseService {
},
])

console.log(`Migration ${file} executed successfully`)
this.logger.log(`Migration ${file} executed successfully`)
}
}

Expand Down
9 changes: 5 additions & 4 deletions apps/trench/src/services/data/kafka/kafka.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common'
import { Injectable, Logger } from '@nestjs/common'
import { Consumer, Kafka, Producer } from 'kafkajs'
import { KafkaEventWithUUID } from './kafka.interface'
import { DEFAULT_KAFKA_CLIENT_ID, DEFAULT_KAFKA_PARTITIONS } from '../../../common/constants'
Expand All @@ -8,6 +8,7 @@ export class KafkaService {
private hasConnectedToProducer = false
private kafka: Kafka
private producer: Producer
private readonly logger = new Logger(KafkaService.name)

constructor() {
this.kafka = new Kafka({
Expand All @@ -28,13 +29,13 @@ export class KafkaService {
process.env.KAFKA_PARTITIONS
? Number(process.env.KAFKA_PARTITIONS)
: DEFAULT_KAFKA_PARTITIONS
).then(() => console.log(`Created topic ${topic}`))
).then(() => this.logger.log(`Created topic ${topic}`))

if (process.env.NODE_ENV !== 'development') {
await topicPromise
}
} catch (e) {
console.log(`Skipping topic creation, topic ${process.env.KAFKA_TOPIC} already exists.`)
this.logger.log(`Skipping topic creation, topic ${process.env.KAFKA_TOPIC} already exists.`)
}

return topic
Expand Down Expand Up @@ -99,7 +100,7 @@ export class KafkaService {
partitionsConsumedConcurrently: 4,
})
} catch (e) {
console.log(`Error initiating consumer for groupId ${groupId}.`, e)
this.logger.error(`Error initiating consumer for groupId ${groupId}.`, e)
}
}
}
19 changes: 10 additions & 9 deletions apps/trench/src/webhooks/webhooks.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, OnModuleInit } from '@nestjs/common'
import { Injectable, OnModuleInit, Logger } from '@nestjs/common'
import { KafkaService } from '../services/data/kafka/kafka.service'
import { WebhooksDao } from './webhooks.dao'
import { DEFAULT_KAFKA_TOPIC } from '../common/constants'
Expand All @@ -15,6 +15,7 @@ import { shouldProcessEvent } from './webhooks.util'
import { Consumer } from 'kafkajs'
@Injectable()
export class WebhooksService implements OnModuleInit {
private readonly logger = new Logger(WebhooksService.name)
constructor(
private readonly webhooksDao: WebhooksDao,
private readonly kafkaService: KafkaService,
Expand All @@ -23,18 +24,18 @@ export class WebhooksService implements OnModuleInit {
) {}

async onModuleInit() {
console.log('Starting Kafka consumers... this might take a while...')
this.logger.log('Starting Kafka consumers... this might take a while...')
const workspaces = await this.workspacesService.getWorkspaces()
for (const workspace of workspaces) {
const webhooks = await this.webhooksDao.getWebhooks(workspace)
for (const webhook of webhooks) {
console.log('Initiating consumer for webhook:', webhook.uuid, webhook.url)
this.logger.log(`Initiating consumer for webhook: ${webhook.uuid}`)
this.initiateConsumer(webhook, workspace)
.then(() => {
console.log(`Consumer for webhook ${webhook.uuid} has been initiated.`)
this.logger.log(`Consumer for webhook ${webhook.uuid} has been initiated.`)
})
.catch((e) => {
console.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e)
this.logger.error(`Error initiating consumer for webhook ${webhook.uuid}.`, e)
})
}
}
Expand Down Expand Up @@ -63,7 +64,7 @@ export class WebhooksService implements OnModuleInit {
const thisWebhook = webhooks.find((webhook) => webhook.uuid === webhookUUID)

if (!thisWebhook) {
console.error(
this.logger.error(
`Webhook not found. Skipping processing for ${webhookUUID} and disconnecting consumer.`
)
await consumer.stop()
Expand Down Expand Up @@ -99,7 +100,7 @@ export class WebhooksService implements OnModuleInit {
}

if (eventsFound.length < numberOfEventsToFind) {
console.error(
this.logger.error(
`Error: Not all events found after ${maxRetries} retries for webhook ${webhookUUID}.`
)
}
Expand All @@ -122,10 +123,10 @@ export class WebhooksService implements OnModuleInit {
body: JSON.stringify(webhook.flatten ? flatten(payload) : payload),
})
if (!response.ok) {
console.error('Error sending webhook:', webhook.url, response.statusText)
this.logger.error('Error sending webhook:', webhook.url, response.statusText)
}
} catch (error) {
console.error('Error sending webhook:', webhook.url, error.message)
this.logger.error('Error sending webhook:', webhook.url, error.message)
}
}

Expand Down
Loading