Skip to content

Commit

Permalink
Merge pull request #61 from drift-labs/master
Browse files Browse the repository at this point in the history
use multiple redis clients
  • Loading branch information
NourAlharithi authored Dec 28, 2023
2 parents dfb54d4 + 42f8ece commit 9bc52e6
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 27 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"license": "Apache-2.0",
"dependencies": {
"@coral-xyz/anchor": "^0.29.0",
"@drift-labs/sdk": "2.53.0-beta.5",
"@drift-labs/sdk": "2.53.0-beta.8",
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/auto-instrumentations-node": "^0.31.1",
"@opentelemetry/exporter-prometheus": "^0.31.0",
Expand Down
150 changes: 138 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,33 @@ import { RedisClient } from './utils/redisClient';

require('dotenv').config();

const REDIS_HOST = process.env.REDIS_HOST || 'localhost';
const REDIS_PORT = process.env.REDIS_PORT || '6379';
const REDIS_PASSWORD = process.env.REDIS_PASSWORD;
// Reading in Redis env vars
const REDIS_HOSTS_ENV = (process.env.REDIS_HOSTS as string) || 'localhost';
const REDIS_HOSTS = REDIS_HOSTS_ENV.includes(',')
? REDIS_HOSTS_ENV.trim()
.replace(/^\[|\]$/g, '')
.split(/\s*,\s*/)
: [REDIS_HOSTS_ENV];

const REDIS_PASSWORDS_ENV = (process.env.REDIS_PASSWORDS as string) || '';
const REDIS_PASSWORDS = REDIS_PASSWORDS_ENV.includes(',')
? REDIS_PASSWORDS_ENV.trim()
.replace(/^\[|\]$/g, '')
.split(/\s*,\s*/)
: [REDIS_PASSWORDS_ENV];

const REDIS_PORTS_ENV = (process.env.REDIS_PORTS as string) || '6379';
const REDIS_PORTS = REDIS_PORTS_ENV.includes(',')
? REDIS_PORTS_ENV.trim()
.replace(/^\[|\]$/g, '')
.split(/\s*,\s*/)
: [REDIS_PORTS_ENV];
if (
REDIS_PORTS.length !== REDIS_PASSWORDS.length ||
REDIS_PORTS.length !== REDIS_HOSTS.length
) {
throw 'REDIS_HOSTS and REDIS_PASSWORDS and REDIS_PORTS must be the same length';
}

const driftEnv = (process.env.ENV || 'devnet') as DriftEnv;
const commitHash = process.env.COMMIT;
Expand All @@ -69,6 +93,8 @@ const stateCommitment: Commitment = 'processed';
const serverPort = process.env.PORT || 6969;
export const ORDERBOOK_UPDATE_INTERVAL = 1000;
const WS_FALLBACK_FETCH_INTERVAL = ORDERBOOK_UPDATE_INTERVAL * 10;
const SLOT_STALENESS_TOLERANCE =
parseInt(process.env.SLOT_STALENESS_TOLERANCE) || 20;
const useWebsocket = process.env.USE_WEBSOCKET?.toLowerCase() === 'true';
const rateLimitCallsPerSecond = process.env.RATE_LIMIT_CALLS_PER_SECOND
? parseInt(process.env.RATE_LIMIT_CALLS_PER_SECOND)
Expand Down Expand Up @@ -320,11 +346,39 @@ const main = async () => {
`DLOBSubscriber initialized in ${Date.now() - initDlobSubscriberStart} ms`
);

let redisClient: RedisClient;
const redisClients: Array<RedisClient> = [];
const spotMarketRedisMap: Map<
number,
{ client: RedisClient; clientIndex: number }
> = new Map();
const perpMarketRedisMap: Map<
number,
{ client: RedisClient; clientIndex: number }
> = new Map();
if (useRedis) {
logger.info('Connecting to redis');
redisClient = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
await redisClient.connect();
for (let i = 0; i < REDIS_HOSTS.length; i++) {
redisClients.push(
new RedisClient(
REDIS_HOSTS[i],
REDIS_PORTS[i],
REDIS_PASSWORDS[i] || undefined
)
);
await redisClients[i].connect();
}
for (let i = 0; i < sdkConfig.SPOT_MARKETS.length; i++) {
spotMarketRedisMap.set(sdkConfig.SPOT_MARKETS[i].marketIndex, {
client: redisClients[0],
clientIndex: 0,
});
}
for (let i = 0; i < sdkConfig.PERP_MARKETS.length; i++) {
perpMarketRedisMap.set(sdkConfig.PERP_MARKETS[i].marketIndex, {
client: redisClients[0],
clientIndex: 0,
});
}
}

logger.info(`Initializing all market subscribers...`);
Expand Down Expand Up @@ -735,6 +789,7 @@ const main = async () => {
!grouping
) {
let redisL2: string;
const redisClient = perpMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_perp_${normedMarketIndex}_depth_5`
Expand All @@ -750,9 +805,24 @@ const main = async () => {
}
if (
redisL2 &&
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) < 10
)
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = redisL2;
} else {
if (redisL2 && redisClients.length > 1) {
const nextClientIndex =
(perpMarketRedisMap.get(normedMarketIndex).clientIndex + 1) %
redisClients.length;
perpMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for perp market ${normedMarketIndex}`
);
}
}
} else if (
isSpot &&
`${includeSerum}`?.toLowerCase() === 'true' &&
Expand All @@ -761,6 +831,7 @@ const main = async () => {
!grouping
) {
let redisL2: string;
const redisClient = spotMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_spot_${normedMarketIndex}_depth_5`
Expand All @@ -776,9 +847,24 @@ const main = async () => {
}
if (
redisL2 &&
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) < 10
)
dlobProvider.getSlot() - parseInt(JSON.parse(redisL2).slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = redisL2;
} else {
if (redisL2 && redisClients.length > 1) {
const nextClientIndex =
(spotMarketRedisMap.get(normedMarketIndex).clientIndex + 1) %
redisClients.length;
spotMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for spot market ${normedMarketIndex}`
);
}
}
}

if (l2Formatted) {
Expand Down Expand Up @@ -917,6 +1003,8 @@ const main = async () => {
!normedParam['grouping']
) {
let redisL2: string;
const redisClient =
perpMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_perp_${normedMarketIndex}_depth_5`
Expand All @@ -932,8 +1020,26 @@ const main = async () => {
}
if (redisL2) {
const parsedRedisL2 = JSON.parse(redisL2);
if (dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) < 10)
if (
dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = parsedRedisL2;
} else {
if (redisClients.length > 1) {
const nextClientIndex =
(perpMarketRedisMap.get(normedMarketIndex).clientIndex +
1) %
redisClients.length;
perpMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for perp market ${normedMarketIndex}`
);
}
}
}
} else if (
isSpot &&
Expand All @@ -942,6 +1048,8 @@ const main = async () => {
!normedParam['grouping']
) {
let redisL2: string;
const redisClient =
spotMarketRedisMap.get(normedMarketIndex).client;
if (parseInt(adjustedDepth as string) === 5) {
redisL2 = await redisClient.client.get(
`last_update_orderbook_spot_${normedMarketIndex}_depth_5`
Expand All @@ -957,8 +1065,26 @@ const main = async () => {
}
if (redisL2) {
const parsedRedisL2 = JSON.parse(redisL2);
if (dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) < 10)
if (
dlobProvider.getSlot() - parseInt(parsedRedisL2.slot) <
SLOT_STALENESS_TOLERANCE
) {
l2Formatted = parsedRedisL2;
} else {
if (redisClients.length > 1) {
const nextClientIndex =
(spotMarketRedisMap.get(normedMarketIndex).clientIndex +
1) %
redisClients.length;
spotMarketRedisMap.set(normedMarketIndex, {
client: redisClients[nextClientIndex],
clientIndex: nextClientIndex,
});
console.log(
`Rotated redis client to index ${nextClientIndex} for spot market ${normedMarketIndex}`
);
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/publishers/dlobPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ const main = async () => {
const initAllMarketSubscribersStart = Date.now();
MARKET_SUBSCRIBERS = await initializeAllMarketSubscribers(driftClient);
logger.info(
`All market subscribers initialized in ${Date.now() - initAllMarketSubscribersStart
`All market subscribers initialized in ${
Date.now() - initAllMarketSubscribersStart
} ms`
);

Expand Down
Loading

0 comments on commit 9bc52e6

Please sign in to comment.