From cd86b5d9dadd1450e13f16f687c9819e641c9a43 Mon Sep 17 00:00:00 2001 From: Ondra Chaloupka Date: Tue, 26 Mar 2024 08:49:45 +0100 Subject: [PATCH] try catch on async --- packages/lib/web3js-common/src/tx.ts | 61 ++++++++++----- packages/lib/web3js-common/src/txBulk.ts | 98 ++++++++++++++++++++---- 2 files changed, 126 insertions(+), 33 deletions(-) diff --git a/packages/lib/web3js-common/src/tx.ts b/packages/lib/web3js-common/src/tx.ts index 78aa1ef..9455040 100644 --- a/packages/lib/web3js-common/src/tx.ts +++ b/packages/lib/web3js-common/src/tx.ts @@ -393,13 +393,18 @@ export async function executeTxWithExceededBlockhashRetry( txParams: ExecuteTxParams ): Promise { try { - return await executeTx(txParams) + logInfo(txParams.logger, 'Executing transaction') + const promise = executeTx(txParams) + promise.catch(e => { + logInfo(txParams.logger, 'Fuck you! Failed transaction execution', e) + }) + return await promise } catch (e) { + const txSig = + e instanceof ExecutionError && e.txSignature !== undefined + ? `${e.txSignature} ` + : '' if (checkErrorMessage(e, 'block height exceeded')) { - const txSig = - e instanceof ExecutionError && e.txSignature !== undefined - ? `${e.txSignature} ` - : '' logDebug( txParams.logger, `Failed to execute transaction ${txSig}` + @@ -409,7 +414,25 @@ export async function executeTxWithExceededBlockhashRetry( ) txParams.transaction.recentBlockhash = undefined return await executeTx(txParams) + } + if (checkErrorMessage(e, 'Too many requests')) { + logInfo(txParams.logger, 'too many requests execution') + logDebug( + txParams.logger, + `Failed to execute transaction ${txSig}` + + 'due too many requests on RPC, retrying, ' + + 'original error: ' + + e + ) + txParams.transaction.recentBlockhash = undefined + await sleep(3_000) + return await executeTx(txParams) } else { + logInfo( + txParams.logger, + 'Failed transaction execution', + (e as Error).message + ) throw e } } @@ -676,7 +699,7 @@ export async function splitAndExecuteTx({ let splitMarkerStartIdx = Number.MAX_SAFE_INTEGER for (let i = 0; i < transaction.instructions.length; i++) { // TODO: delete me! - logInfo(logger, 'processing index: ' + i) + // logInfo(logger, 'processing index: ' + i) const ix = transaction.instructions[i] if (ix instanceof TransactionInstructionSplitMarkerStart) { splitMarkerStartIdx = i @@ -687,7 +710,7 @@ export async function splitAndExecuteTx({ continue } // TODO: delete me! - logInfo(logger, 'not split marker index: ' + i) + // logInfo(logger, 'not split marker index: ' + i) lastValidTransaction.add(ix) const filteredSigners = filterSignersForInstruction( lastValidTransaction.instructions, @@ -727,10 +750,10 @@ export async function splitAndExecuteTx({ addIdx++ ) { // TODO: delete me! - logInfo( - logger, - `Adding tx of index: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}, marker: ${splitMarkerStartIdx}` - ) + // logInfo( + // logger, + // `Adding tx of index: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}, marker: ${splitMarkerStartIdx}` + // ) if (isSplitMarkerInstruction(transaction.instructions[addIdx])) { continue } @@ -754,20 +777,20 @@ export async function splitAndExecuteTx({ } transactions.push(transactionAdd) // TODO: delete me! - logInfo( - logger, - `transactions size: ${transactions.length}, additional tx ixes: ${transactionAdd.instructions.length}` - ) + // logInfo( + // logger, + // `transactions size: ${transactions.length}, additional tx ixes: ${transactionAdd.instructions.length}` + // ) // we processed until i minus one; // next outer loop increases i and we need to start from the same instruction // as the current position is i = addIdx - 1 transactionStartIndex = addIdx // TODO: delete me! - logInfo( - logger, - `after: addIdx: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}` - ) + // logInfo( + // logger, + // `after: addIdx: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}` + // ) // nulling data of the next transaction to check lastValidTransaction = await generateNewTransaction({ feePayer: feePayerDefined, diff --git a/packages/lib/web3js-common/src/txBulk.ts b/packages/lib/web3js-common/src/txBulk.ts index 2b72e6c..564ed2d 100644 --- a/packages/lib/web3js-common/src/txBulk.ts +++ b/packages/lib/web3js-common/src/txBulk.ts @@ -4,6 +4,7 @@ import { logDebug, logError, logInfo, + logWarn, } from '@marinade.finance/ts-common' import { Connection, @@ -83,6 +84,7 @@ export async function splitAndBulkExecuteTx({ const numberOfSimulations = numberOfRetries < 5 ? 5 : numberOfRetries for (let i = 1; i <= numberOfSimulations; i++) { try { + logWarn(logger, 'Simulating transactions: ' + i) resultSimulated = await splitAndExecuteTx({ connection, transaction, @@ -97,12 +99,22 @@ export async function splitAndBulkExecuteTx({ computeUnitLimit, computeUnitPrice, }) + logWarn(logger, 'Simulation was successful, proceeding to send') break } catch (e) { if ( i >= numberOfSimulations || - !checkErrorMessage(e, 'Too many requests for a specific RPC call') + !checkErrorMessage(e, 'Too many requests') ) { + logError( + logger, + 'Too many retries for simulation, aborting... ' + + { + i, + numberOfSimulations, + tooMany: checkErrorMessage(e, 'Too many requests'), + } + ) throw e } else { logDebug(logger, `Error to split and execute transactions: ${e}`) @@ -199,16 +211,38 @@ async function bulkSend({ logger, `Bulk #${retryAttempt} sending ${workingTransactions.length} transactions` ) + let processed = 0 const txSendPromises: { promise: Promise; index: number }[] = [] for (const { index, transaction } of workingTransactions) { const promise = connection.sendTransaction(transaction, { skipPreflight: true, ...sendOpts, }) - txSendPromises.push({ index, promise }) + promise + .then(() => { + txSendPromises.push({ index, promise }) + }) + .catch(e => { + rpcErrors.push( + new ExecutionError({ + msg: `Transaction at [${index}] failed to be sent to blockchain`, + cause: e as Error, + transaction: data[index].transaction, + }) + ) + }) + .finally(() => { + processed++ + }) + } + + // --- WAITING FOR ALL TO BE SENT --- + while (processed < workingTransactions.length) { + await new Promise(resolve => setTimeout(resolve, 100)) } // --- CONFIRMING --- + processed = 0 const confirmationPromises: { promise: Promise> index: number @@ -226,17 +260,24 @@ async function bulkSend({ }, confirmOpts ) - confirmationPromises.push({ index, promise }) - promise.catch(e => { - // managing 'Promise rejection was handled asynchronously' error - rpcErrors.push( - new ExecutionError({ - msg: `Transaction '${signature}' at [${index}] timed-out to be confirmed`, - cause: e as Error, - transaction: data[index].transaction, - }) - ) - }) + + promise + .then(() => { + confirmationPromises.push({ index, promise }) + }) + .catch(e => { + // managing 'Promise rejection was handled asynchronously' error + rpcErrors.push( + new ExecutionError({ + msg: `Transaction '${signature}' at [${index}] timed-out to be confirmed`, + cause: e as Error, + transaction: data[index].transaction, + }) + ) + }) + .finally(() => { + processed++ + }) } catch (e) { rpcErrors.push( new ExecutionError({ @@ -245,10 +286,18 @@ async function bulkSend({ transaction: data[index].transaction, }) ) + + processed++ } } + // --- WAITING FOR ALL TO BE CONFIRMED --- + while (processed < txSendPromises.length) { + await new Promise(resolve => setTimeout(resolve, 100)) + } + // --- GETTING LOGS --- + processed = 0 const responsePromises: { index: number promise: Promise @@ -267,7 +316,22 @@ async function bulkSend({ commitment: confirmOpts, maxSupportedTransactionVersion: 0, }) - responsePromises.push({ index, promise }) + promise + .then(() => { + responsePromises.push({ index, promise }) + }) + .catch(e => { + rpcErrors.push( + new ExecutionError({ + msg: `Transaction at [${index}] failed to be sent to blockchain`, + cause: e as Error, + transaction: data[index].transaction, + }) + ) + }) + .finally(() => { + processed++ + }) } catch (e) { // transaction was not confirmed to be on blockchain // by chance still can be landed but we do not know why we don't care @@ -281,9 +345,15 @@ async function bulkSend({ transaction: data[index].transaction, }) ) + processed++ } } + // --- WAITING FOR ALL LOGS BEING FETCHED --- + while (processed < confirmationPromises.length) { + await new Promise(resolve => setTimeout(resolve, 100)) + } + // --- RETRIEVING LOGS PROMISE AND FINISH --- for (const { index, promise: responsePromise } of responsePromises) { try {