From af2bf4c7b0beff6a786cecaa3158d1605f635f4d Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Wed, 2 Oct 2024 10:50:16 +1000 Subject: [PATCH 01/10] chore: expire keys to remove pruner --- src/publisher.ts | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index 54c45b0..0fc1044 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -47,6 +47,8 @@ const REDIS_PASSWORD = process.env.REDIS_PASSWORD; const USE_ELASTICACHE = process.env.ELASTICACHE === 'true' || false; const SYNC_ON_STARTUP = process.env.SYNC_ON_STARTUP; +const SYNC_INTERVAL = parseInt(process.env.SYNC_INTERVAL) || 90_000 +const EXPIRY_MULTIPLIER = 4; const endpoint = process.env.ENDPOINT!; if (!endpoint) { @@ -144,27 +146,29 @@ export class WebsocketCacheProgramAccountSubscriber { const existingData = await this.redisClient.getRaw( keyedAccountInfo.accountId.toString() ); + if (!existingData) { this.lastWriteTs = Date.now(); - await this.redisClient.setRaw( + + await this.redisClient.forceGetClient().setex( keyedAccountInfo.accountId.toString(), - `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` - ); - await this.redisClient.rPush( - 'user_pubkeys', - keyedAccountInfo.accountId.toString() + SYNC_INTERVAL * EXPIRY_MULTIPLIER / 1000, + `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}`, ); return; } + const existingSlot = existingData.split('::')[0]; + if ( incomingSlot >= parseInt(existingSlot) || isNaN(parseInt(existingSlot)) ) { this.lastWriteTs = Date.now(); - await this.redisClient.setRaw( + await this.redisClient.forceGetClient().setex( keyedAccountInfo.accountId.toString(), - `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` + SYNC_INTERVAL * EXPIRY_MULTIPLIER / 1000, + `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}`, ); return; } @@ -236,7 +240,7 @@ export class WebsocketCacheProgramAccountSubscriber { await Promise.all(decodingPromises); - logger.info(`${programAccountBufferMap.size} users to sync`); + logger.info(`${programAccountBufferMap.size} users to sync`); const promises = Array.from(programAccountBufferMap.entries()).map( ([key, buffer]) => @@ -253,7 +257,14 @@ export class WebsocketCacheProgramAccountSubscriber { await this.handleRpcResponse(context, keyedAccountInfo); })() ); + await Promise.all(promises); + + await this.redisClient.lTrim('user_pubkeys', -1, 0); + await this.redisClient.forceGetClient().rpush( + 'user_pubkeys', + ...Array.from(programAccountBufferMap.keys()) + ); } catch (e) { const err = e as Error; console.error( @@ -278,7 +289,7 @@ export class WebsocketCacheProgramAccountSubscriber { logger.info('Syncing on interval'); await this.sync(); }, - parseInt(process.env.SYNC_INTERVAL) || 90_000 + SYNC_INTERVAL ); this.syncInterval = syncInterval; From 547e926ee8f93581a4babaf9f286b224ad3b1871 Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Thu, 3 Oct 2024 11:32:28 +1000 Subject: [PATCH 02/10] chore: add checks --- src/core/metrics.ts | 24 ++++++++++- src/publisher.ts | 101 ++++++++++++++++++++++++++++++++------------ 2 files changed, 97 insertions(+), 28 deletions(-) diff --git a/src/core/metrics.ts b/src/core/metrics.ts index 1a4c896..89bd3c6 100644 --- a/src/core/metrics.ts +++ b/src/core/metrics.ts @@ -37,6 +37,7 @@ enum METRIC_TYPES { cache_miss_count = 'cache_miss_count', current_system_ts = 'current_system_ts', health_status = 'health_status', + user_pubkey_list_length = 'user_pubkey_list_length', } export enum HEALTH_STATUS { @@ -61,7 +62,8 @@ const exporter = new PrometheusExporter( ); } ); -const meterName = 'dlob-meter'; +const meterName = 'usermap-server-meter'; + const meterProvider = new MeterProvider({ views: [ new View({ @@ -78,9 +80,28 @@ const meterProvider = new MeterProvider({ }), ], }); + meterProvider.addMetricReader(exporter); const meter = meterProvider.getMeter(meterName); +let currentUserPubkeyListLength = 0; + +const userPubkeyListLengthGauge = meter.createObservableGauge( + METRIC_TYPES.user_pubkey_list_length, + { + description: 'Number of user public keys in the list', + } +); + +userPubkeyListLengthGauge.addCallback((obs: ObservableResult) => { + obs.observe(currentUserPubkeyListLength); +}); + +// Update function now takes a value parameter +const updateUserPubkeyListLength = (length: number) => { + currentUserPubkeyListLength = length; +}; + const runtimeSpecsGauge = meter.createObservableGauge( METRIC_TYPES.runtime_specs, { @@ -249,4 +270,5 @@ export { accountUpdatesCounter, cacheHitCounter, runtimeSpecsGauge, + updateUserPubkeyListLength }; diff --git a/src/publisher.ts b/src/publisher.ts index 0fc1044..4809315 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -16,7 +16,7 @@ import morgan from 'morgan'; import compression from 'compression'; import * as http from 'http'; -import { runtimeSpecsGauge } from './core/metrics'; +import { runtimeSpecsGauge, updateUserPubkeyListLength } from './core/metrics'; import { handleResponseTime } from './core/middleware'; import { DriftClient, @@ -47,7 +47,7 @@ const REDIS_PASSWORD = process.env.REDIS_PASSWORD; const USE_ELASTICACHE = process.env.ELASTICACHE === 'true' || false; const SYNC_ON_STARTUP = process.env.SYNC_ON_STARTUP; -const SYNC_INTERVAL = parseInt(process.env.SYNC_INTERVAL) || 90_000 +const SYNC_INTERVAL = parseInt(process.env.SYNC_INTERVAL) || 90_000; const EXPIRY_MULTIPLIER = 4; const endpoint = process.env.ENDPOINT!; @@ -146,30 +146,40 @@ export class WebsocketCacheProgramAccountSubscriber { const existingData = await this.redisClient.getRaw( keyedAccountInfo.accountId.toString() ); - + if (!existingData) { this.lastWriteTs = Date.now(); - - await this.redisClient.forceGetClient().setex( - keyedAccountInfo.accountId.toString(), - SYNC_INTERVAL * EXPIRY_MULTIPLIER / 1000, - `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}`, + + await this.redisClient + .forceGetClient() + .setex( + keyedAccountInfo.accountId.toString(), + (SYNC_INTERVAL * EXPIRY_MULTIPLIER) / 1000, + `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` + ); + + await this.redisClient.rPush( + 'user_pubkeys', + keyedAccountInfo.accountId.toString() ); + return; } - + const existingSlot = existingData.split('::')[0]; - + if ( incomingSlot >= parseInt(existingSlot) || isNaN(parseInt(existingSlot)) ) { this.lastWriteTs = Date.now(); - await this.redisClient.forceGetClient().setex( - keyedAccountInfo.accountId.toString(), - SYNC_INTERVAL * EXPIRY_MULTIPLIER / 1000, - `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}`, - ); + await this.redisClient + .forceGetClient() + .setex( + keyedAccountInfo.accountId.toString(), + (SYNC_INTERVAL * EXPIRY_MULTIPLIER) / 1000, + `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` + ); return; } } @@ -240,7 +250,7 @@ export class WebsocketCacheProgramAccountSubscriber { await Promise.all(decodingPromises); - logger.info(`${programAccountBufferMap.size} users to sync`); + logger.info(`${programAccountBufferMap.size} users to sync`); const promises = Array.from(programAccountBufferMap.entries()).map( ([key, buffer]) => @@ -261,10 +271,9 @@ export class WebsocketCacheProgramAccountSubscriber { await Promise.all(promises); await this.redisClient.lTrim('user_pubkeys', -1, 0); - await this.redisClient.forceGetClient().rpush( - 'user_pubkeys', - ...Array.from(programAccountBufferMap.keys()) - ); + await this.redisClient + .forceGetClient() + .rpush('user_pubkeys', ...Array.from(programAccountBufferMap.keys())); } catch (e) { const err = e as Error; console.error( @@ -277,6 +286,39 @@ export class WebsocketCacheProgramAccountSubscriber { } } + async checkSync(): Promise { + const storedUserPubkeys = await this.redisClient.lRange( + 'user_pubkeys', + 0, + -1 + ); + + const removedKeys = []; + await Promise.all( + storedUserPubkeys.map(async (pubkey) => { + const exists = await this.redisClient.forceGetClient().exists(pubkey); + if (!exists) { + removedKeys.push(pubkey); + } + }) + ); + + if (removedKeys.length > 0) { + await Promise.all( + removedKeys.map(async (pubkey) => { + this.redisClient.lRem('user_pubkeys', 0, pubkey); + }) + ); + } + + const updatedListLength = await this.redisClient.lLen('user_pubkeys'); + updateUserPubkeyListLength(updatedListLength); + + logger.warn( + `Found ${removedKeys.length} keys to remove from user_pubkeys list` + ); + } + async subscribe(): Promise { await this.decoder.init(); @@ -284,13 +326,18 @@ export class WebsocketCacheProgramAccountSubscriber { return; } - const syncInterval = setInterval( - async () => { - logger.info('Syncing on interval'); - await this.sync(); - }, - SYNC_INTERVAL - ); + let syncCount = 0; + const syncInterval = setInterval(async () => { + logger.info('Syncing on interval'); + await this.sync(); + + if (syncCount % EXPIRY_MULTIPLIER === 0) { + logger.info('Checking sync on interval'); + await this.checkSync(); + } + + syncCount++; + }, SYNC_INTERVAL); this.syncInterval = syncInterval; From f39b90de9a78509b25ea9d8e1411d0159cfa4814 Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Thu, 3 Oct 2024 13:35:52 +1000 Subject: [PATCH 03/10] chore: attempt reducing batch --- src/publisher.ts | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index 4809315..13a3021 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -28,7 +28,7 @@ import { import { sleep } from './utils/utils'; import { setupEndpoints } from './endpoints'; import { ZSTDDecoder } from 'zstddec'; -import { RedisClient, RedisClientPrefix } from '@drift/common'; +import { RedisClient, RedisClientPrefix, COMMON_UI_UTILS } from '@drift/common'; import { setGlobalDispatcher, Agent } from 'undici'; setGlobalDispatcher( @@ -271,9 +271,15 @@ export class WebsocketCacheProgramAccountSubscriber { await Promise.all(promises); await this.redisClient.lTrim('user_pubkeys', -1, 0); - await this.redisClient - .forceGetClient() - .rpush('user_pubkeys', ...Array.from(programAccountBufferMap.keys())); + + const batches = COMMON_UI_UTILS.chunks(Array.from(programAccountBufferMap.keys()), 1000) + + for (const batch of batches) { + await this.redisClient.rPush( + 'user_pubkeys', + ...batch + ); + } } catch (e) { const err = e as Error; console.error( @@ -287,6 +293,11 @@ export class WebsocketCacheProgramAccountSubscriber { } async checkSync(): Promise { + if (this.syncLock) { + logger.info('SYNC LOCKED DURING CHECK'); + return; + } + const storedUserPubkeys = await this.redisClient.lRange( 'user_pubkeys', 0, From 99076c1e583e128f76f9b18e8f4caae5f48e186c Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Thu, 3 Oct 2024 14:50:23 +1000 Subject: [PATCH 04/10] chore: remove unused metrics --- src/core/metrics.ts | 148 +++++--------------------------------------- src/publisher.ts | 14 +---- 2 files changed, 16 insertions(+), 146 deletions(-) diff --git a/src/core/metrics.ts b/src/core/metrics.ts index 89bd3c6..9aeb569 100644 --- a/src/core/metrics.ts +++ b/src/core/metrics.ts @@ -1,41 +1,10 @@ import { ObservableResult } from '@opentelemetry/api'; import { PrometheusExporter } from '@opentelemetry/exporter-prometheus'; import { logger } from '../utils/logger'; -import { - ExplicitBucketHistogramAggregation, - InstrumentType, - MeterProvider, - View, -} from '@opentelemetry/sdk-metrics-base'; +import { MeterProvider } from '@opentelemetry/sdk-metrics-base'; import { NextFunction, Request, Response } from 'express'; -/** - * Creates {count} buckets of size {increment} starting from {start}. Each bucket stores the count of values within its "size". - * @param start - * @param increment - * @param count - * @returns - */ -const createHistogramBuckets = ( - start: number, - increment: number, - count: number -) => { - return new ExplicitBucketHistogramAggregation( - Array.from(new Array(count), (_, i) => start + i * increment) - ); -}; - enum METRIC_TYPES { - runtime_specs = 'runtime_specs', - endpoint_response_times_histogram = 'endpoint_response_times_histogram', - endpoint_response_status = 'endpoint_response_status', - gpa_fetch_duration = 'gpa_fetch_duration', - last_ws_message_received_ts = 'last_ws_message_received_ts', - account_updates_count = 'account_updates_count', - cache_hit_count = 'cache_hit_count', - cache_miss_count = 'cache_miss_count', - current_system_ts = 'current_system_ts', health_status = 'health_status', user_pubkey_list_length = 'user_pubkey_list_length', } @@ -62,121 +31,44 @@ const exporter = new PrometheusExporter( ); } ); -const meterName = 'usermap-server-meter'; -const meterProvider = new MeterProvider({ - views: [ - new View({ - instrumentName: METRIC_TYPES.endpoint_response_times_histogram, - instrumentType: InstrumentType.HISTOGRAM, - meterName, - aggregation: createHistogramBuckets(0, 20, 30), - }), - new View({ - instrumentName: METRIC_TYPES.gpa_fetch_duration, - instrumentType: InstrumentType.HISTOGRAM, - meterName, - aggregation: createHistogramBuckets(0, 500, 20), - }), - ], -}); +const meterName = 'usermap-server-meter'; +const meterProvider = new MeterProvider(); meterProvider.addMetricReader(exporter); + const meter = meterProvider.getMeter(meterName); let currentUserPubkeyListLength = 0; const userPubkeyListLengthGauge = meter.createObservableGauge( - METRIC_TYPES.user_pubkey_list_length, - { - description: 'Number of user public keys in the list', - } + METRIC_TYPES.user_pubkey_list_length, + { + description: 'Number of user public keys in the list', + } ); userPubkeyListLengthGauge.addCallback((obs: ObservableResult) => { - obs.observe(currentUserPubkeyListLength); + obs.observe(currentUserPubkeyListLength); }); -// Update function now takes a value parameter const updateUserPubkeyListLength = (length: number) => { - currentUserPubkeyListLength = length; + currentUserPubkeyListLength = length; }; -const runtimeSpecsGauge = meter.createObservableGauge( - METRIC_TYPES.runtime_specs, - { - description: 'Runtime sepcification of this program', - } -); - let healthStatus: HEALTH_STATUS = HEALTH_STATUS.Ok; + const healthStatusGauge = meter.createObservableGauge( METRIC_TYPES.health_status, { - description: 'Health status of this program', + description: 'Current health status of the server', } ); -healthStatusGauge.addCallback((obs: ObservableResult) => { - obs.observe(healthStatus, {}); -}); - -let lastWsMsgReceivedTs = 0; -const setLastReceivedWsMsgTs = (ts: number) => { - lastWsMsgReceivedTs = ts; -}; -const lastWsReceivedTsGauge = meter.createObservableGauge( - METRIC_TYPES.last_ws_message_received_ts, - { - description: 'Timestamp of last received websocket message', - } -); -lastWsReceivedTsGauge.addCallback((obs: ObservableResult) => { - obs.observe(lastWsMsgReceivedTs, {}); -}); -const cacheHitCounter = meter.createCounter(METRIC_TYPES.cache_hit_count, { - description: 'Total redis cache hits', -}); - -const accountUpdatesCounter = meter.createCounter( - METRIC_TYPES.account_updates_count, - { - description: 'Total accounts update', - } -); - -const currentSystemTsGauge = meter.createObservableGauge( - METRIC_TYPES.current_system_ts, - { - description: 'Timestamp of system at time of metric collection', - } -); -currentSystemTsGauge.addCallback((obs: ObservableResult) => { - obs.observe(Date.now(), {}); +healthStatusGauge.addCallback((obs: ObservableResult) => { + obs.observe(healthStatus); }); -const endpointResponseTimeHistogram = meter.createHistogram( - METRIC_TYPES.endpoint_response_times_histogram, - { - description: 'Duration of endpoint responses', - unit: 'ms', - } -); -const gpaFetchDurationHistogram = meter.createHistogram( - METRIC_TYPES.gpa_fetch_duration, - { - description: 'Duration of GPA fetches', - unit: 'ms', - } -); - -const responseStatusCounter = meter.createCounter( - METRIC_TYPES.endpoint_response_status, - { - description: 'Count of endpoint responses by status code', - } -); - const healthCheckInterval = 2000; let lastHealthCheckSlot = -1; let lastHealthCheckState = true; // true = healthy, false = unhealthy @@ -261,14 +153,4 @@ const handleHealthCheck = (core: Core) => { }; }; -export { - endpointResponseTimeHistogram, - gpaFetchDurationHistogram, - responseStatusCounter, - handleHealthCheck, - setLastReceivedWsMsgTs, - accountUpdatesCounter, - cacheHitCounter, - runtimeSpecsGauge, - updateUserPubkeyListLength -}; +export { handleHealthCheck, updateUserPubkeyListLength }; diff --git a/src/publisher.ts b/src/publisher.ts index 13a3021..a77d7e2 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -16,7 +16,7 @@ import morgan from 'morgan'; import compression from 'compression'; import * as http from 'http'; -import { runtimeSpecsGauge, updateUserPubkeyListLength } from './core/metrics'; +import { updateUserPubkeyListLength } from './core/metrics'; import { handleResponseTime } from './core/middleware'; import { DriftClient, @@ -75,18 +75,6 @@ app.set('trust proxy', 1); app.use(logHttp); app.use(handleResponseTime); -// Metrics defined here -const bootTimeMs = Date.now(); -const commitHash = process.env.COMMIT; -runtimeSpecsGauge.addCallback((obs) => { - obs.observe(bootTimeMs, { - commit: commitHash, - driftEnv, - rpcEndpoint: endpoint, - wsEndpoint: wsEndpoint, - }); -}); - const server = http.createServer(app); const httpPort = parseInt(process.env.PORT || '5001'); From c0d175e27f3b52bf4cf3bba6adf421dbe12e7c80 Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Thu, 3 Oct 2024 15:18:34 +1000 Subject: [PATCH 05/10] chore: remove unused stuff --- src/core/middleware.ts | 26 -------------------------- src/publisher.ts | 2 -- 2 files changed, 28 deletions(-) delete mode 100644 src/core/middleware.ts diff --git a/src/core/middleware.ts b/src/core/middleware.ts deleted file mode 100644 index 7705d3c..0000000 --- a/src/core/middleware.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Request, Response } from 'express'; -import responseTime = require('response-time'); -import { - endpointResponseTimeHistogram, - responseStatusCounter, -} from './metrics'; - -export const handleResponseTime = responseTime( - (req: Request, res: Response, time: number) => { - const endpoint = req.path; - - if (endpoint === '/health' || req.url === '/') { - return; - } - - responseStatusCounter.add(1, { - endpoint, - status: res.statusCode, - }); - - const responseTimeMs = time; - endpointResponseTimeHistogram.record(responseTimeMs, { - endpoint, - }); - } -); diff --git a/src/publisher.ts b/src/publisher.ts index a77d7e2..392a604 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -17,7 +17,6 @@ import compression from 'compression'; import * as http from 'http'; import { updateUserPubkeyListLength } from './core/metrics'; -import { handleResponseTime } from './core/middleware'; import { DriftClient, DriftEnv, @@ -73,7 +72,6 @@ app.use(cors({ origin: '*' })); app.use(compression()); app.set('trust proxy', 1); app.use(logHttp); -app.use(handleResponseTime); const server = http.createServer(app); From 1e403c948f3c30cd9f45059cdae4ce1f8e746eee Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Tue, 8 Oct 2024 15:56:29 +1100 Subject: [PATCH 06/10] chore: update logic for key sync --- src/publisher.ts | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index 392a604..b584278 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -256,16 +256,7 @@ export class WebsocketCacheProgramAccountSubscriber { await Promise.all(promises); - await this.redisClient.lTrim('user_pubkeys', -1, 0); - - const batches = COMMON_UI_UTILS.chunks(Array.from(programAccountBufferMap.keys()), 1000) - - for (const batch of batches) { - await this.redisClient.rPush( - 'user_pubkeys', - ...batch - ); - } + await this.syncPubKeys(programAccountBufferMap) } catch (e) { const err = e as Error; console.error( @@ -278,6 +269,26 @@ export class WebsocketCacheProgramAccountSubscriber { } } + async syncPubKeys(programAccountBufferMap: Map): Promise { + const newKeys = Array.from(programAccountBufferMap.keys()); + const currentKeys = await this.redisClient.lRange('user_pubkeys', 0, -1); + + const keysToAdd = newKeys.filter(key => !currentKeys.includes(key)); + const keysToRemove = currentKeys.filter(key => !newKeys.includes(key)); + + const removalBatches = COMMON_UI_UTILS.chunks(keysToRemove, 1000); + for (const batch of removalBatches) { + await Promise.all(batch.map(key => this.redisClient.lRem('user_pubkeys', 0, key))); + } + + const additionBatches = COMMON_UI_UTILS.chunks(keysToAdd, 1000); + for (const batch of additionBatches) { + await this.redisClient.rPush('user_pubkeys', ...batch); + } + + console.log(`Synchronized user_pubkeys: Added ${keysToAdd.length}, Removed ${keysToRemove.length}`); + } + async checkSync(): Promise { if (this.syncLock) { logger.info('SYNC LOCKED DURING CHECK'); From e6e10b1a56eee2e03779e509e0c143552909a8bf Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Tue, 8 Oct 2024 15:57:13 +1100 Subject: [PATCH 07/10] fix: formatting --- src/publisher.ts | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index b584278..c6bfb0e 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -256,7 +256,7 @@ export class WebsocketCacheProgramAccountSubscriber { await Promise.all(promises); - await this.syncPubKeys(programAccountBufferMap) + await this.syncPubKeys(programAccountBufferMap); } catch (e) { const err = e as Error; console.error( @@ -269,25 +269,31 @@ export class WebsocketCacheProgramAccountSubscriber { } } - async syncPubKeys(programAccountBufferMap: Map): Promise { - const newKeys = Array.from(programAccountBufferMap.keys()); - const currentKeys = await this.redisClient.lRange('user_pubkeys', 0, -1); + async syncPubKeys( + programAccountBufferMap: Map + ): Promise { + const newKeys = Array.from(programAccountBufferMap.keys()); + const currentKeys = await this.redisClient.lRange('user_pubkeys', 0, -1); - const keysToAdd = newKeys.filter(key => !currentKeys.includes(key)); - const keysToRemove = currentKeys.filter(key => !newKeys.includes(key)); + const keysToAdd = newKeys.filter((key) => !currentKeys.includes(key)); + const keysToRemove = currentKeys.filter((key) => !newKeys.includes(key)); - const removalBatches = COMMON_UI_UTILS.chunks(keysToRemove, 1000); - for (const batch of removalBatches) { - await Promise.all(batch.map(key => this.redisClient.lRem('user_pubkeys', 0, key))); - } + const removalBatches = COMMON_UI_UTILS.chunks(keysToRemove, 1000); + for (const batch of removalBatches) { + await Promise.all( + batch.map((key) => this.redisClient.lRem('user_pubkeys', 0, key)) + ); + } - const additionBatches = COMMON_UI_UTILS.chunks(keysToAdd, 1000); - for (const batch of additionBatches) { - await this.redisClient.rPush('user_pubkeys', ...batch); - } + const additionBatches = COMMON_UI_UTILS.chunks(keysToAdd, 1000); + for (const batch of additionBatches) { + await this.redisClient.rPush('user_pubkeys', ...batch); + } - console.log(`Synchronized user_pubkeys: Added ${keysToAdd.length}, Removed ${keysToRemove.length}`); - } + console.log( + `Synchronized user_pubkeys: Added ${keysToAdd.length}, Removed ${keysToRemove.length}` + ); + } async checkSync(): Promise { if (this.syncLock) { From 7c525fe7ed87774725d39747557e878d334f3201 Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Tue, 8 Oct 2024 18:38:17 +1100 Subject: [PATCH 08/10] chore: update log --- src/publisher.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/publisher.ts b/src/publisher.ts index c6bfb0e..55c0938 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -290,7 +290,7 @@ export class WebsocketCacheProgramAccountSubscriber { await this.redisClient.rPush('user_pubkeys', ...batch); } - console.log( + logger.info( `Synchronized user_pubkeys: Added ${keysToAdd.length}, Removed ${keysToRemove.length}` ); } From 2a49c8b12ac8c120accad036a952410ab3da0d83 Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Tue, 8 Oct 2024 18:52:05 +1100 Subject: [PATCH 09/10] chore: reduce batches --- src/publisher.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index 55c0938..da6485c 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -278,14 +278,14 @@ export class WebsocketCacheProgramAccountSubscriber { const keysToAdd = newKeys.filter((key) => !currentKeys.includes(key)); const keysToRemove = currentKeys.filter((key) => !newKeys.includes(key)); - const removalBatches = COMMON_UI_UTILS.chunks(keysToRemove, 1000); + const removalBatches = COMMON_UI_UTILS.chunks(keysToRemove, 100); for (const batch of removalBatches) { await Promise.all( batch.map((key) => this.redisClient.lRem('user_pubkeys', 0, key)) ); } - const additionBatches = COMMON_UI_UTILS.chunks(keysToAdd, 1000); + const additionBatches = COMMON_UI_UTILS.chunks(keysToAdd, 100); for (const batch of additionBatches) { await this.redisClient.rPush('user_pubkeys', ...batch); } From 1051dd1663e3bc7f9e9637a025fbedb3d8bdd958 Mon Sep 17 00:00:00 2001 From: Jack Waller Date: Tue, 8 Oct 2024 19:14:38 +1100 Subject: [PATCH 10/10] chore: sync pub key list --- src/publisher.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index da6485c..3c6292c 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -143,12 +143,6 @@ export class WebsocketCacheProgramAccountSubscriber { (SYNC_INTERVAL * EXPIRY_MULTIPLIER) / 1000, `${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}` ); - - await this.redisClient.rPush( - 'user_pubkeys', - keyedAccountInfo.accountId.toString() - ); - return; }