diff --git a/package-lock.json b/package-lock.json index 95c8f85160d..6af6b8f036d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5615,28 +5615,31 @@ } }, "node_modules/@opentelemetry/api": { - "version": "1.4.1", - "license": "Apache-2.0", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.0.tgz", + "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==", "engines": { "node": ">=8.0.0" } }, "node_modules/@opentelemetry/core": { - "version": "1.18.1", - "license": "Apache-2.0", + "version": "1.26.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.26.0.tgz", + "integrity": "sha512-1iKxXXE8415Cdv0yjG3G6hQnB5eVEsJce3QaawX8SjDn0mAS0ZM8fAbZZJD4ajvhC15cePvosSCut404KrIIvQ==", "dependencies": { - "@opentelemetry/semantic-conventions": "1.18.1" + "@opentelemetry/semantic-conventions": "1.27.0" }, "engines": { "node": ">=14" }, "peerDependencies": { - "@opentelemetry/api": ">=1.0.0 <1.8.0" + "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "node_modules/@opentelemetry/semantic-conventions": { - "version": "1.18.1", - "license": "Apache-2.0", + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.27.0.tgz", + "integrity": "sha512-sAay1RrB+ONOem0OZanAR1ZI/k7yDpnOQSQmTMuGImUQb2y8EbSaCJ94FQluM74xoU03vlb2d2U90hZluL6nQg==", "engines": { "node": ">=14" } @@ -16323,6 +16326,14 @@ "node": ">=18" } }, + "node_modules/dd-trace/node_modules/@opentelemetry/api": { + "version": "1.8.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.8.0.tgz", + "integrity": "sha512-I/s6F7yKUDdtMsoBWXJe8Qz40Tui5vsuKCWJEWVL+5q9sSWRzzx6v2KeNsOBEwd94j0eWkpWCH4yB6rZg9Mf0w==", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/dd-trace/node_modules/lru-cache": { "version": "7.18.3", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.18.3.tgz", @@ -27053,9 +27064,10 @@ } }, "node_modules/protobufjs": { - "version": "7.2.5", + "version": "7.4.0", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.4.0.tgz", + "integrity": "sha512-mRUWCc3KUU4w1jU8sGxICXH/gNS94DvI1gxqDvBzhj1JpcsimQkYiOJfwsPUykUI5ZaspFbSgmBLER8IrQ3tqw==", "hasInstallScript": true, - "license": "BSD-3-Clause", "dependencies": { "@protobufjs/aspromise": "^1.1.2", "@protobufjs/base64": "^1.1.2", @@ -34291,6 +34303,12 @@ "@elastic/elasticsearch": "8.13.1", "@nangohq/kvstore": "file:../kvstore", "@nangohq/utils": "file:../utils", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/exporter-trace-otlp-http": "^0.54.0", + "@opentelemetry/resources": "^1.27.0", + "@opentelemetry/sdk-trace-base": "^1.27.0", + "@opentelemetry/sdk-trace-node": "^1.27.0", + "@opentelemetry/semantic-conventions": "^1.27.0", "zod": "3.23.8" }, "devDependencies": { @@ -34299,6 +34317,204 @@ "vitest": "1.6.0" } }, + "packages/logs/node_modules/@opentelemetry/api-logs": { + "version": "0.54.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api-logs/-/api-logs-0.54.0.tgz", + "integrity": "sha512-9HhEh5GqFrassUndqJsyW7a0PzfyWr2eV2xwzHLIS+wX3125+9HE9FMRAKmJRwxZhgZGwH3HNQQjoMGZqmOeVA==", + "dependencies": { + "@opentelemetry/api": "^1.3.0" + }, + "engines": { + "node": ">=14" + } + }, + "packages/logs/node_modules/@opentelemetry/context-async-hooks": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/context-async-hooks/-/context-async-hooks-1.27.0.tgz", + "integrity": "sha512-CdZ3qmHCwNhFAzjTgHqrDQ44Qxcpz43cVxZRhOs+Ns/79ug+Mr84Bkb626bkJLkA3+BLimA5YAEVRlJC6pFb7g==", + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/core": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.27.0.tgz", + "integrity": "sha512-yQPKnK5e+76XuiqUH/gKyS8wv/7qITd5ln56QkBTf3uggr0VkXOXfcaAuG330UfdYu83wsyoBwqwxigpIG+Jkg==", + "dependencies": { + "@opentelemetry/semantic-conventions": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/exporter-trace-otlp-http": { + "version": "0.54.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/exporter-trace-otlp-http/-/exporter-trace-otlp-http-0.54.0.tgz", + "integrity": "sha512-00X6rtr6Ew59+MM9pPSH7Ww5ScpWKBLiBA49awbPqQuVL/Bp0qp7O1cTxKHgjWdNkhsELzJxAEYwuRnDGrMXyA==", + "dependencies": { + "@opentelemetry/core": "1.27.0", + "@opentelemetry/otlp-exporter-base": "0.54.0", + "@opentelemetry/otlp-transformer": "0.54.0", + "@opentelemetry/resources": "1.27.0", + "@opentelemetry/sdk-trace-base": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, + "packages/logs/node_modules/@opentelemetry/otlp-exporter-base": { + "version": "0.54.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-exporter-base/-/otlp-exporter-base-0.54.0.tgz", + "integrity": "sha512-g+H7+QleVF/9lz4zhaR9Dt4VwApjqG5WWupy5CTMpWJfHB/nLxBbX73GBZDgdiNfh08nO3rNa6AS7fK8OhgF5g==", + "dependencies": { + "@opentelemetry/core": "1.27.0", + "@opentelemetry/otlp-transformer": "0.54.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, + "packages/logs/node_modules/@opentelemetry/otlp-transformer": { + "version": "0.54.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/otlp-transformer/-/otlp-transformer-0.54.0.tgz", + "integrity": "sha512-jRexIASQQzdK4AjfNIBfn94itAq4Q8EXR9d3b/OVbhd3kKQKvMr7GkxYDjbeTbY7hHCOLcLfJ3dpYQYGOe8qOQ==", + "dependencies": { + "@opentelemetry/api-logs": "0.54.0", + "@opentelemetry/core": "1.27.0", + "@opentelemetry/resources": "1.27.0", + "@opentelemetry/sdk-logs": "0.54.0", + "@opentelemetry/sdk-metrics": "1.27.0", + "@opentelemetry/sdk-trace-base": "1.27.0", + "protobufjs": "^7.3.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + } + }, + "packages/logs/node_modules/@opentelemetry/propagator-b3": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/propagator-b3/-/propagator-b3-1.27.0.tgz", + "integrity": "sha512-pTsko3gnMioe3FeWcwTQR3omo5C35tYsKKwjgTCTVCgd3EOWL9BZrMfgLBmszrwXABDfUrlAEFN/0W0FfQGynQ==", + "dependencies": { + "@opentelemetry/core": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/propagator-jaeger": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/propagator-jaeger/-/propagator-jaeger-1.27.0.tgz", + "integrity": "sha512-EI1bbK0wn0yIuKlc2Qv2LKBRw6LiUWevrjCF80fn/rlaB+7StAi8Y5s8DBqAYNpY7v1q86+NjU18v7hj2ejU3A==", + "dependencies": { + "@opentelemetry/core": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/resources": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-1.27.0.tgz", + "integrity": "sha512-jOwt2VJ/lUD5BLc+PMNymDrUCpm5PKi1E9oSVYAvz01U/VdndGmrtV3DU1pG4AwlYhJRHbHfOUIlpBeXCPw6QQ==", + "dependencies": { + "@opentelemetry/core": "1.27.0", + "@opentelemetry/semantic-conventions": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/sdk-logs": { + "version": "0.54.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-logs/-/sdk-logs-0.54.0.tgz", + "integrity": "sha512-HeWvOPiWhEw6lWvg+lCIi1WhJnIPbI4/OFZgHq9tKfpwF3LX6/kk3+GR8sGUGAEZfbjPElkkngzvd2s03zbD7Q==", + "dependencies": { + "@opentelemetry/api-logs": "0.54.0", + "@opentelemetry/core": "1.27.0", + "@opentelemetry/resources": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.4.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/sdk-metrics": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-metrics/-/sdk-metrics-1.27.0.tgz", + "integrity": "sha512-JzWgzlutoXCydhHWIbLg+r76m+m3ncqvkCcsswXAQ4gqKS+LOHKhq+t6fx1zNytvLuaOUBur7EvWxECc4jPQKg==", + "dependencies": { + "@opentelemetry/core": "1.27.0", + "@opentelemetry/resources": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.3.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/sdk-trace-base": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-base/-/sdk-trace-base-1.27.0.tgz", + "integrity": "sha512-btz6XTQzwsyJjombpeqCX6LhiMQYpzt2pIYNPnw0IPO/3AhT6yjnf8Mnv3ZC2A4eRYOjqrg+bfaXg9XHDRJDWQ==", + "dependencies": { + "@opentelemetry/core": "1.27.0", + "@opentelemetry/resources": "1.27.0", + "@opentelemetry/semantic-conventions": "1.27.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "packages/logs/node_modules/@opentelemetry/sdk-trace-node": { + "version": "1.27.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-node/-/sdk-trace-node-1.27.0.tgz", + "integrity": "sha512-dWZp/dVGdUEfRBjBq2BgNuBlFqHCxyyMc8FsN0NX15X07mxSUO0SZRLyK/fdAVrde8nqFI/FEdMH4rgU9fqJfQ==", + "dependencies": { + "@opentelemetry/context-async-hooks": "1.27.0", + "@opentelemetry/core": "1.27.0", + "@opentelemetry/propagator-b3": "1.27.0", + "@opentelemetry/propagator-jaeger": "1.27.0", + "@opentelemetry/sdk-trace-base": "1.27.0", + "semver": "^7.5.2" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, "packages/logs/node_modules/type-fest": { "version": "4.25.0", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.25.0.tgz", diff --git a/packages/database/lib/migrations/20241024164559_add_otlp_column_to_environments.cjs b/packages/database/lib/migrations/20241024164559_add_otlp_column_to_environments.cjs new file mode 100644 index 00000000000..fefafa05f4c --- /dev/null +++ b/packages/database/lib/migrations/20241024164559_add_otlp_column_to_environments.cjs @@ -0,0 +1,19 @@ +exports.config = { transaction: false }; + +/** + * @param {import('knex').Knex} knex + */ +exports.up = async function (knex) { + await knex.schema.raw(`ALTER TABLE _nango_environments ADD COLUMN otlp_settings JSONB DEFAULT NULL;`); + await knex.schema.raw( + `CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_environment_otlp_settings" ON "_nango_environments" USING BTREE ("id") WHERE "otlp_settings" IS NOT NULL;` + ); +}; + +/** + * @param {import('knex').Knex} knex + */ +exports.down = async function (knex) { + await knex.schema.raw(`ALTER TABLE _nango_environments DROP COLUMN otlp_settings;`); + await knex.schema.raw(`DROP INDEX CONCURRENTLY IF EXISTS "idx_environment_otlp_settings"`); +}; diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts index acd894f70a7..7c2f76fa1ab 100644 --- a/packages/jobs/lib/app.ts +++ b/packages/jobs/lib/app.ts @@ -7,6 +7,8 @@ import { getLogger, stringifyError } from '@nangohq/utils'; import { timeoutLogsOperations } from './crons/timeoutLogsOperations.js'; import { envs } from './env.js'; import db from '@nangohq/database'; +import { getOtlpRoutes } from '@nangohq/shared'; +import { otlp } from '@nangohq/logs'; const logger = getLogger('Jobs'); @@ -67,6 +69,8 @@ try { cronAutoIdleDemo(); deleteSyncsData(); timeoutLogsOperations(); + + otlp.register(getOtlpRoutes); } catch (err) { logger.error(stringifyError(err)); process.exit(1); diff --git a/packages/logs/lib/client.ts b/packages/logs/lib/client.ts index f80bcb82d00..850441d9d83 100644 --- a/packages/logs/lib/client.ts +++ b/packages/logs/lib/client.ts @@ -4,6 +4,7 @@ import { getFormattedMessage } from './models/helpers.js'; import { errorToObject, metrics, stringifyError } from '@nangohq/utils'; import { isCli, logger } from './utils.js'; import { envs } from './env.js'; +import { OtlpSpan } from './otlp/otlpSpan.js'; interface Options { dryRun?: boolean; @@ -105,16 +106,19 @@ export class LogContextStateless { */ export class LogContext extends LogContextStateless { operation: OperationRow; + span: OtlpSpan; constructor(data: { parentId: string; operation: OperationRow }, options: Options = { dryRun: false, logToConsole: true }) { super(data, options); this.operation = data.operation; + this.span = new OtlpSpan(data.operation); } /** * Add more data to the parentId */ async enrichOperation(data: Partial): Promise { + this.span.enrich(data); await this.logOrExec( `enrich(${JSON.stringify(data)})`, async () => await update({ id: this.id, data: { ...data, createdAt: this.operation.createdAt } }) @@ -129,19 +133,31 @@ export class LogContext extends LogContextStateless { } async failed(): Promise { - await this.logOrExec('failed', async () => await setFailed(this.operation)); + await this.logOrExec('failed', async () => { + await setFailed(this.operation); + this.span.end('failed'); + }); } async success(): Promise { - await this.logOrExec('success', async () => await setSuccess(this.operation)); + await this.logOrExec('success', async () => { + await setSuccess(this.operation); + this.span.end('success'); + }); } async cancel(): Promise { - await this.logOrExec('cancel', async () => await setCancelled(this.operation)); + await this.logOrExec('cancel', async () => { + await setCancelled(this.operation); + this.span.end('cancelled'); + }); } async timeout(): Promise { - await this.logOrExec('timeout', async () => await setTimeouted(this.operation)); + await this.logOrExec('timeout', async () => { + await setTimeouted(this.operation); + this.span.end('timeout'); + }); } private async logOrExec(log: string, callback: () => Promise) { diff --git a/packages/logs/lib/index.ts b/packages/logs/lib/index.ts index 599f554a1b6..faee961c4da 100644 --- a/packages/logs/lib/index.ts +++ b/packages/logs/lib/index.ts @@ -4,4 +4,5 @@ export * from './models/helpers.js'; export * from './models/logContextGetter.js'; export * as model from './models/messages.js'; export * as modelOperations from './models/insights.js'; +export * from './otlp/otlp.js'; export { envs, defaultOperationExpiration } from './env.js'; diff --git a/packages/logs/lib/otlp/otlp.ts b/packages/logs/lib/otlp/otlp.ts new file mode 100644 index 00000000000..45e33f6d730 --- /dev/null +++ b/packages/logs/lib/otlp/otlp.ts @@ -0,0 +1,60 @@ +import { trace } from '@opentelemetry/api'; +import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; +import { Resource } from '@opentelemetry/resources'; +import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions'; +import { once, stringifyError } from '@nangohq/utils'; +import { logger } from '../utils.js'; +import { RoutingSpanProcessor } from './otlpSpanProcessor.js'; +import { setInterval } from 'timers'; + +// Enable OpenTelemetry console logging +// import { DiagLogLevel, DiagConsoleLogger, diag } from '@opentelemetry/api'; +// diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG); + +export const otlpRoutingAttributeKey = 'otlp.internal.routingKey'; + +export interface RouteConfig { + routingId: string; + routingEndpoint: string; + routingHeaders: Record; +} + +let routingProcessor: RoutingSpanProcessor | null = null; +process.on('SIGTERM', async () => { + if (routingProcessor) { + await routingProcessor.shutdown(); + } +}); + +function registerRoutes(routes: RouteConfig[]) { + try { + if (routingProcessor) { + routingProcessor.updateRoutes(routes); + } else { + routingProcessor = new RoutingSpanProcessor(routes); + const provider = new NodeTracerProvider({ resource: new Resource({ [ATTR_SERVICE_NAME]: 'nango-otlp' }) }); + provider.addSpanProcessor(routingProcessor); + provider.register(); + } + } catch (err) { + logger.error(`failed_to_register_otlp_routes ${stringifyError(err)}`); + } +} + +async function updateRoutes(getRoutes: () => Promise) { + try { + const routes = await getRoutes(); + registerRoutes(routes); + } catch (err) { + logger.error(`failed_to_update_otlp_routes ${stringifyError(err)}`); + } +} + +export const otlp = { + register: once(async (getRoutes: () => Promise) => { + await updateRoutes(getRoutes); + setInterval(async () => await updateRoutes(getRoutes), 10000); + }), + tracer: trace.getTracer('nango-otlp'), + routingAttributeKey: 'otlp.internal.routingKey' +}; diff --git a/packages/logs/lib/otlp/otlpSpan.ts b/packages/logs/lib/otlp/otlpSpan.ts new file mode 100644 index 00000000000..a3290358786 --- /dev/null +++ b/packages/logs/lib/otlp/otlpSpan.ts @@ -0,0 +1,109 @@ +import type { MessageRow, MessageState, OperationRow } from '@nangohq/types'; +import { SpanStatusCode } from '@opentelemetry/api'; +import type { Span } from '@opentelemetry/api'; +import { otlp, otlpRoutingAttributeKey } from './otlp.js'; +import { envs } from '../env.js'; + +export class OtlpSpan { + private span: Span | null = null; + + constructor(operation: OperationRow) { + if (!envs.NANGO_LOGS_ENABLED || !shouldTrace(operation)) { + return; + } + // if no environmentId, we cannot route the span to the correct exporter so we don't start it + if (operation.environmentId) { + const attributes: Record = { + [otlpRoutingAttributeKey]: `environment:${operation.environmentId}`, + 'nango.operation.id': operation.id, + 'nango.operation.type': operation.operation.type, + 'nango.operation.action': operation.operation.action, + 'nango.operation.message': operation.message, + 'nango.account': operation.accountName + }; + if (operation.environmentName) { + attributes['nango.environment'] = operation.environmentName; + } + if (operation.providerName) { + attributes['nango.provider'] = operation.providerName; + } + if (operation.integrationName) { + attributes['nango.integration'] = operation.integrationName; + } + if (operation.connectionName) { + attributes['nango.connection'] = operation.connectionName; + } + if (operation.syncConfigName) { + attributes['nango.sync'] = operation.syncConfigName; + } + + const spanName = `nango.${operation.operation.type}.${operation.operation.action}`.toLowerCase(); + this.span = otlp.tracer.startSpan(spanName, { attributes }); + } + } + + fail(err: Error): void { + if (this.span) { + this.span.recordException(err); + this.span.setStatus({ code: SpanStatusCode.ERROR }); + } + } + + end(state: MessageState): void { + if (this.span) { + this.span.setAttribute('nango.operation.status', state); + this.span.setStatus({ code: ['success', 'waiting', 'running'].includes(state) ? SpanStatusCode.OK : SpanStatusCode.ERROR }); + this.span.end(); + } + } + + enrich(data: Partial): void { + if (this.span) { + const attrs: Record = {}; + if (data.error) { + attrs['nango.error.message'] = data.error.message; + if (data.error.type) { + attrs['nango.error.type'] = data.error.type; + } + if (data.error.payload) { + attrs['nango.error.payload'] = data.error.payload; + } + } + if (data.environmentName) { + attrs['nango.environment'] = data.environmentName; + } + if (data.providerName) { + attrs['nango.provider'] = data.providerName; + } + if (data.integrationName) { + attrs['nango.integration'] = data.integrationName; + } + if (data.connectionName) { + attrs['nango.connection'] = data.connectionName; + } + if (data.syncConfigName) { + attrs['nango.sync'] = data.syncConfigName; + } + this.span.setAttributes(attrs); + } + } +} + +function shouldTrace(operation: OperationRow): boolean { + if (!operation) { + return false; + } + if (operation.operation.type === 'sync' && operation.operation.action === 'run') { + return true; + } + if (operation.operation.type === 'proxy') { + return true; + } + if (operation.operation.type === 'action') { + return true; + } + if (operation.operation.type === 'webhook') { + return true; + } + return false; +} diff --git a/packages/logs/lib/otlp/otlpSpanProcessor.ts b/packages/logs/lib/otlp/otlpSpanProcessor.ts new file mode 100644 index 00000000000..3420f68c4df --- /dev/null +++ b/packages/logs/lib/otlp/otlpSpanProcessor.ts @@ -0,0 +1,116 @@ +import type { ReadableSpan, Span, SpanProcessor } from '@opentelemetry/sdk-trace-base'; +import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base'; +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http'; +import type { Attributes, Context } from '@opentelemetry/api'; +import { stringToHash, stringifyError } from '@nangohq/utils'; +import { logger } from '../utils.js'; +import { otlpRoutingAttributeKey } from './otlp.js'; +import type { RouteConfig } from './otlp.js'; + +interface BatchSpanProcessorWithRouteHash { + processor: BatchSpanProcessor; + routeHash: number; +} + +export class RoutingSpanProcessor implements SpanProcessor { + private processors = new Map(); + + constructor(routes: RouteConfig[]) { + try { + for (const route of routes) { + const processor = new BatchSpanProcessor( + new OTLPTraceExporter({ + url: `${route.routingEndpoint.replace(/\/$/, '')}/traces`, // trim trailing slash if present + headers: route.routingHeaders + }) + ); + const routeHash = stringToHash(JSON.stringify(route)); + this.processors.set(route.routingId, { processor, routeHash: routeHash }); + } + } catch (err) { + logger.error(`failed_to_created_routing_span_processor ${stringifyError(err)}`); + } + } + + async updateRoutes(routes: RouteConfig[]) { + try { + const toShutdown: BatchSpanProcessor[] = []; + + for (const route of routes) { + const routeHash = stringToHash(JSON.stringify(route)); + const existing = this.processors.get(route.routingId); + if (existing) { + // Skip if the route hasn't changed + if (existing.routeHash === routeHash) { + continue; + } + toShutdown.push(existing.processor); + } + + const traceExporter = new OTLPTraceExporter({ + url: `${route.routingEndpoint.replace(/\/$/, '')}/traces`, + headers: route.routingHeaders + }); + const newProcessor = new BatchSpanProcessor(traceExporter); + this.processors.set(route.routingId, { processor: newProcessor, routeHash: routeHash }); + } + + // Shutdown old processors + await Promise.all(toShutdown.map((p) => p.shutdown())); + } catch (err) { + logger.error(`failed_to_update_routing_span_processor ${stringifyError(err)}`); + } + } + + private getProcessorForSpan(span: { attributes: Attributes }): BatchSpanProcessor | undefined { + const routingId = span.attributes[otlpRoutingAttributeKey]; + if (routingId && typeof routingId === 'string') { + return this.processors.get(routingId)?.processor; + } + return undefined; + } + + onStart(span: Span, context: Context): void { + try { + const processor = this.getProcessorForSpan(span); + if (processor) { + processor.onStart(span, context); + } + } catch (err) { + logger.error(`failed_to_start_span ${stringifyError(err)}`); + } + } + + onEnd(span: ReadableSpan): void { + try { + const processor = this.getProcessorForSpan(span); + if (processor) { + span.spanContext(); + if (span.attributes[otlpRoutingAttributeKey]) { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete span.attributes[otlpRoutingAttributeKey]; + } + processor.onEnd(span); + } + } catch (err) { + logger.error(`failed_to_end_span ${stringifyError(err)}`); + } + } + + async shutdown(): Promise { + try { + const shutdownPromises = Array.from(this.processors.values()).map((p) => p.processor.shutdown()); + await Promise.all(shutdownPromises); + } catch (err) { + logger.error(`failed_to_shutdown_routing_span_processor ${stringifyError(err)}`); + } + } + async forceFlush(): Promise { + try { + const flushPromises = Array.from(this.processors.values()).map((p) => p.processor.forceFlush()); + await Promise.all(flushPromises); + } catch (err) { + logger.error(`failed_to_flush_routing_span_processor ${stringifyError(err)}`); + } + } +} diff --git a/packages/logs/package.json b/packages/logs/package.json index f03132a25ca..c23caf977af 100644 --- a/packages/logs/package.json +++ b/packages/logs/package.json @@ -17,6 +17,12 @@ "@elastic/elasticsearch": "8.13.1", "@nangohq/kvstore": "file:../kvstore", "@nangohq/utils": "file:../utils", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/exporter-trace-otlp-http": "^0.54.0", + "@opentelemetry/resources": "^1.27.0", + "@opentelemetry/sdk-trace-base": "^1.27.0", + "@opentelemetry/sdk-trace-node": "^1.27.0", + "@opentelemetry/semantic-conventions": "^1.27.0", "zod": "3.23.8" }, "devDependencies": { diff --git a/packages/persist/lib/app.ts b/packages/persist/lib/app.ts index 0069e4299fd..67c040d376a 100644 --- a/packages/persist/lib/app.ts +++ b/packages/persist/lib/app.ts @@ -2,6 +2,8 @@ import './tracer.js'; import { getLogger } from '@nangohq/utils'; import { server } from './server.js'; import { envs } from './env.js'; +import { getOtlpRoutes } from '@nangohq/shared'; +import { otlp } from '@nangohq/logs'; const logger = getLogger('Persist'); @@ -10,6 +12,8 @@ try { server.listen(port, () => { logger.info(`🚀 API ready at http://localhost:${port}`); }); + + otlp.register(getOtlpRoutes); } catch (err) { console.error(`Persist API error: ${err}`); process.exit(1); diff --git a/packages/server/lib/server.ts b/packages/server/lib/server.ts index ec862be582f..8e531d925e9 100644 --- a/packages/server/lib/server.ts +++ b/packages/server/lib/server.ts @@ -5,13 +5,13 @@ import express from 'express'; import type { WebSocket } from 'ws'; import { WebSocketServer } from 'ws'; import http from 'node:http'; -import { NANGO_VERSION, getGlobalOAuthCallbackUrl, getServerPort, getWebsocketsPath } from '@nangohq/shared'; +import { NANGO_VERSION, getGlobalOAuthCallbackUrl, getOtlpRoutes, getServerPort, getWebsocketsPath } from '@nangohq/shared'; import { getLogger, requestLoggerMiddleware } from '@nangohq/utils'; import oAuthSessionService from './services/oauth-session.service.js'; import { KnexDatabase } from '@nangohq/database'; import migrate from './utils/migrate.js'; import { migrate as migrateRecords } from '@nangohq/records'; -import { start as migrateLogs } from '@nangohq/logs'; +import { start as migrateLogs, otlp } from '@nangohq/logs'; import { migrate as migrateKeystore } from '@nangohq/keystore'; import publisher from './clients/publisher.client.js'; @@ -59,6 +59,7 @@ if (NANGO_MIGRATE_AT_START === 'true') { await oAuthSessionService.clearStaleSessions(); refreshConnectionsCron(); +otlp.register(getOtlpRoutes); const port = getServerPort(); server.listen(port, () => { diff --git a/packages/shared/lib/index.ts b/packages/shared/lib/index.ts index fc73d0dce2b..cd5921377af 100644 --- a/packages/shared/lib/index.ts +++ b/packages/shared/lib/index.ts @@ -51,6 +51,8 @@ export * from './constants.js'; export * from './sdk/sync.js'; export * from './sdk/dataValidation.js'; +export { getRoutes as getOtlpRoutes } from './otlp/otlp.js'; + export { NANGO_VERSION } from './version.js'; export { diff --git a/packages/shared/lib/otlp/otlp.ts b/packages/shared/lib/otlp/otlp.ts new file mode 100644 index 00000000000..7fd17e3ae9b --- /dev/null +++ b/packages/shared/lib/otlp/otlp.ts @@ -0,0 +1,18 @@ +import type { RouteConfig } from '@nangohq/logs'; +import environmentService from '../services/environment.service.js'; + +export async function getRoutes(): Promise { + const environments = await environmentService.getEnvironmentsWithOtlpSettings(); + return environments.flatMap((env) => { + if (env.otlp_settings?.endpoint && env.otlp_settings?.headers) { + return [ + { + routingId: `environment:${env.id}`, + routingEndpoint: env.otlp_settings?.endpoint, + routingHeaders: env.otlp_settings?.headers || {} + } + ]; + } + return []; + }); +} diff --git a/packages/shared/lib/services/environment.service.integration.test.ts b/packages/shared/lib/services/environment.service.integration.test.ts index 2ca20059b54..062dbc159f6 100644 --- a/packages/shared/lib/services/environment.service.integration.test.ts +++ b/packages/shared/lib/services/environment.service.integration.test.ts @@ -40,7 +40,8 @@ describe('Environment service', () => { updated_at: expect.toBeIsoDate(), uuid: expect.any(String), webhook_url: null, - webhook_url_secondary: null + webhook_url_secondary: null, + otlp_settings: null }); expect(env.secret_key).not.toEqual(env.secret_key_hashed); diff --git a/packages/shared/lib/services/environment.service.ts b/packages/shared/lib/services/environment.service.ts index fea73805eb1..4f3d7c3148d 100644 --- a/packages/shared/lib/services/environment.service.ts +++ b/packages/shared/lib/services/environment.service.ts @@ -302,6 +302,14 @@ class EnvironmentService { return result[0].id; } + async getEnvironmentsWithOtlpSettings(): Promise { + const result = await db.knex.select('*').from(TABLE).whereNotNull('otlp_settings'); + if (result == null) { + return []; + } + return result.map((env) => encryptionManager.decryptEnvironment(env)); + } + async editCallbackUrl(callbackUrl: string, id: number): Promise { return db.knex.from(TABLE).where({ id }).update({ callback_url: callbackUrl }, ['id']); } diff --git a/packages/types/lib/environment/db.ts b/packages/types/lib/environment/db.ts index b284888a6ad..fb3a4ffdaea 100644 --- a/packages/types/lib/environment/db.ts +++ b/packages/types/lib/environment/db.ts @@ -39,6 +39,7 @@ export interface DBEnvironment extends Timestamps { slack_notifications: boolean; webhook_receive_url?: string; + otlp_settings?: { endpoint: string; headers: Record } | null; } export interface ExternalWebhook extends Timestamps { diff --git a/packages/utils/lib/index.ts b/packages/utils/lib/index.ts index 3546f5ace4d..d5df5464c5a 100644 --- a/packages/utils/lib/index.ts +++ b/packages/utils/lib/index.ts @@ -17,3 +17,4 @@ export * from './express/headers.js'; export * from './workflows.js'; export * from './axios.js'; export * from './auth.js'; +export * from './once.js'; diff --git a/packages/utils/lib/once.ts b/packages/utils/lib/once.ts new file mode 100644 index 00000000000..bd09c56177c --- /dev/null +++ b/packages/utils/lib/once.ts @@ -0,0 +1,11 @@ +// Ensures a function is only called once. +export function once(fn: (...args: T) => void): (...args: T) => void { + let called = false; + + return function (...args: T) { + if (!called) { + called = true; + fn(...args); + } + }; +}