Skip to content

Commit

Permalink
fix isomorphic ws calls to use net_listen
Browse files Browse the repository at this point in the history
  • Loading branch information
Metroxe committed Dec 15, 2023
1 parent df16748 commit c82d4fc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
49 changes: 30 additions & 19 deletions packages/core/src/utils/resilientEventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function resilientEventListener(args: ResilientEventListenerArgs) {
args.log && args.log(`[${new Date().toISOString()}] subscribing to event listener with topic hash: ${topicHash}`);

const request = {
id: Math.floor(Math.random() * 1000000),
id: 1,
method: "eth_subscribe",
params: [
"logs",
Expand All @@ -51,36 +51,52 @@ export function resilientEventListener(args: ResilientEventListenerArgs) {
address: args.contractAddress,
}
]
}
};

// sending this backs should return a result of true
const ping = {
id: 2,
method: "net_listening",
params:[],
};

ws.on('error', function error(err) {
ws.onerror = function error(err: any) {
args.log && args.log(`[${new Date().toISOString()}] WebSocket error: ${err}`);
});
};

ws.on('close', function close() {
ws.onclose = function close() {
args.log && args.log(`[${new Date().toISOString()}] WebSocket closed`);
if (keepAliveInterval) clearInterval(keepAliveInterval);
if (pingTimeout) clearTimeout(pingTimeout);
ws = null;
// Reconnect when the connection is closed
setTimeout(connect, 1000);
});

ws.on('message', function message(data) {
const parsedData = JSON.parse(data.toString());
};

ws.onmessage = function message(event: any) {
let parsedData;
if (typeof event.data === 'string') {
parsedData = JSON.parse(event.data);
} else if (event.data instanceof ArrayBuffer) {
const dataString = new TextDecoder().decode(event.data);
parsedData = JSON.parse(dataString);
}

if (parsedData?.id === request.id) {
subscriptionId = parsedData.result;
args.log && args.log(`[${new Date().toISOString()}] Subscription to event '${args.eventName}' established with subscription ID '${parsedData.result}'.`);
} else if (parsedData.method === 'eth_subscription' && parsedData.params.subscription === subscriptionId) {
} else if(parsedData?.id === ping.id && parsedData?.result === true) {
args.log && args.log(`[${new Date().toISOString()}] Health check complete, subscription to '${args.eventName}' is still active.`)
if (pingTimeout) clearInterval(pingTimeout);
} else if (parsedData?.method === 'eth_subscription' && parsedData.params.subscription === subscriptionId) {
const log = parsedData.params.result;
const event = contract.interface.parseLog(log);
args.log && args.log(`[${new Date().toISOString()}] Received event ${event?.name}: ${event?.args}`);
args.callback && args.callback(event);
}
});
};

ws.on('open', function open() {
ws.onopen = function open() {
args.log && args.log(`[${new Date().toISOString()}] Opened connection to Web Socket RPC`)
ws!.send(JSON.stringify(request));

Expand All @@ -91,19 +107,14 @@ export function resilientEventListener(args: ResilientEventListenerArgs) {
}
args.log && args.log(`[${new Date().toISOString()}] Performing health check on the Web Socket RPC, to maintain subscription to '${args.eventName}'.`);

ws.ping();
ws.send(JSON.stringify(ping));
pingTimeout = setTimeout(() => {
if (ws) ws.terminate();
}, EXPECTED_PONG_BACK);

}, KEEP_ALIVE_CHECK_INTERVAL);

});

ws.on("pong", () => {
args.log && args.log(`[${new Date().toISOString()}] Health check complete, subscription to '${args.eventName}' is still active.`)
if (pingTimeout) clearInterval(pingTimeout);
});
};
}

const stop = () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/utils/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export async function retry<T>(process: () => Promise<T>, retries: number = 10):
return await process();
} catch (error) {
if (retries === 0) {
console.error(`There was an error retrying a mechanism ${retries} times. Please save this error for troubleshooting.`);
console.error(`There was an error retrying a mechanism 10 times. Please save this error for troubleshooting.`);
throw error;
}
const delay = retries === 1 ? 300000 : Math.random() * (30000 - 5000) + 5000; // Delay for 5 to 30 seconds, but 5 minutes for the last retry
Expand Down

0 comments on commit c82d4fc

Please sign in to comment.