diff --git a/src/core/metrics.ts b/src/core/metrics.ts index 1a4c896..9aeb569 100644 --- a/src/core/metrics.ts +++ b/src/core/metrics.ts @@ -1,42 +1,12 @@ import { ObservableResult } from '@opentelemetry/api'; import { PrometheusExporter } from '@opentelemetry/exporter-prometheus'; import { logger } from '../utils/logger'; -import { - ExplicitBucketHistogramAggregation, - InstrumentType, - MeterProvider, - View, -} from '@opentelemetry/sdk-metrics-base'; +import { MeterProvider } from '@opentelemetry/sdk-metrics-base'; import { NextFunction, Request, Response } from 'express'; -/** - * Creates {count} buckets of size {increment} starting from {start}. Each bucket stores the count of values within its "size". - * @param start - * @param increment - * @param count - * @returns - */ -const createHistogramBuckets = ( - start: number, - increment: number, - count: number -) => { - return new ExplicitBucketHistogramAggregation( - Array.from(new Array(count), (_, i) => start + i * increment) - ); -}; - enum METRIC_TYPES { - runtime_specs = 'runtime_specs', - endpoint_response_times_histogram = 'endpoint_response_times_histogram', - endpoint_response_status = 'endpoint_response_status', - gpa_fetch_duration = 'gpa_fetch_duration', - last_ws_message_received_ts = 'last_ws_message_received_ts', - account_updates_count = 'account_updates_count', - cache_hit_count = 'cache_hit_count', - cache_miss_count = 'cache_miss_count', - current_system_ts = 'current_system_ts', health_status = 'health_status', + user_pubkey_list_length = 'user_pubkey_list_length', } export enum HEALTH_STATUS { @@ -61,101 +31,44 @@ const exporter = new PrometheusExporter( ); } ); -const meterName = 'dlob-meter'; -const meterProvider = new MeterProvider({ - views: [ - new View({ - instrumentName: METRIC_TYPES.endpoint_response_times_histogram, - instrumentType: InstrumentType.HISTOGRAM, - meterName, - aggregation: createHistogramBuckets(0, 20, 30), - }), - new View({ - instrumentName: METRIC_TYPES.gpa_fetch_duration, - instrumentType: InstrumentType.HISTOGRAM, - meterName, - aggregation: createHistogramBuckets(0, 500, 20), - }), - ], -}); + +const meterName = 'usermap-server-meter'; +const meterProvider = new MeterProvider(); + meterProvider.addMetricReader(exporter); + const meter = meterProvider.getMeter(meterName); -const runtimeSpecsGauge = meter.createObservableGauge( - METRIC_TYPES.runtime_specs, - { - description: 'Runtime sepcification of this program', - } -); +let currentUserPubkeyListLength = 0; -let healthStatus: HEALTH_STATUS = HEALTH_STATUS.Ok; -const healthStatusGauge = meter.createObservableGauge( - METRIC_TYPES.health_status, +const userPubkeyListLengthGauge = meter.createObservableGauge( + METRIC_TYPES.user_pubkey_list_length, { - description: 'Health status of this program', + description: 'Number of user public keys in the list', } ); -healthStatusGauge.addCallback((obs: ObservableResult) => { - obs.observe(healthStatus, {}); + +userPubkeyListLengthGauge.addCallback((obs: ObservableResult) => { + obs.observe(currentUserPubkeyListLength); }); -let lastWsMsgReceivedTs = 0; -const setLastReceivedWsMsgTs = (ts: number) => { - lastWsMsgReceivedTs = ts; +const updateUserPubkeyListLength = (length: number) => { + currentUserPubkeyListLength = length; }; -const lastWsReceivedTsGauge = meter.createObservableGauge( - METRIC_TYPES.last_ws_message_received_ts, - { - description: 'Timestamp of last received websocket message', - } -); -lastWsReceivedTsGauge.addCallback((obs: ObservableResult) => { - obs.observe(lastWsMsgReceivedTs, {}); -}); -const cacheHitCounter = meter.createCounter(METRIC_TYPES.cache_hit_count, { - description: 'Total redis cache hits', -}); +let healthStatus: HEALTH_STATUS = HEALTH_STATUS.Ok; -const accountUpdatesCounter = meter.createCounter( - METRIC_TYPES.account_updates_count, +const healthStatusGauge = meter.createObservableGauge( + METRIC_TYPES.health_status, { - description: 'Total accounts update', + description: 'Current health status of the server', } ); -const currentSystemTsGauge = meter.createObservableGauge( - METRIC_TYPES.current_system_ts, - { - description: 'Timestamp of system at time of metric collection', - } -); -currentSystemTsGauge.addCallback((obs: ObservableResult) => { - obs.observe(Date.now(), {}); +healthStatusGauge.addCallback((obs: ObservableResult) => { + obs.observe(healthStatus); }); -const endpointResponseTimeHistogram = meter.createHistogram( - METRIC_TYPES.endpoint_response_times_histogram, - { - description: 'Duration of endpoint responses', - unit: 'ms', - } -); -const gpaFetchDurationHistogram = meter.createHistogram( - METRIC_TYPES.gpa_fetch_duration, - { - description: 'Duration of GPA fetches', - unit: 'ms', - } -); - -const responseStatusCounter = meter.createCounter( - METRIC_TYPES.endpoint_response_status, - { - description: 'Count of endpoint responses by status code', - } -); - const healthCheckInterval = 2000; let lastHealthCheckSlot = -1; let lastHealthCheckState = true; // true = healthy, false = unhealthy @@ -240,13 +153,4 @@ const handleHealthCheck = (core: Core) => { }; }; -export { - endpointResponseTimeHistogram, - gpaFetchDurationHistogram, - responseStatusCounter, - handleHealthCheck, - setLastReceivedWsMsgTs, - accountUpdatesCounter, - cacheHitCounter, - runtimeSpecsGauge, -}; +export { handleHealthCheck, updateUserPubkeyListLength }; diff --git a/src/core/middleware.ts b/src/core/middleware.ts deleted file mode 100644 index 7705d3c..0000000 --- a/src/core/middleware.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Request, Response } from 'express'; -import responseTime = require('response-time'); -import { - endpointResponseTimeHistogram, - responseStatusCounter, -} from './metrics'; - -export const handleResponseTime = responseTime( - (req: Request, res: Response, time: number) => { - const endpoint = req.path; - - if (endpoint === '/health' || req.url === '/') { - return; - } - - responseStatusCounter.add(1, { - endpoint, - status: res.statusCode, - }); - - const responseTimeMs = time; - endpointResponseTimeHistogram.record(responseTimeMs, { - endpoint, - }); - } -); diff --git a/src/publisher.ts b/src/publisher.ts index 54c45b0..3c6292c 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -16,8 +16,7 @@ import morgan from 'morgan'; import compression from 'compression'; import * as http from 'http'; -import { runtimeSpecsGauge } from './core/metrics'; -import { handleResponseTime } from './core/middleware'; +import { updateUserPubkeyListLength } from './core/metrics'; import { DriftClient, DriftEnv, @@ -28,7 +27,7 @@ import { import { sleep } from './utils/utils'; import { setupEndpoints } from './endpoints'; import { ZSTDDecoder } from 'zstddec'; -import { RedisClient, RedisClientPrefix } from '@drift/common'; +import { RedisClient, RedisClientPrefix, COMMON_UI_UTILS } from '@drift/common'; import { setGlobalDispatcher, Agent } from 'undici'; setGlobalDispatcher( @@ -47,6 +46,8 @@ const REDIS_PASSWORD = process.env.REDIS_PASSWORD; const USE_ELASTICACHE = process.env.ELASTICACHE === 'true' || false; const SYNC_ON_STARTUP = process.env.SYNC_ON_STARTUP; +const SYNC_INTERVAL = parseInt(process.env.SYNC_INTERVAL) || 90_000; +const EXPIRY_MULTIPLIER = 4; const endpoint = process.env.ENDPOINT!; if (!endpoint) { @@ -71,19 +72,6 @@ app.use(cors({ origin: '*' })); app.use(compression()); app.set('trust proxy', 1); app.use(logHttp); -app.use(handleResponseTime); - -// Metrics defined here -const bootTimeMs = Date.now(); -const commitHash = process.env.COMMIT; -runtimeSpecsGauge.addCallback((obs) => { - obs.observe(bootTimeMs, { - commit: commitHash, - driftEnv, - rpcEndpoint: endpoint, - wsEndpoint: wsEndpoint, - }); -}); const server = http.createServer(app); @@ -144,28 +132,34 @@ export class WebsocketCacheProgramAccountSubscriber { const existingData = await this.redisClient.getRaw( keyedAccountInfo.accountId.toString() ); + if (!existingData) { this.lastWriteTs = Date.now(); - await this.redisClient.setRaw( - keyedAccountInfo.accountId.toString(), - `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` - ); - await this.redisClient.rPush( - 'user_pubkeys', - keyedAccountInfo.accountId.toString() - ); + + await this.redisClient + .forceGetClient() + .setex( + keyedAccountInfo.accountId.toString(), + (SYNC_INTERVAL * EXPIRY_MULTIPLIER) / 1000, + `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` + ); return; } + const existingSlot = existingData.split('::')[0]; + if ( incomingSlot >= parseInt(existingSlot) || isNaN(parseInt(existingSlot)) ) { this.lastWriteTs = Date.now(); - await this.redisClient.setRaw( - keyedAccountInfo.accountId.toString(), - `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` - ); + await this.redisClient + .forceGetClient() + .setex( + keyedAccountInfo.accountId.toString(), + (SYNC_INTERVAL * EXPIRY_MULTIPLIER) / 1000, + `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` + ); return; } } @@ -253,7 +247,10 @@ export class WebsocketCacheProgramAccountSubscriber { await this.handleRpcResponse(context, keyedAccountInfo); })() ); + await Promise.all(promises); + + await this.syncPubKeys(programAccountBufferMap); } catch (e) { const err = e as Error; console.error( @@ -266,6 +263,70 @@ export class WebsocketCacheProgramAccountSubscriber { } } + async syncPubKeys( + programAccountBufferMap: Map + ): Promise { + const newKeys = Array.from(programAccountBufferMap.keys()); + const currentKeys = await this.redisClient.lRange('user_pubkeys', 0, -1); + + const keysToAdd = newKeys.filter((key) => !currentKeys.includes(key)); + const keysToRemove = currentKeys.filter((key) => !newKeys.includes(key)); + + const removalBatches = COMMON_UI_UTILS.chunks(keysToRemove, 100); + for (const batch of removalBatches) { + await Promise.all( + batch.map((key) => this.redisClient.lRem('user_pubkeys', 0, key)) + ); + } + + const additionBatches = COMMON_UI_UTILS.chunks(keysToAdd, 100); + for (const batch of additionBatches) { + await this.redisClient.rPush('user_pubkeys', ...batch); + } + + logger.info( + `Synchronized user_pubkeys: Added ${keysToAdd.length}, Removed ${keysToRemove.length}` + ); + } + + async checkSync(): Promise { + if (this.syncLock) { + logger.info('SYNC LOCKED DURING CHECK'); + return; + } + + const storedUserPubkeys = await this.redisClient.lRange( + 'user_pubkeys', + 0, + -1 + ); + + const removedKeys = []; + await Promise.all( + storedUserPubkeys.map(async (pubkey) => { + const exists = await this.redisClient.forceGetClient().exists(pubkey); + if (!exists) { + removedKeys.push(pubkey); + } + }) + ); + + if (removedKeys.length > 0) { + await Promise.all( + removedKeys.map(async (pubkey) => { + this.redisClient.lRem('user_pubkeys', 0, pubkey); + }) + ); + } + + const updatedListLength = await this.redisClient.lLen('user_pubkeys'); + updateUserPubkeyListLength(updatedListLength); + + logger.warn( + `Found ${removedKeys.length} keys to remove from user_pubkeys list` + ); + } + async subscribe(): Promise { await this.decoder.init(); @@ -273,13 +334,18 @@ export class WebsocketCacheProgramAccountSubscriber { return; } - const syncInterval = setInterval( - async () => { - logger.info('Syncing on interval'); - await this.sync(); - }, - parseInt(process.env.SYNC_INTERVAL) || 90_000 - ); + let syncCount = 0; + const syncInterval = setInterval(async () => { + logger.info('Syncing on interval'); + await this.sync(); + + if (syncCount % EXPIRY_MULTIPLIER === 0) { + logger.info('Checking sync on interval'); + await this.checkSync(); + } + + syncCount++; + }, SYNC_INTERVAL); this.syncInterval = syncInterval;