Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use multiple redis clients #61

Merged
merged 7 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"license": "Apache-2.0",
"dependencies": {
"@coral-xyz/anchor": "^0.29.0",
"@drift-labs/sdk": "2.53.0-beta.5",
"@drift-labs/sdk": "2.53.0-beta.8",
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/auto-instrumentations-node": "^0.31.1",
"@opentelemetry/exporter-prometheus": "^0.31.0",
Expand Down
150 changes: 138 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,33 @@

require('dotenv').config();

const REDIS_HOST = process.env.REDIS_HOST || 'localhost';
const REDIS_PORT = process.env.REDIS_PORT || '6379';
const REDIS_PASSWORD = process.env.REDIS_PASSWORD;
// Reading in Redis env vars
const REDIS_HOSTS_ENV = (process.env.REDIS_HOSTS as string) || 'localhost';
const REDIS_HOSTS = REDIS_HOSTS_ENV.includes(',')
? REDIS_HOSTS_ENV.trim()
.replace(/^\[|\]$/g, '')
.split(/\s*,\s*/)
: [REDIS_HOSTS_ENV];

const REDIS_PASSWORDS_ENV = (process.env.REDIS_PASSWORDS as string) || '';
const REDIS_PASSWORDS = REDIS_PASSWORDS_ENV.includes(',')
? REDIS_PASSWORDS_ENV.trim()
.replace(/^\[|\]$/g, '')
.split(/\s*,\s*/)
: [REDIS_PASSWORDS_ENV];

const REDIS_PORTS_ENV = (process.env.REDIS_PORTS as string) || '6379';
const REDIS_PORTS = REDIS_PORTS_ENV.includes(',')
? REDIS_PORTS_ENV.trim()
.replace(/^\[|\]$/g, '')
.split(/\s*,\s*/)
: [REDIS_PORTS_ENV];
if (
REDIS_PORTS.length !== REDIS_PASSWORDS.length ||
REDIS_PORTS.length !== REDIS_HOSTS.length
) {
throw 'REDIS_HOSTS and REDIS_PASSWORDS and REDIS_PORTS must be the same length';
}

const driftEnv = (process.env.ENV || 'devnet') as DriftEnv;
const commitHash = process.env.COMMIT;
Expand All @@ -69,6 +93,8 @@
const serverPort = process.env.PORT || 6969;
export const ORDERBOOK_UPDATE_INTERVAL = 1000;
const WS_FALLBACK_FETCH_INTERVAL = ORDERBOOK_UPDATE_INTERVAL * 10;
const SLOT_STALENESS_TOLERANCE =
parseInt(process.env.SLOT_STALENESS_TOLERANCE) || 20;
const useWebsocket = process.env.USE_WEBSOCKET?.toLowerCase() === 'true';
const rateLimitCallsPerSecond = process.env.RATE_LIMIT_CALLS_PER_SECOND
? parseInt(process.env.RATE_LIMIT_CALLS_PER_SECOND)
Expand Down Expand Up @@ -320,11 +346,39 @@
`DLOBSubscriber initialized in ${Date.now() - initDlobSubscriberStart} ms`
);

let redisClient: RedisClient;
const redisClients: Array<RedisClient> = [];
const spotMarketRedisMap: Map<
number,
{ client: RedisClient; clientIndex: number }
> = new Map();
const perpMarketRedisMap: Map<
number,
{ client: RedisClient; clientIndex: number }
> = new Map();
if (useRedis) {
logger.info('Connecting to redis');
redisClient = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
await redisClient.connect();
for (let i = 0; i < REDIS_HOSTS.length; i++) {
redisClients.push(
new RedisClient(
REDIS_HOSTS[i],
REDIS_PORTS[i],
REDIS_PASSWORDS[i] || undefined
)
);
await redisClients[i].connect();
}
for (let i = 0; i < sdkConfig.SPOT_MARKETS.length; i++) {
spotMarketRedisMap.set(sdkConfig.SPOT_MARKETS[i].marketIndex, {
client: redisClients[0],
clientIndex: 0,
});
}
for (let i = 0; i < sdkConfig.PERP_MARKETS.length; i++) {
perpMarketRedisMap.set(sdkConfig.PERP_MARKETS[i].marketIndex, {
client: redisClients[0],
clientIndex: 0,
});
}
}

logger.info(`Initializing all market subscribers...`);
Expand Down Expand Up @@ -633,13 +687,13 @@
break;
}
if (side.userAccount) {
const maker = side.userAccount.toBase58();

Check failure on line 690 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.

Check failure on line 690 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.

Check failure on line 690 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.

Check failure on line 690 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.
if (topMakers.has(maker)) {
continue;
} else {
if (`${includeUserStats}`.toLowerCase() === 'true') {
const userAccount = dlobProvider.getUserAccount(
side.userAccount

Check failure on line 696 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Argument of type 'string' is not assignable to parameter of type 'PublicKey'.

Check failure on line 696 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Argument of type 'string' is not assignable to parameter of type 'PublicKey'.

Check failure on line 696 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Argument of type 'string' is not assignable to parameter of type 'PublicKey'.

Check failure on line 696 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Argument of type 'string' is not assignable to parameter of type 'PublicKey'.
);
topMakers.add([
userAccount,
Expand All @@ -649,7 +703,7 @@
),
]);
} else {
topMakers.add(side.userAccount.toBase58());

Check failure on line 706 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.

Check failure on line 706 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.

Check failure on line 706 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.

Check failure on line 706 in src/index.ts

View workflow job for this annotation

GitHub Actions / update-sdk

Property 'toBase58' does not exist on type 'string'.
}
foundMakers++;
}
Expand Down Expand Up @@ -735,6 +789,7 @@
!grouping
) {
let redisL2: string;
const redisClient = perpMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_perp_${normedMarketIndex}_depth_5`
Expand All @@ -750,9 +805,24 @@
}
if (
redisL2 &&
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) < 10
)
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = redisL2;
} else {
if (redisL2 && redisClients.length > 1) {
const nextClientIndex =
(perpMarketRedisMap.get(normedMarketIndex).clientIndex + 1) %
redisClients.length;
perpMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for perp market ${normedMarketIndex}`
);
}
}
} else if (
isSpot &&
`${includeSerum}`?.toLowerCase() === 'true' &&
Expand All @@ -761,6 +831,7 @@
!grouping
) {
let redisL2: string;
const redisClient = spotMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_spot_${normedMarketIndex}_depth_5`
Expand All @@ -776,9 +847,24 @@
}
if (
redisL2 &&
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) < 10
)
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = redisL2;
} else {
if (redisL2 && redisClients.length > 1) {
const nextClientIndex =
(spotMarketRedisMap.get(normedMarketIndex).clientIndex + 1) %
redisClients.length;
spotMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for spot market ${normedMarketIndex}`
);
}
}
}

if (l2Formatted) {
Expand Down Expand Up @@ -917,6 +1003,8 @@
!normedParam['grouping']
) {
let redisL2: string;
const redisClient =
perpMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_perp_${normedMarketIndex}_depth_5`
Expand All @@ -932,8 +1020,26 @@
}
if (redisL2) {
const parsedRedisL2 = JSON.parse(redisL2);
if (dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) < 10)
if (
dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = parsedRedisL2;
} else {
if (redisClients.length > 1) {
const nextClientIndex =
(perpMarketRedisMap.get(normedMarketIndex).clientIndex +
1) %
redisClients.length;
perpMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for perp market ${normedMarketIndex}`
);
}
}
}
} else if (
isSpot &&
Expand All @@ -942,6 +1048,8 @@
!normedParam['grouping']
) {
let redisL2: string;
const redisClient =
spotMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_spot_${normedMarketIndex}_depth_5`
Expand All @@ -957,8 +1065,26 @@
}
if (redisL2) {
const parsedRedisL2 = JSON.parse(redisL2);
if (dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) < 10)
if (
dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = parsedRedisL2;
} else {
if (redisClients.length > 1) {
const nextClientIndex =
(spotMarketRedisMap.get(normedMarketIndex).clientIndex +
1) %
redisClients.length;
spotMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for spot market ${normedMarketIndex}`
);
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/publishers/dlobPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ const main = async () => {
const initAllMarketSubscribersStart = Date.now();
MARKET_SUBSCRIBERS = await initializeAllMarketSubscribers(driftClient);
logger.info(
`All market subscribers initialized in ${Date.now() - initAllMarketSubscribersStart
`All market subscribers initialized in ${
Date.now() - initAllMarketSubscribersStart
} ms`
);

Expand Down
Loading
Loading