Skip to content

Commit

Permalink
feat: add OpenTelemetry export
Browse files Browse the repository at this point in the history
Log operations are sent as OpenTelemetry traces to a otlp backend set by
customers on the environment level
It is using a custom SpanProcessor from opentelemetry-js lib
with one exporter per customers. I assume this feature will only be
available to scale plan customer and therefore the number of exporters
will stay low
Another thing to note is that operation must be started and ended on the
same service, which is the case today but could become a limitation in
the future.
  • Loading branch information
TBonnin committed Oct 24, 2024
1 parent a4ba67b commit a80977f
Show file tree
Hide file tree
Showing 18 changed files with 611 additions and 17 deletions.
236 changes: 226 additions & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
exports.config = { transaction: false };

/**
* @param {import('knex').Knex} knex
*/
exports.up = async function (knex) {
await knex.schema.raw(`ALTER TABLE _nango_environments ADD COLUMN otlp_settings JSONB DEFAULT NULL;`);
await knex.schema.raw(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_environment_otlp_settings" ON "_nango_environments" USING BTREE ("id") WHERE "otlp_settings" IS NOT NULL;`
);
};

/**
* @param {import('knex').Knex} knex
*/
exports.down = async function (knex) {
await knex.schema.raw(`ALTER TABLE _nango_environments DROP COLUMN otlp_settings;`);
await knex.schema.raw(`DROP INDEX CONCURRENTLY IF EXISTS "idx_environment_otlp_settings"`);
};
4 changes: 4 additions & 0 deletions packages/jobs/lib/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { getLogger, stringifyError } from '@nangohq/utils';
import { timeoutLogsOperations } from './crons/timeoutLogsOperations.js';
import { envs } from './env.js';
import db from '@nangohq/database';
import { getOtlpRoutes } from '@nangohq/shared';
import { otlp } from '@nangohq/logs';

const logger = getLogger('Jobs');

Expand Down Expand Up @@ -67,6 +69,8 @@ try {
cronAutoIdleDemo();
deleteSyncsData();
timeoutLogsOperations();

otlp.register(getOtlpRoutes);
} catch (err) {
logger.error(stringifyError(err));
process.exit(1);
Expand Down
24 changes: 20 additions & 4 deletions packages/logs/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { getFormattedMessage } from './models/helpers.js';
import { errorToObject, metrics, stringifyError } from '@nangohq/utils';
import { isCli, logger } from './utils.js';
import { envs } from './env.js';
import { OtlpSpan } from './otlp/otlpSpan.js';

interface Options {
dryRun?: boolean;
Expand Down Expand Up @@ -105,16 +106,19 @@ export class LogContextStateless {
*/
export class LogContext extends LogContextStateless {
operation: OperationRow;
span: OtlpSpan;

constructor(data: { parentId: string; operation: OperationRow }, options: Options = { dryRun: false, logToConsole: true }) {
super(data, options);
this.operation = data.operation;
this.span = new OtlpSpan(data.operation);
}

/**
* Add more data to the parentId
*/
async enrichOperation(data: Partial<MessageRow>): Promise<void> {
this.span.enrich(data);
await this.logOrExec(
`enrich(${JSON.stringify(data)})`,
async () => await update({ id: this.id, data: { ...data, createdAt: this.operation.createdAt } })
Expand All @@ -129,19 +133,31 @@ export class LogContext extends LogContextStateless {
}

async failed(): Promise<void> {
await this.logOrExec('failed', async () => await setFailed(this.operation));
await this.logOrExec('failed', async () => {
await setFailed(this.operation);
this.span.end('failed');
});
}

async success(): Promise<void> {
await this.logOrExec('success', async () => await setSuccess(this.operation));
await this.logOrExec('success', async () => {
await setSuccess(this.operation);
this.span.end('success');
});
}

async cancel(): Promise<void> {
await this.logOrExec('cancel', async () => await setCancelled(this.operation));
await this.logOrExec('cancel', async () => {
await setCancelled(this.operation);
this.span.end('cancelled');
});
}

async timeout(): Promise<void> {
await this.logOrExec('timeout', async () => await setTimeouted(this.operation));
await this.logOrExec('timeout', async () => {
await setTimeouted(this.operation);
this.span.end('timeout');
});
}

private async logOrExec(log: string, callback: () => Promise<void>) {
Expand Down
1 change: 1 addition & 0 deletions packages/logs/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export * from './models/helpers.js';
export * from './models/logContextGetter.js';
export * as model from './models/messages.js';
export * as modelOperations from './models/insights.js';
export * from './otlp/otlp.js';
export { envs, defaultOperationExpiration } from './env.js';
Loading

0 comments on commit a80977f

Please sign in to comment.