Skip to content

Commit

Permalink
add pruner
Browse files Browse the repository at this point in the history
  • Loading branch information
NourAlharithi committed Jan 31, 2024
1 parent 5980d70 commit b11cd8c
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 7 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"prettify:fix": "prettier --write './src/**/*.ts'",
"lint": "eslint . --ext ts --quiet",
"lint:fix": "eslint . --ext ts --fix",
"publisher": "ts-node src/publisher.ts"
"publisher": "ts-node src/publisher.ts",
"pruner": "ts-node src/pruner.ts"
},
"devDependencies": {
"@types/k6": "^0.45.0",
Expand Down
92 changes: 92 additions & 0 deletions src/pruner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { DriftClient, DriftEnv, UserMap, Wallet } from '@drift-labs/sdk';
import { Connection, Keypair } from '@solana/web3.js';
import { sleep } from './utils/utils';
import { logger } from './utils/logger';
import { RedisClient } from './utils/redisClient';

require('dotenv').config();

const driftEnv = (process.env.ENV || 'devnet') as DriftEnv;

const REDIS_HOST = process.env.REDIS_HOST || 'localhost';
const REDIS_PORT = process.env.REDIS_PORT || '6379';
const REDIS_PASSWORD = process.env.REDIS_PASSWORD;

const endpoint = process.env.ENDPOINT!;
if (!endpoint) {
logger.error('ENDPOINT env var is required');
process.exit(1);
}
const wsEndpoint = process.env.WS_ENDPOINT || endpoint;
logger.info(`RPC endpoint: ${endpoint}`);
logger.info(`WS endpoint: ${wsEndpoint}`);
logger.info(`DriftEnv: ${driftEnv}`);

async function main() {
// Set up drift client for the program
const connection = new Connection(endpoint, 'recent');
const wallet = new Wallet(new Keypair());
const driftClient = new DriftClient({
connection,
wallet,
env: driftEnv,
});
await driftClient.subscribe();

const userMap = new UserMap({
driftClient,
connection,
includeIdle: false,
fastDecode: true,
subscriptionConfig: {
type: 'polling',
frequency: 0,
commitment: 'finalized',
},
});
await userMap.subscribe();

const redisClient = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
await redisClient.connect();

// Fetch the userMap and prune the redis cache from idle users
let cursor = '0';
do {
const reply = await redisClient.client.scan(
cursor,
'MATCH',
'*',
'COUNT',
100
);
cursor = reply[0];
const keys = reply[1];

// Process the keys
for (const key of keys) {
if (userMap.get(key) === undefined) {
await redisClient.client.del(key);
await redisClient.client.lrem('user_pubkeys', 0, key);
}
}
} while (cursor !== '0');

redisClient.disconnect();
await driftClient.unsubscribe();
await userMap.unsubscribe();

console.log('Done!!');
process.exit(0);
}

async function recursiveTryCatch(f: () => Promise<void>) {
try {
await f();
} catch (e) {
console.error(e);
await sleep(15000);
await recursiveTryCatch(f);
}
}

recursiveTryCatch(() => main());
13 changes: 7 additions & 6 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import morgan from 'morgan';
import compression from 'compression';

import * as http from 'http';
import {
runtimeSpecsGauge,
} from './core/metrics';
import { runtimeSpecsGauge } from './core/metrics';
import { handleResponseTime } from './core/middleware';
import { RedisClient } from './utils/redisClient';
import {
Expand Down Expand Up @@ -118,16 +116,19 @@ export class WebsocketCacheProgramAccountSubscriber {
if (!existingData) {
await this.redisClient.client.set(
keyedAccountInfo.accountId.toString(),
`${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString()}`
`${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}`
);
await this.redisClient.client.rpush(
'user_pubkeys',
keyedAccountInfo.accountId.toString()
);
await this.redisClient.client.rpush('user_pubkeys', keyedAccountInfo.accountId.toString());
return;
}
const existingSlot = existingData.split('::')[0];
if (incomingSlot >= parseInt(existingSlot)) {
await this.redisClient.client.set(
keyedAccountInfo.accountId.toString(),
`${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString()}`
`${incomingSlot}::${keyedAccountInfo.accountInfo.data.toString('base64')}`
);
return;
}
Expand Down

0 comments on commit b11cd8c

Please sign in to comment.