diff --git a/src/network-handlers/ripple-handler.ts b/src/network-handlers/ripple-handler.ts index a1c5f65..1270508 100644 --- a/src/network-handlers/ripple-handler.ts +++ b/src/network-handlers/ripple-handler.ts @@ -1,11 +1,5 @@ import { Decimal } from 'decimal.js'; import { BigNumber } from 'ethers'; -import { - AutoFillValues, - MultisignatureTransactionResponse, - SignResponse, - XRPLSignatures, -} from 'src/models/ripple.model.js'; import xrpl, { AccountNFTsRequest, AccountObject, @@ -29,6 +23,12 @@ import { } from '../functions/ripple/ripple.functions.js'; import { RippleError } from '../models/errors.js'; import { RawVault, SSFVaultUpdate, SSPVaultUpdate } from '../models/ethereum-models.js'; +import { + AutoFillValues, + MultisignatureTransactionResponse, + SignResponse, + XRPLSignatures, +} from '../models/ripple.model.js'; import { shiftValue, unshiftValue } from '../utilities/index.js'; function buildDefaultNftVault(): RawVault { @@ -49,6 +49,26 @@ function buildDefaultNftVault(): RawVault { taprootPubKey: '0'.repeat(64), }; } +class AsyncLock { + private locks: Map> = new Map(); + + async acquire(key: string, fn: () => Promise): Promise { + while (this.locks.has(key)) { + await this.locks.get(key); + } + + let resolve: () => void; + const promise = new Promise(r => (resolve = r)); + this.locks.set(key, promise); + + try { + return await fn(); + } finally { + this.locks.delete(key); + resolve!(); + } + } +} export class RippleHandler { private static _instance: RippleHandler | null = null; @@ -58,6 +78,10 @@ export class RippleHandler { private minSigners: number | null = null; private initialized: boolean = false; + private connectionPromise: Promise | null = null; + private activeRequests: number = 0; + private readonly connectionLock = new AsyncLock(); + private constructor() {} public static getInstance(): RippleHandler { @@ -107,28 +131,64 @@ export class RippleHandler { return instance; } + private async acquireConnection(): Promise { + return this.connectionLock.acquire('connection', async () => { + this.activeRequests++; + + if (!this.client!.isConnected() && !this.connectionPromise) { + this.connectionPromise = connectRippleClient(this.client!); + } + + if (this.connectionPromise) { + await this.connectionPromise; + } + }); + } + + private async releaseConnection(): Promise { + return this.connectionLock.acquire('connection', async () => { + this.activeRequests--; + + if (this.activeRequests === 0 && this.client!.isConnected()) { + await this.client!.disconnect(); + this.connectionPromise = null; + } + }); + } + async withConnectionMgmt(callback: () => Promise): Promise { this.checkInitialized(); - console.log('Connecting to the async service...'); - const newConnection = !this.client!.isConnected(); + try { - await connectRippleClient(this.client!); - console.log('calling the callback service...'); - const result = await callback(); - return result; - } catch (error) { - throw new RippleError(`Error while executing XRPL function: ${error}`); + await this.acquireConnection(); + return await callback(); } finally { - console.log('Disconnecting from the async service...'); - if (newConnection) { - // only disconnect if we connected in this function, otherwise leave the connection open - // This is to prevent closing a connection from an internally used function when the connection is still needed by the caller - // For example, getSigUpdateVaultForSSP calls getRawVault internally, and both need the connection, so we can't close the connection when getRawVault finishes - await this.client!.disconnect(); - } + await this.releaseConnection(); } } + // async withConnectionMgmt(callback: () => Promise): Promise { + // this.checkInitialized(); + // console.log('Connecting to the async service...'); + // const newConnection = !this.client!.isConnected(); + // try { + // await connectRippleClient(this.client!); + // console.log('calling the callback service...'); + // const result = await callback(); + // return result; + // } catch (error) { + // throw new RippleError(`Error while executing XRPL function: ${error}`); + // } finally { + // console.log('Disconnecting from the async service...'); + // if (newConnection) { + // // only disconnect if we connected in this function, otherwise leave the connection open + // // This is to prevent closing a connection from an internally used function when the connection is still needed by the caller + // // For example, getSigUpdateVaultForSSP calls getRawVault internally, and both need the connection, so we can't close the connection when getRawVault finishes + // await this.client!.disconnect(); + // } + // } + // } + async submit(xrplSignatures: XRPLSignatures[]): Promise { this.checkInitialized(); return await this.withConnectionMgmt(async () => {