From 5e0d848f431e5daf16fb10d0dabe71f7b280beff Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Mon, 21 Oct 2024 13:55:36 -0400 Subject: [PATCH] feat: basic/apikey credentials check cron job (#2862) ## Describe your changes I decided to piggy-back on the refreshTokens cron job (renamed refreshConnections even though I am not sure with the naming. Recommendations are welcomed) to benefit from logging of errors, etc... and not have another dedicated background jobs that is very similar. A failed check shows in the logs like this, the same way a token refresh error cc @bastienbeurier : ![Screenshot 2024-10-21 at 08 53 34](https://github.com/user-attachments/assets/6f190150-c4c0-46f7-aee9-e84344e03f03) ## Issue ticket number and link https://linear.app/nango/issue/NAN-1404/periodically-test-for-invalid-api-keys-and-basic-creds ## Checklist before requesting a review (skip if just adding/editing APIs & templates) - [ ] I added tests, otherwise the reason is: - [ ] I added observability, otherwise the reason is: - [ ] I added analytics, otherwise the reason is: --- packages/logs/lib/models/helpers.ts | 1 + .../lib/controllers/apiAuth.controller.ts | 7 +- .../server/lib/controllers/auth/postJwt.ts | 4 +- .../server/lib/controllers/auth/postTba.ts | 4 +- packages/server/lib/hooks/hooks.ts | 12 ++-- ...refreshTokens.ts => refreshConnections.ts} | 33 +++++---- packages/server/lib/server.ts | 4 +- packages/shared/lib/index.ts | 2 + .../shared/lib/services/connection.service.ts | 70 ++++++++++++++++++- packages/types/lib/logs/messages.ts | 2 +- packages/utils/lib/telemetry/metrics.ts | 6 +- 11 files changed, 107 insertions(+), 38 deletions(-) rename packages/server/lib/{refreshTokens.ts => refreshConnections.ts} (80%) diff --git a/packages/logs/lib/models/helpers.ts b/packages/logs/lib/models/helpers.ts index 2e77b501963..38e4e98db19 100644 --- a/packages/logs/lib/models/helpers.ts +++ b/packages/logs/lib/models/helpers.ts @@ -126,6 +126,7 @@ export const operationTypeToMessage: Record = { 'auth:create_connection': 'Create connection', 'auth:post_connection': 'Post connection script execution', 'auth:refresh_token': 'Token refresh', + 'auth:connection_test': 'Connection test', 'deploy:custom': 'Deploying custom scripts', 'deploy:prebuilt': 'Deploying pre-built flow', 'proxy:call': 'Proxy call', diff --git a/packages/server/lib/controllers/apiAuth.controller.ts b/packages/server/lib/controllers/apiAuth.controller.ts index d97ec936d59..b1a719193d5 100644 --- a/packages/server/lib/controllers/apiAuth.controller.ts +++ b/packages/server/lib/controllers/apiAuth.controller.ts @@ -1,5 +1,4 @@ import type { Request, Response, NextFunction } from 'express'; -import tracer from 'dd-trace'; import type { ApiKeyCredentials, BasicApiCredentials } from '@nangohq/shared'; import { errorManager, @@ -122,8 +121,7 @@ class ApiAuthController { connectionId, providerConfigKey, environment.id, - connectionConfig, - tracer + connectionConfig ); if (connectionResponse.isErr()) { @@ -296,8 +294,7 @@ class ApiAuthController { connectionId, providerConfigKey, environment.id, - connectionConfig, - tracer + connectionConfig ); if (connectionResponse.isErr()) { diff --git a/packages/server/lib/controllers/auth/postJwt.ts b/packages/server/lib/controllers/auth/postJwt.ts index c9084e54781..2cca6437124 100644 --- a/packages/server/lib/controllers/auth/postJwt.ts +++ b/packages/server/lib/controllers/auth/postJwt.ts @@ -1,5 +1,4 @@ import type { NextFunction } from 'express'; -import tracer from 'dd-trace'; import { z } from 'zod'; import { asyncWrapper } from '../../utils/asyncWrapper.js'; import { zodErrorToHTTP, stringifyError } from '@nangohq/utils'; @@ -157,8 +156,7 @@ export const postPublicJwtAuthorization = asyncWrapper => { await errorNotificationService.auth.create({ type: 'auth', - action: 'token_refresh', + action, connection_id: connection.id!, log_id: logCtx.id, active: true @@ -199,8 +202,7 @@ export const connectionTest = async ( connectionId: string, providerConfigKey: string, environment_id: number, - connection_config: ConnectionConfig, - tracer: Tracer + connection_config: ConnectionConfig ): Promise> => { const providerVerification = provider?.proxy?.verification; diff --git a/packages/server/lib/refreshTokens.ts b/packages/server/lib/refreshConnections.ts similarity index 80% rename from packages/server/lib/refreshTokens.ts rename to packages/server/lib/refreshConnections.ts index 1f9d165bd20..9ff9e19c427 100644 --- a/packages/server/lib/refreshTokens.ts +++ b/packages/server/lib/refreshConnections.ts @@ -3,36 +3,40 @@ import type { Lock } from '@nangohq/shared'; import { errorManager, ErrorSourceEnum, connectionService, locking } from '@nangohq/shared'; import { stringifyError, getLogger, metrics } from '@nangohq/utils'; import { logContextGetter } from '@nangohq/logs'; -import { connectionRefreshFailed as connectionRefreshFailedHook, connectionRefreshSuccess as connectionRefreshSuccessHook } from './hooks/hooks.js'; +import { + connectionRefreshFailed as connectionRefreshFailedHook, + connectionRefreshSuccess as connectionRefreshSuccessHook, + connectionTest as connectionTestHook +} from './hooks/hooks.js'; import tracer from 'dd-trace'; const logger = getLogger('Server'); -const cronName = '[refreshTokens]'; +const cronName = '[refreshConnections]'; const cronMinutes = 10; -export function refreshTokens(): void { +export function refreshConnectionsCron(): void { cron.schedule(`*/${cronMinutes} * * * *`, () => { (async () => { const start = Date.now(); try { await exec(); } catch (err: unknown) { - const e = new Error('failed_to_refresh_tokens', { + const e = new Error('failed_to_refresh_connections', { cause: err instanceof Error ? err.message : String(err) }); errorManager.report(e, { source: ErrorSourceEnum.PLATFORM }, tracer); } finally { - metrics.duration(metrics.Types.REFRESH_TOKENS, Date.now() - start); + metrics.duration(metrics.Types.REFRESH_CONNECTIONS, Date.now() - start); } })().catch((error: unknown) => { - logger.error('Failed to execute refreshTokens cron job'); + logger.error('Failed to execute refreshConnections cron job'); logger.error(error); }); }); } export async function exec(): Promise { - return await tracer.trace>('nango.server.cron.connectionCheck', async (span) => { + return await tracer.trace>('nango.server.cron.refreshConnections', async (span) => { let lock: Lock | undefined; try { logger.info(`${cronName} starting`); @@ -52,7 +56,7 @@ export async function exec(): Promise { const limit = 1000; // eslint-disable-next-line no-constant-condition while (true) { - const staleConnections = await connectionService.getStaleConnections({ days: 1, limit, cursor }); + const staleConnections = await connectionService.getStaleConnections({ days: 0, limit, cursor }); logger.info(`${cronName} found ${staleConnections.length} stale connections`); for (const staleConnection of staleConnections) { if (Date.now() - startTimestamp > ttlMs) { @@ -60,7 +64,7 @@ export async function exec(): Promise { return; } const { connection_id, environment, provider_config_key, account } = staleConnection; - logger.info(`${cronName} refreshing token for connectionId: ${connection_id}, accountId: ${account.id}`); + logger.info(`${cronName} refreshing connection '${connection_id}' for accountId '${account.id}'`); try { const credentialResponse = await connectionService.getConnectionCredentials({ account, @@ -70,16 +74,17 @@ export async function exec(): Promise { logContextGetter, instantRefresh: false, onRefreshSuccess: connectionRefreshSuccessHook, - onRefreshFailed: connectionRefreshFailedHook + onRefreshFailed: connectionRefreshFailedHook, + connectionTestHook }); if (credentialResponse.isOk()) { - metrics.increment(metrics.Types.REFRESH_TOKENS_SUCCESS); + metrics.increment(metrics.Types.REFRESH_CONNECTIONS_SUCCESS); } else { - metrics.increment(metrics.Types.REFRESH_TOKENS_FAILED); + metrics.increment(metrics.Types.REFRESH_CONNECTIONS_FAILED); } } catch (err) { - logger.error(`${cronName} failed to refresh token for connectionId: ${connection_id} ${stringifyError(err)}`); - metrics.increment(metrics.Types.REFRESH_TOKENS_FAILED); + logger.error(`${cronName} failed to refresh connection '${connection_id}' ${stringifyError(err)}`); + metrics.increment(metrics.Types.REFRESH_CONNECTIONS_FAILED); } cursor = staleConnection.cursor; } diff --git a/packages/server/lib/server.ts b/packages/server/lib/server.ts index 6203359d075..ec862be582f 100644 --- a/packages/server/lib/server.ts +++ b/packages/server/lib/server.ts @@ -16,7 +16,7 @@ import { migrate as migrateKeystore } from '@nangohq/keystore'; import publisher from './clients/publisher.client.js'; import { router } from './routes.js'; -import { refreshTokens } from './refreshTokens.js'; +import { refreshConnectionsCron } from './refreshConnections.js'; const { NANGO_MIGRATE_AT_START = 'true' } = process.env; const logger = getLogger('Server'); @@ -58,7 +58,7 @@ if (NANGO_MIGRATE_AT_START === 'true') { } await oAuthSessionService.clearStaleSessions(); -refreshTokens(); +refreshConnectionsCron(); const port = getServerPort(); server.listen(port, () => { diff --git a/packages/shared/lib/index.ts b/packages/shared/lib/index.ts index 854e25b7cde..fc73d0dce2b 100644 --- a/packages/shared/lib/index.ts +++ b/packages/shared/lib/index.ts @@ -33,6 +33,8 @@ export * from './services/invitations.js'; export * from './services/providers.js'; export * as oauth2Client from './clients/oauth2.client.js'; + +export * from './utils/lock/locking.js'; export * from './clients/locking.js'; export * from './utils/lock/locking.js'; diff --git a/packages/shared/lib/services/connection.service.ts b/packages/shared/lib/services/connection.service.ts index a9008c86a07..9dead17c623 100644 --- a/packages/shared/lib/services/connection.service.ts +++ b/packages/shared/lib/services/connection.service.ts @@ -954,7 +954,8 @@ class ConnectionService { logContextGetter, instantRefresh, onRefreshSuccess, - onRefreshFailed + onRefreshFailed, + connectionTestHook = undefined }: { account: DBTeam; environment: DBEnvironment; @@ -970,7 +971,19 @@ class ConnectionService { environment: DBEnvironment; provider: Provider; config: ProviderConfig; + action: 'token_refresh' | 'connection_test'; }) => Promise; + connectionTestHook?: + | (( + providerName: string, + provider: Provider, + credentials: ApiKeyCredentials | BasicApiCredentials | TbaCredentials, + connectionId: string, + providerConfigKey: string, + environment_id: number, + connection_config: ConnectionConfig + ) => Promise>) + | undefined; }): Promise> { if (connectionId === null) { const error = new NangoError('missing_connection'); @@ -1050,7 +1063,8 @@ class ConnectionService { }, environment, provider, - config + config, + action: 'token_refresh' }); } @@ -1071,6 +1085,58 @@ class ConnectionService { } connection.credentials = response.credentials as OAuth2Credentials; + } else if (connection?.credentials?.type === 'BASIC' || connection?.credentials?.type === 'API_KEY' || connection?.credentials?.type === 'TBA') { + if (connectionTestHook) { + const result = await connectionTestHook( + config.provider, + provider, + connection.credentials, + connection.connection_id, + providerConfigKey, + environment.id, + connection.connection_config + ); + if (result.isErr()) { + const logCtx = await logContextGetter.create( + { operation: { type: 'auth', action: 'connection_test' } }, + { + account, + environment, + integration: config ? { id: config.id, name: config.unique_key, provider: config.provider } : undefined, + connection: { id: connection.id, name: connection.connection_id } + } + ); + + await logCtx.error('Failed to verify connection', result.error); + await logCtx.failed(); + await onRefreshFailed({ + connection, + logCtx, + authError: { + type: result.error.type, + description: result.error.message + }, + environment, + provider, + config, + action: 'connection_test' + }); + + // there was an attempt to test the credentials + // so clear it from the queue if it failed + await this.updateLastFetched(connection.id); + + const { credentials, ...connectionWithoutCredentials } = connection; + const errorWithPayload = new NangoError(result.error.type, connectionWithoutCredentials); + return Err(errorWithPayload); + } else { + await onRefreshSuccess({ + connection, + environment, + config + }); + } + } } await this.updateLastFetched(connection.id); diff --git a/packages/types/lib/logs/messages.ts b/packages/types/lib/logs/messages.ts index 4cf9f2f5123..6b3ba24417d 100644 --- a/packages/types/lib/logs/messages.ts +++ b/packages/types/lib/logs/messages.ts @@ -45,7 +45,7 @@ export interface OperationAction { } export interface OperationAuth { type: 'auth'; - action: 'create_connection' | 'refresh_token' | 'post_connection'; + action: 'create_connection' | 'refresh_token' | 'post_connection' | 'connection_test'; } export interface OperationAdmin { type: 'admin'; diff --git a/packages/utils/lib/telemetry/metrics.ts b/packages/utils/lib/telemetry/metrics.ts index 5b64f4e1fc8..d08622df2de 100644 --- a/packages/utils/lib/telemetry/metrics.ts +++ b/packages/utils/lib/telemetry/metrics.ts @@ -30,9 +30,9 @@ export enum Types { PROXY_SUCCESS = 'nango.server.proxy.success', PROXY_FAILURE = 'nango.server.proxy.failure', - REFRESH_TOKENS = 'nango.server.cron.refreshTokens', - REFRESH_TOKENS_FAILED = 'nango.server.cron.refreshTokens.failed', - REFRESH_TOKENS_SUCCESS = 'nango.server.cron.refreshTokens.success', + REFRESH_CONNECTIONS = 'nango.server.cron.refreshConnections', + REFRESH_CONNECTIONS_FAILED = 'nango.server.cron.refreshConnections.failed', + REFRESH_CONNECTIONS_SUCCESS = 'nango.server.cron.refreshConnections.success', RUNNER_SDK = 'nango.runner.sdk', RUNNER_INVALID_ACTION_INPUT = 'nango.runner.invalidActionInput',