Skip to content

Commit

Permalink
*prettify + new example
Browse files Browse the repository at this point in the history
  • Loading branch information
NourAlharithi committed Oct 29, 2023
1 parent ee5cd4e commit f3d64ff
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
15 changes: 11 additions & 4 deletions example/newClient.ts → example/wsClient.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import WebSocket from 'ws';
// const ws = new WebSocket('wss://master.dlob.drift.trade/ws');
const ws = new WebSocket('ws://localhost:3000/ws');
import { sleep } from '../src/utils/utils';

ws.on('open', () => {
ws.on('open', async () => {
console.log('Connected to the server');
ws.send(JSON.stringify({ type: 'subscribe', channel: 'SOL-PERP' }));
ws.send(JSON.stringify({ type: 'subscribe', channel: 'LINK-PERP' }));
ws.send(JSON.stringify({ type: 'subscribe', channel: 'INJ-PERP' }));
await sleep(5000);

ws.send(JSON.stringify({ type: 'unsubscribe', channel: 'SOL-PERP' }));
console.log("####################");
});

ws.on('message', (data: WebSocket.Data) => {
try {
const message = JSON.parse(data.toString());
if (message.channel === 'SOL-PERP') {
console.log('Received data:', message.data);
}
console.log(`Received data from market ${message.channel}`);
// book data is in message.data
} catch (e) {
console.error('Invalid message:', data);
}
Expand Down
32 changes: 20 additions & 12 deletions src/wsConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ app.use(compression());
app.set('trust proxy', 1);

const server = http.createServer(app);
const wss = new WebSocketServer({server, path: '/ws'});
const wss = new WebSocketServer({ server, path: '/ws' });

const REDIS_HOST = process.env.REDIS_HOST || 'localhost';
const REDIS_PORT = process.env.REDIS_PORT || '6379';
Expand All @@ -28,7 +28,7 @@ async function main() {
redisClient.client.on('message', (subscribedChannel, message) => {
const subscribers = channelSubscribers.get(subscribedChannel);
subscribers.forEach((ws) => {
ws.send(JSON.stringify({channel: subscribedChannel, data: message }));
ws.send(JSON.stringify({ channel: subscribedChannel, data: message }));
});
});

Expand All @@ -43,14 +43,22 @@ async function main() {
const channel = parsedMessage.channel;
if (!subscribedChannels.has(channel)) {
console.log('Subscribing to channel', channel);
redisClient.client.subscribe(channel).then(() => {
subscribedChannels.add(channel);
}).catch(() => {
ws.send(JSON.stringify({channel, error: `Invalid channel: ${channel}`}));
return;
});
}

redisClient.client
.subscribe(channel)
.then(() => {
subscribedChannels.add(channel);
})
.catch(() => {
ws.send(
JSON.stringify({
channel,
error: `Invalid channel: ${channel}`,
})
);
return;
});
}

if (!channelSubscribers.get(channel)) {
const subscribers = new Set<WebSocket>();
channelSubscribers.set(channel, subscribers);
Expand All @@ -65,7 +73,7 @@ async function main() {
channelSubscribers.get(channel).delete(ws);
}
break;
}
}
default:
break;
}
Expand All @@ -81,7 +89,7 @@ async function main() {
console.log('Disconnecting because of ping/pong timeout');
ws.terminate();
}
}, 5000); // 5 seconds to wait for a pong
}, 5000); // 5 seconds to wait for a pong
ws.ping();
}, 30000);

Expand Down

0 comments on commit f3d64ff

Please sign in to comment.