From adb02d36f747852816bd5a99efc83a432b072823 Mon Sep 17 00:00:00 2001 From: Ondra Chaloupka Date: Tue, 26 Mar 2024 08:49:45 +0100 Subject: [PATCH] [web3js] bulk sending fixes and hacks --- packages/lib/cli-common/package.json | 2 +- packages/lib/web3js-common/src/tx.ts | 103 ++++++----- packages/lib/web3js-common/src/txBulk.ts | 222 +++++++++++++++++------ 3 files changed, 230 insertions(+), 97 deletions(-) diff --git a/packages/lib/cli-common/package.json b/packages/lib/cli-common/package.json index 294cea1..f4220f7 100644 --- a/packages/lib/cli-common/package.json +++ b/packages/lib/cli-common/package.json @@ -37,9 +37,9 @@ }, "peerDependencies": { "@solana/web3.js": "^1.91.1", - "@marinade.finance/ledger-utils": "^3.0.1", "@marinade.finance/web3js-common": "2.4.1", "@marinade.finance/ts-common": "2.4.1", + "@marinade.finance/ledger-utils": "^3.0.1", "bn.js": "^5.2.1", "borsh": "^0.7.0", "bs58": "^5.0.0", diff --git a/packages/lib/web3js-common/src/tx.ts b/packages/lib/web3js-common/src/tx.ts index 78aa1ef..a962a12 100644 --- a/packages/lib/web3js-common/src/tx.ts +++ b/packages/lib/web3js-common/src/tx.ts @@ -103,7 +103,7 @@ export function isExecuteTxReturn(data: any): data is ExecuteTxReturn { export async function partialSign( transaction: Transaction | VersionedTransaction, signers: (Wallet | Keypair | Signer)[] -) { +): Promise { for (const signer of signers) { if (instanceOfWallet(signer)) { // partial signing by this call, despite the name @@ -116,6 +116,20 @@ export async function partialSign( } } +export function unhandledRejection( + err: Error, + promise: Promise, + logger?: LoggerPlaceholder +) { + promise.then(() => {}).catch(() => {}) // ignore errors + logDebug(logger, { + msg: 'Unhandled promise rejection', + message: err instanceof Error ? err.message : err, + stack: err.stack || '', + promise, + }) +} + export async function executeTx( args: Omit & { simulate: true } ): Promise @@ -199,9 +213,12 @@ export async function executeTx({ let txSignature: string | undefined = undefined try { if (simulate) { - logDebug(logger, '[[Simulation mode]]') + const handler = (r: Error, p: Promise) => + unhandledRejection(r, p, logger) + process.on('unhandledRejection', handler) + logDebug(logger, 'executeTx: simulation mode') txResponse = (await connection.simulateTransaction(transaction)).value - logDebug(logger, txResponse) + process.off('unhandledRejection', handler) if (txResponse.err) { throw new SendTransactionError( txResponse.err as string, @@ -257,7 +274,9 @@ export async function updateTransactionBlockhash< lastValidBlockHeight: number }> ): Promise { - currentBlockhash = currentBlockhash ?? (await connection.getLatestBlockhash()) + if (currentBlockhash === undefined) { + currentBlockhash = await connection.getLatestBlockhash() + } if (transaction instanceof VersionedTransaction) { transaction.message.recentBlockhash = currentBlockhash.blockhash } else { @@ -325,11 +344,11 @@ export async function confirmTransaction( commitment: confirmFinality, maxSupportedTransactionVersion: 0, // TODO: configurable? }) - const confirmBlockhash = connection.getLatestBlockhash(confirmFinality) + const confirmBlockhash = await connection.getLatestBlockhash(confirmFinality) while ( txRes === null && ( - await connection.isBlockhashValid((await confirmBlockhash).blockhash, { + await connection.isBlockhashValid(confirmBlockhash.blockhash, { commitment: confirmFinality, }) ).value @@ -395,11 +414,11 @@ export async function executeTxWithExceededBlockhashRetry( try { return await executeTx(txParams) } catch (e) { + const txSig = + e instanceof ExecutionError && e.txSignature !== undefined + ? `${e.txSignature} ` + : '' if (checkErrorMessage(e, 'block height exceeded')) { - const txSig = - e instanceof ExecutionError && e.txSignature !== undefined - ? `${e.txSignature} ` - : '' logDebug( txParams.logger, `Failed to execute transaction ${txSig}` + @@ -409,7 +428,24 @@ export async function executeTxWithExceededBlockhashRetry( ) txParams.transaction.recentBlockhash = undefined return await executeTx(txParams) + } + if (checkErrorMessage(e, 'Too many requests')) { + logDebug( + txParams.logger, + `Failed to execute transaction ${txSig}` + + 'due too many requests on RPC, retrying, ' + + 'original error: ' + + e + ) + txParams.transaction.recentBlockhash = undefined + await sleep(3_000) + return await executeTx(txParams) } else { + logDebug( + txParams.logger, + 'Failed transaction execution', + (e as Error).message + ) throw e } } @@ -467,13 +503,13 @@ export function filterSignersForInstruction( return signers.filter(s => signersRequired.find(rs => rs.equals(s.publicKey))) } -async function getTransaction( +function getNewTransaction( feePayer: PublicKey, bh: Readonly<{ blockhash: string lastValidBlockHeight: number }> -): Promise { +): Transaction { return new Transaction({ feePayer, blockhash: bh.blockhash, @@ -481,7 +517,7 @@ async function getTransaction( }) } -async function addComputeBudgetIxes({ +export async function addComputeBudgetIxes({ transaction, computeUnitLimit, computeUnitPrice, @@ -498,7 +534,7 @@ async function addComputeBudgetIxes({ } } -function setComputeUnitLimitIx(units: number): TransactionInstruction { +export function setComputeUnitLimitIx(units: number): TransactionInstruction { return ComputeBudgetProgram.setComputeUnitLimit({ units }) } @@ -506,7 +542,9 @@ function setComputeUnitLimitIx(units: number): TransactionInstruction { * Priority fee that is calculated in micro lamports (0.000001 SOL) * Every declared CU for the transaction is paid with this additional payment. */ -function setComputeUnitPriceIx(microLamports: number): TransactionInstruction { +export function setComputeUnitPriceIx( + microLamports: number +): TransactionInstruction { return ComputeBudgetProgram.setComputeUnitPrice({ microLamports }) } @@ -665,7 +703,7 @@ export async function splitAndExecuteTx({ lastValidBlockHeight: transaction.lastValidBlockHeight, } } - let lastValidTransaction = await generateNewTransaction({ + let lastValidTransaction = generateNewTransaction({ feePayer: feePayerDefined, bh: blockhash, computeUnitLimit, @@ -675,8 +713,6 @@ export async function splitAndExecuteTx({ let transactionStartIndex = 0 let splitMarkerStartIdx = Number.MAX_SAFE_INTEGER for (let i = 0; i < transaction.instructions.length; i++) { - // TODO: delete me! - logInfo(logger, 'processing index: ' + i) const ix = transaction.instructions[i] if (ix instanceof TransactionInstructionSplitMarkerStart) { splitMarkerStartIdx = i @@ -686,8 +722,6 @@ export async function splitAndExecuteTx({ splitMarkerStartIdx = Number.MAX_SAFE_INTEGER continue } - // TODO: delete me! - logInfo(logger, 'not split marker index: ' + i) lastValidTransaction.add(ix) const filteredSigners = filterSignersForInstruction( lastValidTransaction.instructions, @@ -703,7 +737,6 @@ export async function splitAndExecuteTx({ }).byteLength } catch (e) { // ignore - logDebug(logger, 'Transaction size calculation failed: ' + e) } // we tried to add the instruction to lastValidTransaction @@ -714,7 +747,7 @@ export async function splitAndExecuteTx({ ) { // size was elapsed, need to split // need to consider existence of nonPossibleToSplitMarker - const transactionAdd = await generateNewTransaction({ + const transactionAdd = generateNewTransaction({ feePayer: feePayerDefined, bh: blockhash, computeUnitLimit, @@ -726,11 +759,6 @@ export async function splitAndExecuteTx({ addIdx < i && addIdx <= splitMarkerStartIdx; addIdx++ ) { - // TODO: delete me! - logInfo( - logger, - `Adding tx of index: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}, marker: ${splitMarkerStartIdx}` - ) if (isSplitMarkerInstruction(transaction.instructions[addIdx])) { continue } @@ -753,23 +781,13 @@ export async function splitAndExecuteTx({ ) } transactions.push(transactionAdd) - // TODO: delete me! - logInfo( - logger, - `transactions size: ${transactions.length}, additional tx ixes: ${transactionAdd.instructions.length}` - ) // we processed until i minus one; // next outer loop increases i and we need to start from the same instruction // as the current position is i = addIdx - 1 transactionStartIndex = addIdx - // TODO: delete me! - logInfo( - logger, - `after: addIdx: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}` - ) // nulling data of the next transaction to check - lastValidTransaction = await generateNewTransaction({ + lastValidTransaction = generateNewTransaction({ feePayer: feePayerDefined, bh: blockhash, computeUnitLimit, @@ -810,7 +828,8 @@ export async function splitAndExecuteTx({ ? executeResult?.signature : undefined }] ` + - `${executionCounter}/${transactions.length} (${transaction.instructions.length} instructions) executed` + `${executionCounter}(${transaction.instructions.length} ixes)/${transactions.length} ` + + (simulate ? 'simulated' : 'executed') ) if (executeResult !== undefined) { @@ -827,7 +846,7 @@ export async function splitAndExecuteTx({ return result } -async function generateNewTransaction({ +function generateNewTransaction({ feePayer, bh, computeUnitLimit, @@ -840,8 +859,8 @@ async function generateNewTransaction({ }> computeUnitLimit?: number computeUnitPrice?: number -}): Promise { - const transaction = await getTransaction(feePayer, bh) +}): Transaction { + const transaction = getNewTransaction(feePayer, bh) addComputeBudgetIxes({ transaction, computeUnitLimit, diff --git a/packages/lib/web3js-common/src/txBulk.ts b/packages/lib/web3js-common/src/txBulk.ts index 2b72e6c..3473b31 100644 --- a/packages/lib/web3js-common/src/txBulk.ts +++ b/packages/lib/web3js-common/src/txBulk.ts @@ -1,6 +1,5 @@ import { LoggerPlaceholder, - checkErrorMessage, logDebug, logError, logInfo, @@ -21,7 +20,9 @@ import { ExecuteTxReturnSimulated, TransactionData, partialSign, + setComputeUnitLimitIx, splitAndExecuteTx, + unhandledRejection, } from './tx' import { instanceOfProvider } from './provider' import { ExecutionError } from './error' @@ -79,36 +80,22 @@ export async function splitAndBulkExecuteTx({ ? connection.connection : connection - let resultSimulated: BulkExecuteTxSimulatedReturn[] = [] - const numberOfSimulations = numberOfRetries < 5 ? 5 : numberOfRetries - for (let i = 1; i <= numberOfSimulations; i++) { - try { - resultSimulated = await splitAndExecuteTx({ - connection, - transaction, - errMessage, - signers, - feePayer, - simulate: true, - printOnly, - logger, - sendOpts, - confirmOpts, - computeUnitLimit, - computeUnitPrice, - }) - break - } catch (e) { - if ( - i >= numberOfSimulations || - !checkErrorMessage(e, 'Too many requests for a specific RPC call') - ) { - throw e - } else { - logDebug(logger, `Error to split and execute transactions: ${e}`) - } - } - } + const resultSimulated: BulkExecuteTxSimulatedReturn[] = + await splitAndExecuteTx({ + connection, + transaction, + errMessage, + signers, + feePayer, + simulate: true, + printOnly, + logger, + sendOpts, + confirmOpts, + computeUnitLimit, + computeUnitPrice, + }) + if (printOnly || simulate) { return resultSimulated } @@ -117,11 +104,17 @@ export async function splitAndBulkExecuteTx({ const currentBlockhash = await connection.getLatestBlockhash() const resultExecuted: BulkExecuteTxExecutedReturn[] = resultSimulated.map( r => { + const instructions = [] + if (r.response?.unitsConsumed) { + const computeUnitPrice = Math.floor(r.response.unitsConsumed * 1.2) + instructions.push(setComputeUnitLimitIx(computeUnitPrice)) + } + instructions.push(...r.transaction.instructions) const messageV0 = new TransactionMessage({ payerKey: r.transaction.feePayer ?? r.signers[0].publicKey, recentBlockhash: r.transaction.recentBlockhash ?? currentBlockhash.blockhash, - instructions: r.transaction.instructions, + instructions, }).compileToV0Message() const transaction = new VersionedTransaction(messageV0) return { @@ -133,20 +126,36 @@ export async function splitAndBulkExecuteTx({ } ) + // unhandled rejections handler; this is a strange trouble + // that no catch patterns work with web3.js(?) + const handler = (r: Error, p: Promise) => + unhandledRejection(r, p, logger) + process.on('unhandledRejection', handler) + let failures: ExecutionError[] = [] // let's send to land the transaction on blockchain const numberOfSends = numberOfRetries + 1 for (let i = 1; i <= numberOfSends; i++) { - ;({ failures } = await bulkSend({ - connection, - logger, - sendOpts, - confirmOpts, - data: resultExecuted, - retryAttempt: i, - })) - if (failures.length === 0) { - break + try { + ;({ failures } = await bulkSend({ + connection, + logger, + sendOpts, + confirmOpts, + data: resultExecuted, + retryAttempt: i, + })) + if (failures.length === 0) { + break + } + } catch (e) { + logDebug(logger, `Error on bulkSend at attempt #${i} : ${e}`) + failures.push( + new ExecutionError({ + msg: `Error on bulkSend at attempt #${i}`, + cause: e as Error, + }) + ) } } if (failures.length > 0) { @@ -159,6 +168,8 @@ export async function splitAndBulkExecuteTx({ ) } + process.off('unhandledRejection', handler) + return resultExecuted } @@ -199,6 +210,7 @@ async function bulkSend({ logger, `Bulk #${retryAttempt} sending ${workingTransactions.length} transactions` ) + const rpcErrors: ExecutionError[] = [] const txSendPromises: { promise: Promise; index: number }[] = [] for (const { index, transaction } of workingTransactions) { const promise = connection.sendTransaction(transaction, { @@ -206,15 +218,45 @@ async function bulkSend({ ...sendOpts, }) txSendPromises.push({ index, promise }) + // promise + // .then(() => { + // txSendPromises.push({ index, promise }) + // logInfo(logger, `Transaction at [${index}] sent to blockchain`) + // }) + // .catch(e => { + // logInfo(logger, `Transaction at [${index}] failed to send ` + e.message) + // rpcErrors.push( + // new ExecutionError({ + // msg: `Transaction at [${index}] failed to be sent to blockchain`, + // cause: e as Error, + // transaction: data[index].transaction, + // }) + // ) + // }) + // .finally(() => { + // processed++ + // logInfo( + // logger, + // `Transaction at [${index}] sent ${processed}/${workingTransactions.length}` + // ) + // }) } + // --- WAITING FOR ALL TO BE SENT --- + // trying to avoid unhandled rejection but it does not work as expected(?) + // https://jakearchibald.com/2023/unhandled-rejections/ + await Promise.allSettled(txSendPromises.map(r => r.promise)) + logDebug( + logger, + `Confirming bulk #${retryAttempt}/` + + `${workingTransactions.length} [${data.length}]` + ) // --- CONFIRMING --- const confirmationPromises: { promise: Promise> index: number }[] = [] - const rpcErrors: ExecutionError[] = [] - for (const { index, promise: signaturePromise } of txSendPromises) { + for await (const { index, promise: signaturePromise } of txSendPromises) { try { const signature = await signaturePromise data[index].signature = signature @@ -227,16 +269,31 @@ async function bulkSend({ confirmOpts ) confirmationPromises.push({ index, promise }) - promise.catch(e => { - // managing 'Promise rejection was handled asynchronously' error - rpcErrors.push( - new ExecutionError({ - msg: `Transaction '${signature}' at [${index}] timed-out to be confirmed`, - cause: e as Error, - transaction: data[index].transaction, - }) - ) - }) + // promise + // .then(() => { + // confirmationPromises.push({ index, promise }) + // logInfo(logger, `Transaction at [${index}] confirmed`) + // }) + // .catch(e => { + // logInfo( + // logger, + // `Transaction at [${index}] failed to be confirmed ` + e.message + // ) + // // managing 'Promise rejection was handled asynchronously' error + // rpcErrors.push( + // new ExecutionError({ + // msg: `Transaction '${signature}' at [${index}] timed-out to be confirmed`, + // cause: e as Error, + // transaction: data[index].transaction, + // }) + // ) + // }) + // .finally(() => { + // logInfo( + // logger, + // `Transaction at [${index}] confirmed ${processed}/${txSendPromises.length}` + // ) + // }) } catch (e) { rpcErrors.push( new ExecutionError({ @@ -248,6 +305,12 @@ async function bulkSend({ } } + await Promise.allSettled(txSendPromises.map(r => r.promise)) + logDebug( + logger, + `Getting logs bulk #${retryAttempt}/` + + `${txSendPromises.length} [${data.length}]` + ) // --- GETTING LOGS --- const responsePromises: { index: number @@ -268,6 +331,35 @@ async function bulkSend({ maxSupportedTransactionVersion: 0, }) responsePromises.push({ index, promise }) + // promise + // .then(r => { + // responsePromises.push({ index, promise }) + // logInfo( + // logger, + // `Transaction at [${index}] fetched ` + r?.meta?.computeUnitsConsumed + // ) + // }) + // .catch(e => { + // logInfo( + // logger, + // `Transaction at [${index}] failed to be fetched ` + e.message + // ) + // data[index].confirmationError = e as Error + // responsePromises.push({ index, promise: Promise.resolve(null) }) + // rpcErrors.push( + // new ExecutionError({ + // msg: `Transaction at [${index}] failed to be sent to blockchain`, + // cause: e as Error, + // transaction: data[index].transaction, + // }) + // ) + // }) + // .finally(() => { + // logInfo( + // logger, + // `Transaction at [${index}] fetched ${processed}/${confirmationPromises.length}` + // ) + // }) } catch (e) { // transaction was not confirmed to be on blockchain // by chance still can be landed but we do not know why we don't care @@ -284,9 +376,31 @@ async function bulkSend({ } } + // --- WAITING FOR ALL LOGS BEING FETCHED --- + await Promise.allSettled(responsePromises.map(r => r.promise)) + logDebug( + logger, + `Retrieving logs bulk #${retryAttempt}/` + + `${confirmationPromises.length} [${data.length}]` + ) // --- RETRIEVING LOGS PROMISE AND FINISH --- for (const { index, promise: responsePromise } of responsePromises) { try { + // TODO: delete me! + // responsePromise.catch(e => { + // logInfo( + // logger, + // `Transaction at [${index}] failed to retrieve ` + (e as Error).message + // ) + // rpcErrors.push( + // new ExecutionError({ + // msg: `Transaction ${data[index].signature} at [${index}] failed to be found on-chain`, + // cause: e as Error, + // transaction: data[index].transaction, + // logs: data[index].response?.meta?.logMessages || undefined, + // }) + // ) + // }) const awaitedResponse = await responsePromise if (awaitedResponse !== null) { data[index].response = awaitedResponse