Skip to content

Commit

Permalink
feat: basic/apikey credentials check cron job (#2862)
Browse files Browse the repository at this point in the history
## Describe your changes
I decided to piggy-back on the refreshTokens cron job (renamed
refreshConnections even though I am not sure with the naming.
Recommendations are welcomed) to benefit from logging of errors, etc...
and not have another dedicated background jobs that is very similar.

A failed check shows in the logs like this, the same way a token refresh
error cc @bastienbeurier :
![Screenshot 2024-10-21 at 08 53
34](https://github.com/user-attachments/assets/6f190150-c4c0-46f7-aee9-e84344e03f03)

## Issue ticket number and link

https://linear.app/nango/issue/NAN-1404/periodically-test-for-invalid-api-keys-and-basic-creds

## Checklist before requesting a review (skip if just adding/editing
APIs & templates)
- [ ] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:
  • Loading branch information
TBonnin authored Oct 21, 2024
1 parent 2022a73 commit 5e0d848
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 38 deletions.
1 change: 1 addition & 0 deletions packages/logs/lib/models/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export const operationTypeToMessage: Record<ConcatOperationList, string> = {
'auth:create_connection': 'Create connection',
'auth:post_connection': 'Post connection script execution',
'auth:refresh_token': 'Token refresh',
'auth:connection_test': 'Connection test',
'deploy:custom': 'Deploying custom scripts',
'deploy:prebuilt': 'Deploying pre-built flow',
'proxy:call': 'Proxy call',
Expand Down
7 changes: 2 additions & 5 deletions packages/server/lib/controllers/apiAuth.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Request, Response, NextFunction } from 'express';
import tracer from 'dd-trace';
import type { ApiKeyCredentials, BasicApiCredentials } from '@nangohq/shared';
import {
errorManager,
Expand Down Expand Up @@ -122,8 +121,7 @@ class ApiAuthController {
connectionId,
providerConfigKey,
environment.id,
connectionConfig,
tracer
connectionConfig
);

if (connectionResponse.isErr()) {
Expand Down Expand Up @@ -296,8 +294,7 @@ class ApiAuthController {
connectionId,
providerConfigKey,
environment.id,
connectionConfig,
tracer
connectionConfig
);

if (connectionResponse.isErr()) {
Expand Down
4 changes: 1 addition & 3 deletions packages/server/lib/controllers/auth/postJwt.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { NextFunction } from 'express';
import tracer from 'dd-trace';
import { z } from 'zod';
import { asyncWrapper } from '../../utils/asyncWrapper.js';
import { zodErrorToHTTP, stringifyError } from '@nangohq/utils';
Expand Down Expand Up @@ -157,8 +156,7 @@ export const postPublicJwtAuthorization = asyncWrapper<PostPublicJwtAuthorizatio
connectionId,
providerConfigKey,
environment.id,
connectionConfig,
tracer
connectionConfig
);

if (connectionResponse.isErr()) {
Expand Down
4 changes: 1 addition & 3 deletions packages/server/lib/controllers/auth/postTba.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { z } from 'zod';
import tracer from 'dd-trace';
import { asyncWrapper } from '../../utils/asyncWrapper.js';
import { zodErrorToHTTP } from '@nangohq/utils';
import { analytics, configService, AnalyticsTypes, getConnectionConfig, connectionService, getProvider } from '@nangohq/shared';
Expand Down Expand Up @@ -149,8 +148,7 @@ export const postPublicTbaAuthorization = asyncWrapper<PostPublicTbaAuthorizatio
connectionId,
providerConfigKey,
environment.id,
connectionConfig,
tracer
connectionConfig
);

if (connectionResponse.isErr()) {
Expand Down
12 changes: 7 additions & 5 deletions packages/server/lib/hooks/hooks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import axios from 'axios';
import type { Span, Tracer } from 'dd-trace';
import type { Span } from 'dd-trace';
import {
CONNECTIONS_WITH_SCRIPTS_CAP_LIMIT,
NangoError,
Expand Down Expand Up @@ -32,6 +32,7 @@ import { logContextGetter } from '@nangohq/logs';
import postConnection from './connection/post-connection.js';
import { externalPostConnection } from './connection/external-post-connection.js';
import { sendAuth as sendAuthWebhook } from '@nangohq/webhooks';
import tracer from 'dd-trace';

const logger = getLogger('hooks');
const orchestrator = getOrchestrator();
Expand Down Expand Up @@ -155,18 +156,20 @@ export const connectionRefreshFailed = async ({
authError,
environment,
provider,
config
config,
action
}: {
connection: Connection;
environment: DBEnvironment;
provider: Provider;
config: IntegrationConfig;
authError: { type: string; description: string };
logCtx: LogContext;
action: 'token_refresh' | 'connection_test';
}): Promise<void> => {
await errorNotificationService.auth.create({
type: 'auth',
action: 'token_refresh',
action,
connection_id: connection.id!,
log_id: logCtx.id,
active: true
Expand Down Expand Up @@ -199,8 +202,7 @@ export const connectionTest = async (
connectionId: string,
providerConfigKey: string,
environment_id: number,
connection_config: ConnectionConfig,
tracer: Tracer
connection_config: ConnectionConfig
): Promise<Result<boolean, NangoError>> => {
const providerVerification = provider?.proxy?.verification;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,40 @@ import type { Lock } from '@nangohq/shared';
import { errorManager, ErrorSourceEnum, connectionService, locking } from '@nangohq/shared';
import { stringifyError, getLogger, metrics } from '@nangohq/utils';
import { logContextGetter } from '@nangohq/logs';
import { connectionRefreshFailed as connectionRefreshFailedHook, connectionRefreshSuccess as connectionRefreshSuccessHook } from './hooks/hooks.js';
import {
connectionRefreshFailed as connectionRefreshFailedHook,
connectionRefreshSuccess as connectionRefreshSuccessHook,
connectionTest as connectionTestHook
} from './hooks/hooks.js';
import tracer from 'dd-trace';

const logger = getLogger('Server');
const cronName = '[refreshTokens]';
const cronName = '[refreshConnections]';
const cronMinutes = 10;

export function refreshTokens(): void {
export function refreshConnectionsCron(): void {
cron.schedule(`*/${cronMinutes} * * * *`, () => {
(async () => {
const start = Date.now();
try {
await exec();
} catch (err: unknown) {
const e = new Error('failed_to_refresh_tokens', {
const e = new Error('failed_to_refresh_connections', {
cause: err instanceof Error ? err.message : String(err)
});
errorManager.report(e, { source: ErrorSourceEnum.PLATFORM }, tracer);
} finally {
metrics.duration(metrics.Types.REFRESH_TOKENS, Date.now() - start);
metrics.duration(metrics.Types.REFRESH_CONNECTIONS, Date.now() - start);
}
})().catch((error: unknown) => {
logger.error('Failed to execute refreshTokens cron job');
logger.error('Failed to execute refreshConnections cron job');
logger.error(error);
});
});
}

export async function exec(): Promise<void> {
return await tracer.trace<Promise<void>>('nango.server.cron.connectionCheck', async (span) => {
return await tracer.trace<Promise<void>>('nango.server.cron.refreshConnections', async (span) => {
let lock: Lock | undefined;
try {
logger.info(`${cronName} starting`);
Expand All @@ -52,15 +56,15 @@ export async function exec(): Promise<void> {
const limit = 1000;
// eslint-disable-next-line no-constant-condition
while (true) {
const staleConnections = await connectionService.getStaleConnections({ days: 1, limit, cursor });
const staleConnections = await connectionService.getStaleConnections({ days: 0, limit, cursor });
logger.info(`${cronName} found ${staleConnections.length} stale connections`);
for (const staleConnection of staleConnections) {
if (Date.now() - startTimestamp > ttlMs) {
logger.info(`${cronName} time limit reached, stopping`);
return;
}
const { connection_id, environment, provider_config_key, account } = staleConnection;
logger.info(`${cronName} refreshing token for connectionId: ${connection_id}, accountId: ${account.id}`);
logger.info(`${cronName} refreshing connection '${connection_id}' for accountId '${account.id}'`);
try {
const credentialResponse = await connectionService.getConnectionCredentials({
account,
Expand All @@ -70,16 +74,17 @@ export async function exec(): Promise<void> {
logContextGetter,
instantRefresh: false,
onRefreshSuccess: connectionRefreshSuccessHook,
onRefreshFailed: connectionRefreshFailedHook
onRefreshFailed: connectionRefreshFailedHook,
connectionTestHook
});
if (credentialResponse.isOk()) {
metrics.increment(metrics.Types.REFRESH_TOKENS_SUCCESS);
metrics.increment(metrics.Types.REFRESH_CONNECTIONS_SUCCESS);
} else {
metrics.increment(metrics.Types.REFRESH_TOKENS_FAILED);
metrics.increment(metrics.Types.REFRESH_CONNECTIONS_FAILED);
}
} catch (err) {
logger.error(`${cronName} failed to refresh token for connectionId: ${connection_id} ${stringifyError(err)}`);
metrics.increment(metrics.Types.REFRESH_TOKENS_FAILED);
logger.error(`${cronName} failed to refresh connection '${connection_id}' ${stringifyError(err)}`);
metrics.increment(metrics.Types.REFRESH_CONNECTIONS_FAILED);
}
cursor = staleConnection.cursor;
}
Expand Down
4 changes: 2 additions & 2 deletions packages/server/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { migrate as migrateKeystore } from '@nangohq/keystore';

import publisher from './clients/publisher.client.js';
import { router } from './routes.js';
import { refreshTokens } from './refreshTokens.js';
import { refreshConnectionsCron } from './refreshConnections.js';

const { NANGO_MIGRATE_AT_START = 'true' } = process.env;
const logger = getLogger('Server');
Expand Down Expand Up @@ -58,7 +58,7 @@ if (NANGO_MIGRATE_AT_START === 'true') {
}

await oAuthSessionService.clearStaleSessions();
refreshTokens();
refreshConnectionsCron();

const port = getServerPort();
server.listen(port, () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/shared/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export * from './services/invitations.js';
export * from './services/providers.js';

export * as oauth2Client from './clients/oauth2.client.js';

export * from './utils/lock/locking.js';
export * from './clients/locking.js';

export * from './utils/lock/locking.js';
Expand Down
70 changes: 68 additions & 2 deletions packages/shared/lib/services/connection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,8 @@ class ConnectionService {
logContextGetter,
instantRefresh,
onRefreshSuccess,
onRefreshFailed
onRefreshFailed,
connectionTestHook = undefined
}: {
account: DBTeam;
environment: DBEnvironment;
Expand All @@ -970,7 +971,19 @@ class ConnectionService {
environment: DBEnvironment;
provider: Provider;
config: ProviderConfig;
action: 'token_refresh' | 'connection_test';
}) => Promise<void>;
connectionTestHook?:
| ((
providerName: string,
provider: Provider,
credentials: ApiKeyCredentials | BasicApiCredentials | TbaCredentials,
connectionId: string,
providerConfigKey: string,
environment_id: number,
connection_config: ConnectionConfig
) => Promise<Result<boolean, NangoError>>)
| undefined;
}): Promise<Result<Connection, NangoError>> {
if (connectionId === null) {
const error = new NangoError('missing_connection');
Expand Down Expand Up @@ -1050,7 +1063,8 @@ class ConnectionService {
},
environment,
provider,
config
config,
action: 'token_refresh'
});
}

Expand All @@ -1071,6 +1085,58 @@ class ConnectionService {
}

connection.credentials = response.credentials as OAuth2Credentials;
} else if (connection?.credentials?.type === 'BASIC' || connection?.credentials?.type === 'API_KEY' || connection?.credentials?.type === 'TBA') {
if (connectionTestHook) {
const result = await connectionTestHook(
config.provider,
provider,
connection.credentials,
connection.connection_id,
providerConfigKey,
environment.id,
connection.connection_config
);
if (result.isErr()) {
const logCtx = await logContextGetter.create(
{ operation: { type: 'auth', action: 'connection_test' } },
{
account,
environment,
integration: config ? { id: config.id, name: config.unique_key, provider: config.provider } : undefined,
connection: { id: connection.id, name: connection.connection_id }
}
);

await logCtx.error('Failed to verify connection', result.error);
await logCtx.failed();
await onRefreshFailed({
connection,
logCtx,
authError: {
type: result.error.type,
description: result.error.message
},
environment,
provider,
config,
action: 'connection_test'
});

// there was an attempt to test the credentials
// so clear it from the queue if it failed
await this.updateLastFetched(connection.id);

const { credentials, ...connectionWithoutCredentials } = connection;
const errorWithPayload = new NangoError(result.error.type, connectionWithoutCredentials);
return Err(errorWithPayload);
} else {
await onRefreshSuccess({
connection,
environment,
config
});
}
}
}

await this.updateLastFetched(connection.id);
Expand Down
2 changes: 1 addition & 1 deletion packages/types/lib/logs/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export interface OperationAction {
}
export interface OperationAuth {
type: 'auth';
action: 'create_connection' | 'refresh_token' | 'post_connection';
action: 'create_connection' | 'refresh_token' | 'post_connection' | 'connection_test';
}
export interface OperationAdmin {
type: 'admin';
Expand Down
6 changes: 3 additions & 3 deletions packages/utils/lib/telemetry/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ export enum Types {
PROXY_SUCCESS = 'nango.server.proxy.success',
PROXY_FAILURE = 'nango.server.proxy.failure',

REFRESH_TOKENS = 'nango.server.cron.refreshTokens',
REFRESH_TOKENS_FAILED = 'nango.server.cron.refreshTokens.failed',
REFRESH_TOKENS_SUCCESS = 'nango.server.cron.refreshTokens.success',
REFRESH_CONNECTIONS = 'nango.server.cron.refreshConnections',
REFRESH_CONNECTIONS_FAILED = 'nango.server.cron.refreshConnections.failed',
REFRESH_CONNECTIONS_SUCCESS = 'nango.server.cron.refreshConnections.success',

RUNNER_SDK = 'nango.runner.sdk',
RUNNER_INVALID_ACTION_INPUT = 'nango.runner.invalidActionInput',
Expand Down

0 comments on commit 5e0d848

Please sign in to comment.