Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

fix: implement backoff retry policy for websocket handler #3600

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added retry mechanism considering in flight requests
  • Loading branch information
satyajeetkolhapure committed Dec 12, 2022
commit 926e2137447d76db4ee1ca344e1f256edf1ace17
127 changes: 92 additions & 35 deletions src/chains/ethereum/ethereum/src/forking/handlers/ws-handler.ts
Original file line number Diff line number Diff line change
@@ -9,6 +9,11 @@ import { JsonRpcResponse, JsonRpcError } from "@ganache/utils";

const { JSONRPC_PREFIX } = BaseHandler;

export type RetryConfiguration = {
retryIntervalBaseInSeconds: number;
retryCounter: number;
};

export class WsHandler extends BaseHandler implements Handler {
private open: Promise<unknown>;
private connection: WebSocket;
@@ -20,45 +25,80 @@ export class WsHandler extends BaseHandler implements Handler {
}>
>();

// queue requests when connection is closed.
private delayedRequestsQueue = [];
// flag to identify if adhoc reconnection attempt.
private adhocReconnectionRequest = false;

// retry configuration
private retryIntervalBase: number = 2;
private retryCounter: number = 3;
private initialRetryCounter = this.retryCounter;
private retryIntervalBaseInSeconds: number = 2;
private initialRetryCounter: number;
private retryTimeoutId: NodeJS.Timeout;

constructor(options: EthereumInternalOptions, abortSignal: AbortSignal) {
// socket configuration
private url: string;
private origin: string;
private logging: EthereumInternalOptions["logging"];

constructor(
options: EthereumInternalOptions,
abortSignal: AbortSignal,
retryConfiguration?: RetryConfiguration | undefined
) {
super(options, abortSignal);

const {
fork: { url, origin },
logging
} = options;
this.url = url.toString();
this.origin = origin;
this.logging = logging;

// set retry configuration values
if (retryConfiguration) {
this.retryCounter = retryConfiguration.retryCounter;
this.initialRetryCounter = retryConfiguration.retryIntervalBaseInSeconds;
}
this.initialRetryCounter = this.retryCounter;

this.open = this.connect(url.toString(), origin, logging);
this.connection.onclose = () => {
const onCloseEvent = () => {
// try to connect again...
// backoff and eventually fail
if( this.retryCounter > 0 ) {
clearTimeout( this.retryTimeoutId );
this.retryTimeoutId = setTimeout( () => {
this.reconnect(url.toString(), origin, logging);
}, Math.pow( this.retryIntervalBase, this.initialRetryCounter - this.retryCounter ) * 1000 );
this.retryCounter--;
}
// do not schedule reconnection for adhoc reconnection requests
if (this.retryCounter === 0) {
throw new Error("Connection to Infura has failed. Try again");
} else {
if (!this.adhocReconnectionRequest) {
this.retryCounter--;
clearTimeout(this.retryTimeoutId);
this.retryTimeoutId = setTimeout(async () => {
this.reconnect(this.url, this.origin, false);
}, Math.pow(this.retryIntervalBaseInSeconds, this.initialRetryCounter - this.retryCounter) * 1000);
}
}
};
this.open = this.connect(this.url, this.origin, onCloseEvent);
this.abortSignal.addEventListener("abort", () => {
this.connection.onclose = null;
this.connection.close(1000);
});
this.connection.onmessage = this.onMessage.bind(this);
}

public async request<T>(
method: string,
params: unknown[],
options = { disableCache: false }
) {
await this.open;
try {
await this.open;
} catch (er) {
this.logging.logger.log("Connection to Infura has failed");
// skip the reconnection if connection is being made
if (this.connection.readyState !== this.connection.CONNECTING)
this.reconnect(this.url, this.origin, true);
}
if (this.abortSignal.aborted) return Promise.reject(new AbortError());

const key = JSON.stringify({ method, params });
@@ -76,7 +116,13 @@ export class WsHandler extends BaseHandler implements Handler {
// Issue: https://github.com/trufflesuite/ganache/issues/3478
this.inFlightRequests.set(messageId, deferred);

this.connection.send(`${JSONRPC_PREFIX}${messageId},${key.slice(1)}`);
// if connection is alive send request else delay the request
const data = `${JSONRPC_PREFIX}${messageId},${key.slice(1)}`;
if (this.connection && this.connection.readyState === 1) {
this.connection.send(data);
} else {
this.delayRequest(data);
}
return deferred.promise.finally(() => this.requestCache.delete(key));
};
return await this.queueRequest<T>(method, params, key, send, options);
@@ -100,11 +146,7 @@ export class WsHandler extends BaseHandler implements Handler {
}
}

private connect(
url: string,
origin: string,
logging: EthereumInternalOptions["logging"]
) {
private connect(url: string, origin: string, onCloseEvent: any) {
this.connection = new WebSocket(url, {
origin,
headers: this.headers
@@ -119,30 +161,45 @@ export class WsHandler extends BaseHandler implements Handler {
// If you need to change this, you probably need to change our `onMessage`
// handler too.
this.connection.binaryType = "nodebuffer";
this.connection.onclose = onCloseEvent;
this.connection.onmessage = this.onMessage.bind(this);
let open = new Promise((resolve, reject) => {
this.connection.onopen = resolve;
this.connection.onerror = reject;
});
open.then(
() => {
this.connection.onopen = null;
this.connection.onerror = null;
// reset the retry counter
this.retryCounter = this.initialRetryCounter;
},
err => {
logging.logger.log(err);
}
);
open.then(() => {
this.connection.onopen = null;
this.connection.onerror = null;
// reset the retry counter and any timeouts scheduled for retries
this.retryCounter = this.initialRetryCounter;
clearTimeout(this.retryTimeoutId);

this.adhocReconnectionRequest = false;
// process delayed requests which were queued at the time of connection failure
this.sendDelayedRequests();
});
return open;
}

private reconnect (url: string,
private reconnect(
url: string,
origin: string,
logging: EthereumInternalOptions["logging"]) {
adhocReconnectionRequest: boolean = false
) {
this.adhocReconnectionRequest = adhocReconnectionRequest;
const onCloseEvent = this.connection.onclose;
this.open = this.connect(url, origin, logging);
this.connection.onclose = onCloseEvent;
this.open = this.connect(url, origin, onCloseEvent);
}

private delayRequest(request: any) {
this.delayedRequestsQueue.push(request);
}

private sendDelayedRequests() {
while (this.delayedRequestsQueue.length > 0) {
const request = this.delayedRequestsQueue.pop();
this.connection.send(request);
}
}

public async close() {
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import assert from "assert";
import AbortController from "abort-controller";
import { WsHandler } from "../../../src/forking/handlers/ws-handler";
import {
EthereumOptionsConfig,
EthereumProviderOptions
} from "@ganache/ethereum-options";
import WebSocket from "ws";

const createWebSocketServer = (port: number): WebSocket.Server => {
let wsServer = new WebSocket.Server({ port });
wsServer.on("connection", async ws => {
ws.on("message", data => {
const message = JSON.parse(data.toString());
ws.send(
Buffer.from(
JSON.stringify({
id: message.id,
jsonrpc: "2.0",
result: "0x0"
}),
"utf-8"
)
);
if (message.method === "client-disconnect") {
setTimeout(() => {
ws.terminate();
}, 10);
}
});
});
return wsServer;
};

// create test server
const URL = "ws://localhost:1001/";
let wsServer: WebSocket.Server;
let wsHandler: WsHandler;
wsServer = createWebSocketServer(1001);

describe("ws-handler", function () {
describe("retries", function () {
before(() => {
const providerOptions = EthereumOptionsConfig.normalize({
fork: {
url: URL,
origin: "test"
}
} as EthereumProviderOptions);
const abortController: AbortController = new AbortController();
wsHandler = new WsHandler(providerOptions, abortController.signal, {
retryCounter: 4,
retryIntervalBaseInSeconds: 3
});
});

after(() => {
wsHandler.close();
wsServer.close();
});

it("should attempt to reconnect the server when connection is terminated", async () => {
// send a request to websocket server to get connection termination.
await wsHandler.request<any>("client-disconnect", [], {
disableCache: true
});
await new Promise(resolve => setTimeout(resolve, 100));

// send request after connection is terminated
const retryPromise = wsHandler.request<any>("retry", [], {
disableCache: true
});

// assert the result
const response = await retryPromise;
assert.equal(response, "0x0");
}).timeout(10000);
});
});