From cbecd513a481c6f26b0f83f6637544640c413bf1 Mon Sep 17 00:00:00 2001 From: Nour Alharithi Date: Tue, 7 Nov 2023 11:26:14 -0800 Subject: [PATCH] better error handling --- src/wsConnectionManager.ts | 50 ++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/src/wsConnectionManager.ts b/src/wsConnectionManager.ts index abf798f..270002e 100644 --- a/src/wsConnectionManager.ts +++ b/src/wsConnectionManager.ts @@ -24,7 +24,11 @@ const REDIS_PASSWORD = process.env.REDIS_PASSWORD; async function main() { const redisClient = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD); - const lastMessageRetriever = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD); + const lastMessageRetriever = new RedisClient( + REDIS_HOST, + REDIS_PORT, + REDIS_PASSWORD + ); await redisClient.connect(); await lastMessageRetriever.connect(); @@ -39,20 +43,34 @@ async function main() { }); // Save and persist last message - lastMessageRetriever.client.set(`last_update_${subscribedChannel}`, message); + lastMessageRetriever.client.set( + `last_update_${subscribedChannel}`, + message + ); + }); + + redisClient.client.on('error', (error) => { + console.error('Redis client error:', error); }); wss.on('connection', (ws: WebSocket) => { console.log('Client connected'); ws.on('message', async (msg) => { - const parsedMessage = JSON.parse(msg.toString()); + let parsedMessage: any; + let messageType: string; + try { + parsedMessage = JSON.parse(msg.toString()); + messageType = parsedMessage.type.toLowerCase(); + } catch (e) { + return; + } - switch (parsedMessage.type.toLowerCase()) { + switch (messageType) { case 'subscribe': { const channel = parsedMessage.channel; if (!subscribedChannels.has(channel)) { - console.log('Subscribing to channel', channel); + console.log('Trying to subscribe to channel', channel); redisClient.client .subscribe(channel) .then(() => { @@ -62,7 +80,7 @@ async function main() { ws.send( JSON.stringify({ channel, - error: `Invalid channel: ${channel}`, + error: `Error subscribing to channel: ${channel}`, }) ); return; @@ -76,9 +94,10 @@ async function main() { channelSubscribers.get(channel).add(ws); // Fetch and send last message - const lastMessage = await lastMessageRetriever.client.get(`last_update_${channel}`); + const lastMessage = await lastMessageRetriever.client.get( + `last_update_${channel}` + ); if (lastMessage !== null) { - console.log('sending last message on new subscribe'); ws.send(JSON.stringify({ channel, data: lastMessage })); } break; @@ -91,6 +110,7 @@ async function main() { } break; } + case undefined: default: break; } @@ -122,6 +142,12 @@ async function main() { // Clear any existing intervals and timeouts clearInterval(pingIntervalId); clearTimeout(pongTimeoutId); + channelSubscribers.forEach((subscribers, channel) => { + if (subscribers.delete(ws) && subscribers.size === 0) { + redisClient.client.unsubscribe(channel); + channelSubscribers.delete(channel); + } + }); }); ws.on('disconnect', () => { @@ -136,8 +162,16 @@ async function main() { server.listen(WS_PORT, () => { console.log(`connection manager running on ${WS_PORT}`); }); + + server.on('error', (error) => { + console.error('Server error:', error); + }); } +process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason); +}); + async function recursiveTryCatch(f: () => void) { try { await f();