Skip to content

Commit

Permalink
fix: add async lock for ws connection
Browse files Browse the repository at this point in the history
  • Loading branch information
scolear committed Nov 4, 2024
1 parent 5ee237d commit f1abdc3
Showing 1 changed file with 81 additions and 21 deletions.
102 changes: 81 additions & 21 deletions src/network-handlers/ripple-handler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import { Decimal } from 'decimal.js';
import { BigNumber } from 'ethers';
import {
AutoFillValues,
MultisignatureTransactionResponse,
SignResponse,
XRPLSignatures,
} from 'src/models/ripple.model.js';
import xrpl, {
AccountNFTsRequest,
AccountObject,
Expand All @@ -29,6 +23,12 @@ import {
} from '../functions/ripple/ripple.functions.js';
import { RippleError } from '../models/errors.js';
import { RawVault, SSFVaultUpdate, SSPVaultUpdate } from '../models/ethereum-models.js';
import {
AutoFillValues,
MultisignatureTransactionResponse,
SignResponse,
XRPLSignatures,
} from '../models/ripple.model.js';
import { shiftValue, unshiftValue } from '../utilities/index.js';

function buildDefaultNftVault(): RawVault {
Expand All @@ -49,6 +49,26 @@ function buildDefaultNftVault(): RawVault {
taprootPubKey: '0'.repeat(64),
};
}
class AsyncLock {
private locks: Map<string, Promise<void>> = new Map();

async acquire<T>(key: string, fn: () => Promise<T>): Promise<T> {
while (this.locks.has(key)) {
await this.locks.get(key);
}

let resolve: () => void;
const promise = new Promise<void>(r => (resolve = r));
this.locks.set(key, promise);

try {
return await fn();
} finally {
this.locks.delete(key);
resolve!();
}
}
}

export class RippleHandler {
private static _instance: RippleHandler | null = null;
Expand All @@ -58,6 +78,10 @@ export class RippleHandler {
private minSigners: number | null = null;
private initialized: boolean = false;

private connectionPromise: Promise<boolean> | null = null;
private activeRequests: number = 0;
private readonly connectionLock = new AsyncLock();

private constructor() {}

public static getInstance(): RippleHandler {
Expand Down Expand Up @@ -107,28 +131,64 @@ export class RippleHandler {
return instance;
}

private async acquireConnection(): Promise<void> {
return this.connectionLock.acquire('connection', async () => {
this.activeRequests++;

if (!this.client!.isConnected() && !this.connectionPromise) {
this.connectionPromise = connectRippleClient(this.client!);
}

if (this.connectionPromise) {
await this.connectionPromise;
}
});
}

private async releaseConnection(): Promise<void> {
return this.connectionLock.acquire('connection', async () => {
this.activeRequests--;

if (this.activeRequests === 0 && this.client!.isConnected()) {
await this.client!.disconnect();
this.connectionPromise = null;
}
});
}

async withConnectionMgmt<T>(callback: () => Promise<T>): Promise<T> {
this.checkInitialized();
console.log('Connecting to the async service...');
const newConnection = !this.client!.isConnected();

try {
await connectRippleClient(this.client!);
console.log('calling the callback service...');
const result = await callback();
return result;
} catch (error) {
throw new RippleError(`Error while executing XRPL function: ${error}`);
await this.acquireConnection();
return await callback();
} finally {
console.log('Disconnecting from the async service...');
if (newConnection) {
// only disconnect if we connected in this function, otherwise leave the connection open
// This is to prevent closing a connection from an internally used function when the connection is still needed by the caller
// For example, getSigUpdateVaultForSSP calls getRawVault internally, and both need the connection, so we can't close the connection when getRawVault finishes
await this.client!.disconnect();
}
await this.releaseConnection();
}
}

// async withConnectionMgmt<T>(callback: () => Promise<T>): Promise<T> {
// this.checkInitialized();
// console.log('Connecting to the async service...');
// const newConnection = !this.client!.isConnected();
// try {
// await connectRippleClient(this.client!);
// console.log('calling the callback service...');
// const result = await callback();
// return result;
// } catch (error) {
// throw new RippleError(`Error while executing XRPL function: ${error}`);
// } finally {
// console.log('Disconnecting from the async service...');
// if (newConnection) {
// // only disconnect if we connected in this function, otherwise leave the connection open
// // This is to prevent closing a connection from an internally used function when the connection is still needed by the caller
// // For example, getSigUpdateVaultForSSP calls getRawVault internally, and both need the connection, so we can't close the connection when getRawVault finishes
// await this.client!.disconnect();
// }
// }
// }

async submit(xrplSignatures: XRPLSignatures[]): Promise<string> {
this.checkInitialized();
return await this.withConnectionMgmt(async () => {
Expand Down

0 comments on commit f1abdc3

Please sign in to comment.