Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better error handling #12

Merged
merged 1 commit into from
Nov 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/wsConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ const REDIS_PASSWORD = process.env.REDIS_PASSWORD;

async function main() {
const redisClient = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
const lastMessageRetriever = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
const lastMessageRetriever = new RedisClient(
REDIS_HOST,
REDIS_PORT,
REDIS_PASSWORD
);

await redisClient.connect();
await lastMessageRetriever.connect();
Expand All @@ -39,20 +43,34 @@ async function main() {
});

// Save and persist last message
lastMessageRetriever.client.set(`last_update_${subscribedChannel}`, message);
lastMessageRetriever.client.set(
`last_update_${subscribedChannel}`,
message
);
});

redisClient.client.on('error', (error) => {
console.error('Redis client error:', error);
});

wss.on('connection', (ws: WebSocket) => {
console.log('Client connected');

ws.on('message', async (msg) => {
const parsedMessage = JSON.parse(msg.toString());
let parsedMessage: any;
let messageType: string;
try {
parsedMessage = JSON.parse(msg.toString());
messageType = parsedMessage.type.toLowerCase();
} catch (e) {
return;
}

switch (parsedMessage.type.toLowerCase()) {
switch (messageType) {
case 'subscribe': {
const channel = parsedMessage.channel;
if (!subscribedChannels.has(channel)) {
console.log('Subscribing to channel', channel);
console.log('Trying to subscribe to channel', channel);
redisClient.client
.subscribe(channel)
.then(() => {
Expand All @@ -62,7 +80,7 @@ async function main() {
ws.send(
JSON.stringify({
channel,
error: `Invalid channel: ${channel}`,
error: `Error subscribing to channel: ${channel}`,
})
);
return;
Expand All @@ -76,9 +94,10 @@ async function main() {
channelSubscribers.get(channel).add(ws);

// Fetch and send last message
const lastMessage = await lastMessageRetriever.client.get(`last_update_${channel}`);
const lastMessage = await lastMessageRetriever.client.get(
`last_update_${channel}`
);
if (lastMessage !== null) {
console.log('sending last message on new subscribe');
ws.send(JSON.stringify({ channel, data: lastMessage }));
}
break;
Expand All @@ -91,6 +110,7 @@ async function main() {
}
break;
}
case undefined:
default:
break;
}
Expand Down Expand Up @@ -122,6 +142,12 @@ async function main() {
// Clear any existing intervals and timeouts
clearInterval(pingIntervalId);
clearTimeout(pongTimeoutId);
channelSubscribers.forEach((subscribers, channel) => {
if (subscribers.delete(ws) && subscribers.size === 0) {
redisClient.client.unsubscribe(channel);
channelSubscribers.delete(channel);
}
});
});

ws.on('disconnect', () => {
Expand All @@ -136,8 +162,16 @@ async function main() {
server.listen(WS_PORT, () => {
console.log(`connection manager running on ${WS_PORT}`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});
}

process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
});

async function recursiveTryCatch(f: () => void) {
try {
await f();
Expand Down
Loading