diff --git a/package.json b/package.json index 3709d6b..67c1753 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "license": "Apache-2.0", "dependencies": { "@coral-xyz/anchor": "^0.29.0", - "@drift-labs/sdk": "2.65.0-beta.0", + "@drift-labs/sdk": "2.65.0-beta.5", "@opentelemetry/api": "^1.1.0", "@opentelemetry/auto-instrumentations-node": "^0.31.1", "@opentelemetry/exporter-prometheus": "^0.31.0", diff --git a/src/wsConnectionManager.ts b/src/wsConnectionManager.ts index 1afaec4..2f56720 100644 --- a/src/wsConnectionManager.ts +++ b/src/wsConnectionManager.ts @@ -36,6 +36,7 @@ const WS_PORT = process.env.WS_PORT || '3000'; console.log(`WS LISTENER PORT : ${WS_PORT}`); const REDIS_PASSWORD = process.env.REDIS_PASSWORD; +const MAX_BUFFERED_AMOUNT = 300000; const safeGetRawChannelFromMessage = (message: any): string => { return message?.channel; @@ -103,7 +104,10 @@ async function main() { const subscribers = channelSubscribers.get(subscribedChannel); if (subscribers) { subscribers.forEach((ws) => { - if (ws.readyState === WebSocket.OPEN && ws.bufferedAmount < 300000) + if ( + ws.readyState === WebSocket.OPEN && + ws.bufferedAmount < MAX_BUFFERED_AMOUNT + ) ws.send( JSON.stringify({ channel: subscribedChannel, data: message }) ); @@ -245,10 +249,31 @@ async function main() { } }); + // Set interval to send heartbeat every 5 seconds + const heartbeatInterval = setInterval(() => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ channel: 'heartbeat' })); + } else { + clearInterval(heartbeatInterval); + } + }, 5000); + + // Buffer overflow check interval + const bufferInterval = setInterval(() => { + if (ws.bufferedAmount > MAX_BUFFERED_AMOUNT) { + if (ws.readyState === WebSocket.OPEN) { + ws.close(1008, 'Buffer overflow'); + } + clearInterval(bufferInterval); + } + }, 10000); + // Handle disconnection ws.on('close', () => { console.log('Client disconnected'); - // Clear any existing intervals and timeouts + // Clear any existing intervals + clearInterval(heartbeatInterval); + clearInterval(bufferInterval); channelSubscribers.forEach((subscribers, channel) => { if (subscribers.delete(ws) && subscribers.size === 0) { redisClient.client.unsubscribe(channel); @@ -262,15 +287,6 @@ async function main() { ws.on('error', (error) => { console.error('Socket error:', error); }); - - // Set interval to send heartbeat every 5 seconds - setInterval(() => { - ws.send( - JSON.stringify({ - channel: 'heartbeat', - }) - ); - }, 5000); }); server.listen(WS_PORT, () => { @@ -285,19 +301,6 @@ async function main() { server.on('error', (error) => { console.error('Server error:', error); }); - - // Periodic check for bad clients and disconnect them to alleviate backpressure - setInterval(() => { - let set = new Set(); - for (const [_channel, wsSet] of channelSubscribers) { - set = new Set([...set, ...wsSet]); - } - for (const ws of set) { - if (ws.bufferedAmount > 350000) { - ws.close(); - } - } - }, 5000); } async function recursiveTryCatch(f: () => void) { diff --git a/yarn.lock b/yarn.lock index 86e2692..8f534dd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -115,10 +115,10 @@ enabled "2.0.x" kuler "^2.0.0" -"@drift-labs/sdk@2.65.0-beta.0": - version "2.65.0-beta.0" - resolved "https://registry.yarnpkg.com/@drift-labs/sdk/-/sdk-2.65.0-beta.0.tgz#9a66034dd8d6f1a65cde130d62cbc3e26614aa51" - integrity sha512-cVzt90WCuZS2OAZ+9kLmvz7WxYfM4f3iiIjhtftV2nUsgWfNgJgzhmIEYUORa1PMpXzMSEYrIs/JNIYtwt9irA== +"@drift-labs/sdk@2.65.0-beta.5": + version "2.65.0-beta.5" + resolved "https://registry.yarnpkg.com/@drift-labs/sdk/-/sdk-2.65.0-beta.5.tgz#3980886957cda76e80a8982a4363647711f223bb" + integrity sha512-M95F/R4fXNLvL0GkhoDQvfXLe+UpWPgvLWrJNnFR0j6WMS+IyJTykiZSBI0n/EqTI78AnA+nvbV0GVcFfpZnrA== dependencies: "@coral-xyz/anchor" "0.28.1-beta.2" "@ellipsis-labs/phoenix-sdk" "^1.4.2"