Skip to content

Commit

Permalink
feat: add option to flatten webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
christianmat committed Nov 27, 2024
1 parent 48a6e35 commit 3ba964a
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 9 deletions.
2 changes: 1 addition & 1 deletion apps/docs/swagger-spec.json

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions apps/trench/src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
export function flatten(data: any): Record<string, any> {
const result: Record<string, any> = {}

function recurse(cur: any, prop: string) {
if (Object(cur) !== cur) {
result[prop] = cur
} else if (Array.isArray(cur)) {
for (let i = 0; i < cur.length; i++) {
recurse(cur[i], prop + '_' + i)
}
if (cur.length === 0) {
result[prop] = []
}
} else {
let isEmpty = true
for (const p in cur) {
isEmpty = false
recurse(cur[p], prop ? prop + '_' + p : p)
}
if (isEmpty && prop) {
result[prop] = {}
}
}
}

recurse(data, '')
return result
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE webhooks
ADD COLUMN IF NOT EXISTS flatten Boolean DEFAULT false;
9 changes: 7 additions & 2 deletions apps/trench/src/webhooks/webhooks.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ClickHouseService } from '../services/data/click-house/click-house.serv
import { Webhook, WebhookDTO } from './webhooks.interface'
import { CACHE_MANAGER } from '@nestjs/cache-manager'
import { Cache } from 'cache-manager'

import { v4 as uuidv4 } from 'uuid'
const CACHE_KEY = 'webhooks'
@Injectable()
export class WebhooksDao {
Expand All @@ -26,6 +26,7 @@ export class WebhooksDao {
createdAt: new Date(row.created_at),
eventTypes: row.event_types,
eventNames: row.event_names,
flatten: row.flatten,
}))
await this.cacheManager.set(CACHE_KEY, resultData)
return resultData
Expand All @@ -36,23 +37,27 @@ export class WebhooksDao {
throw new BadRequestException('URL is required to create a webhook')
}

const uuid = uuidv4()
await this.clickhouse.insert('webhooks', [
{
uuid,
url: webhookDTO.url,
enable_batching: webhookDTO.enableBatching ?? false,
event_types: webhookDTO.eventTypes ?? ['*'],
event_names: webhookDTO.eventNames ?? ['*'],
flatten: webhookDTO.flatten ?? false,
},
])
await this.cacheManager.del(CACHE_KEY)

return {
uuid: 'new-webhook',
uuid,
url: webhookDTO.url,
enableBatching: webhookDTO.enableBatching ?? false,
createdAt: new Date(),
eventTypes: webhookDTO.eventTypes ?? ['*'],
eventNames: webhookDTO.eventNames ?? ['*'],
flatten: webhookDTO.flatten ?? false,
}
}

Expand Down
15 changes: 15 additions & 0 deletions apps/trench/src/webhooks/webhooks.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export class Webhook {
example: ['UserSignedUp', 'UserLoggedIn'],
})
eventNames: string[]

@ApiProperty({
description:
"Whether to flatten the event data. This is useful for downstream systems that don't support nested data structures.",
example: true,
})
flatten: boolean
}

export class WebhookDTO {
Expand Down Expand Up @@ -68,6 +75,14 @@ export class WebhookDTO {
required: false,
})
eventNames?: string[]

@ApiProperty({
description:
"Whether to flatten the event data. This is useful for downstream systems that don't support nested data structures. Defaults to `false`.",
example: true,
required: false,
})
flatten?: boolean
}

export class PaginatedWebhookResponse extends PaginatedResponse<Webhook> {
Expand Down
11 changes: 6 additions & 5 deletions apps/trench/src/webhooks/webhooks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { Injectable, OnModuleInit } from '@nestjs/common'
import { KafkaService } from '../services/data/kafka/kafka.service'
import { WebhooksDao } from './webhooks.dao'
import { DEFAULT_KAFKA_TOPIC } from '../common/constants'
import { KafkaEvent, KafkaEventWithUUID } from '../services/data/kafka/kafka.interface'
import { KafkaEvent } from '../services/data/kafka/kafka.interface'
import { Webhook, WebhookDTO } from './webhooks.interface'

import { EventsService } from '../events/events.service'
import { Event } from '../events/events.interface'
import { flatten } from '../common/utils'
@Injectable()
export class WebhooksService implements OnModuleInit {
constructor(
Expand Down Expand Up @@ -82,15 +83,15 @@ export class WebhooksService implements OnModuleInit {

async sendWebhook(webhook: Webhook, events: Event[]) {
try {
const payload = {
data: events,
}
await fetch(webhook.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
// TODO: Add type here
body: JSON.stringify({
data: events,
}),
body: JSON.stringify(webhook.flatten ? flatten(payload) : payload),
})
} catch (error) {
console.error('Error sending webhook:', error.message)
Expand Down
2 changes: 1 addition & 1 deletion apps/trench/swagger-spec.json

Large diffs are not rendered by default.

0 comments on commit 3ba964a

Please sign in to comment.