Skip to content

Commit

Permalink
[web3js] bulk sending fixes and hacks
Browse files Browse the repository at this point in the history
  • Loading branch information
ochaloup committed Mar 27, 2024
1 parent a97ab36 commit adb02d3
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 97 deletions.
2 changes: 1 addition & 1 deletion packages/lib/cli-common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
},
"peerDependencies": {
"@solana/web3.js": "^1.91.1",
"@marinade.finance/ledger-utils": "^3.0.1",
"@marinade.finance/web3js-common": "2.4.1",
"@marinade.finance/ts-common": "2.4.1",
"@marinade.finance/ledger-utils": "^3.0.1",
"bn.js": "^5.2.1",
"borsh": "^0.7.0",
"bs58": "^5.0.0",
Expand Down
103 changes: 61 additions & 42 deletions packages/lib/web3js-common/src/tx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export function isExecuteTxReturn(data: any): data is ExecuteTxReturn {
export async function partialSign(
transaction: Transaction | VersionedTransaction,
signers: (Wallet | Keypair | Signer)[]
) {
): Promise<void> {
for (const signer of signers) {
if (instanceOfWallet(signer)) {
// partial signing by this call, despite the name
Expand All @@ -116,6 +116,20 @@ export async function partialSign(
}
}

export function unhandledRejection(
err: Error,
promise: Promise<unknown>,
logger?: LoggerPlaceholder
) {
promise.then(() => {}).catch(() => {}) // ignore errors
logDebug(logger, {
msg: 'Unhandled promise rejection',
message: err instanceof Error ? err.message : err,
stack: err.stack || '',
promise,
})
}

export async function executeTx(
args: Omit<ExecuteTxParams, 'simulate'> & { simulate: true }
): Promise<ExecuteTxReturnSimulated>
Expand Down Expand Up @@ -199,9 +213,12 @@ export async function executeTx({
let txSignature: string | undefined = undefined
try {
if (simulate) {
logDebug(logger, '[[Simulation mode]]')
const handler = (r: Error, p: Promise<unknown>) =>
unhandledRejection(r, p, logger)
process.on('unhandledRejection', handler)
logDebug(logger, 'executeTx: simulation mode')
txResponse = (await connection.simulateTransaction(transaction)).value
logDebug(logger, txResponse)
process.off('unhandledRejection', handler)
if (txResponse.err) {
throw new SendTransactionError(
txResponse.err as string,
Expand Down Expand Up @@ -257,7 +274,9 @@ export async function updateTransactionBlockhash<
lastValidBlockHeight: number
}>
): Promise<T> {
currentBlockhash = currentBlockhash ?? (await connection.getLatestBlockhash())
if (currentBlockhash === undefined) {
currentBlockhash = await connection.getLatestBlockhash()
}
if (transaction instanceof VersionedTransaction) {
transaction.message.recentBlockhash = currentBlockhash.blockhash
} else {
Expand Down Expand Up @@ -325,11 +344,11 @@ export async function confirmTransaction(
commitment: confirmFinality,
maxSupportedTransactionVersion: 0, // TODO: configurable?
})
const confirmBlockhash = connection.getLatestBlockhash(confirmFinality)
const confirmBlockhash = await connection.getLatestBlockhash(confirmFinality)
while (
txRes === null &&
(
await connection.isBlockhashValid((await confirmBlockhash).blockhash, {
await connection.isBlockhashValid(confirmBlockhash.blockhash, {
commitment: confirmFinality,
})
).value
Expand Down Expand Up @@ -395,11 +414,11 @@ export async function executeTxWithExceededBlockhashRetry(
try {
return await executeTx(txParams)
} catch (e) {
const txSig =
e instanceof ExecutionError && e.txSignature !== undefined
? `${e.txSignature} `
: ''
if (checkErrorMessage(e, 'block height exceeded')) {
const txSig =
e instanceof ExecutionError && e.txSignature !== undefined
? `${e.txSignature} `
: ''
logDebug(
txParams.logger,
`Failed to execute transaction ${txSig}` +
Expand All @@ -409,7 +428,24 @@ export async function executeTxWithExceededBlockhashRetry(
)
txParams.transaction.recentBlockhash = undefined
return await executeTx(txParams)
}
if (checkErrorMessage(e, 'Too many requests')) {
logDebug(
txParams.logger,
`Failed to execute transaction ${txSig}` +
'due too many requests on RPC, retrying, ' +
'original error: ' +
e
)
txParams.transaction.recentBlockhash = undefined
await sleep(3_000)
return await executeTx(txParams)
} else {
logDebug(
txParams.logger,
'Failed transaction execution',
(e as Error).message
)
throw e
}
}
Expand Down Expand Up @@ -467,21 +503,21 @@ export function filterSignersForInstruction(
return signers.filter(s => signersRequired.find(rs => rs.equals(s.publicKey)))
}

async function getTransaction(
function getNewTransaction(
feePayer: PublicKey,
bh: Readonly<{
blockhash: string
lastValidBlockHeight: number
}>
): Promise<Transaction> {
): Transaction {
return new Transaction({
feePayer,
blockhash: bh.blockhash,
lastValidBlockHeight: bh.lastValidBlockHeight,
})
}

async function addComputeBudgetIxes({
export async function addComputeBudgetIxes({
transaction,
computeUnitLimit,
computeUnitPrice,
Expand All @@ -498,15 +534,17 @@ async function addComputeBudgetIxes({
}
}

function setComputeUnitLimitIx(units: number): TransactionInstruction {
export function setComputeUnitLimitIx(units: number): TransactionInstruction {
return ComputeBudgetProgram.setComputeUnitLimit({ units })
}

/**
* Priority fee that is calculated in micro lamports (0.000001 SOL)
* Every declared CU for the transaction is paid with this additional payment.
*/
function setComputeUnitPriceIx(microLamports: number): TransactionInstruction {
export function setComputeUnitPriceIx(
microLamports: number
): TransactionInstruction {
return ComputeBudgetProgram.setComputeUnitPrice({ microLamports })
}

Expand Down Expand Up @@ -665,7 +703,7 @@ export async function splitAndExecuteTx({
lastValidBlockHeight: transaction.lastValidBlockHeight,
}
}
let lastValidTransaction = await generateNewTransaction({
let lastValidTransaction = generateNewTransaction({
feePayer: feePayerDefined,
bh: blockhash,
computeUnitLimit,
Expand All @@ -675,8 +713,6 @@ export async function splitAndExecuteTx({
let transactionStartIndex = 0
let splitMarkerStartIdx = Number.MAX_SAFE_INTEGER
for (let i = 0; i < transaction.instructions.length; i++) {
// TODO: delete me!
logInfo(logger, 'processing index: ' + i)
const ix = transaction.instructions[i]
if (ix instanceof TransactionInstructionSplitMarkerStart) {
splitMarkerStartIdx = i
Expand All @@ -686,8 +722,6 @@ export async function splitAndExecuteTx({
splitMarkerStartIdx = Number.MAX_SAFE_INTEGER
continue
}
// TODO: delete me!
logInfo(logger, 'not split marker index: ' + i)
lastValidTransaction.add(ix)
const filteredSigners = filterSignersForInstruction(
lastValidTransaction.instructions,
Expand All @@ -703,7 +737,6 @@ export async function splitAndExecuteTx({
}).byteLength
} catch (e) {
// ignore
logDebug(logger, 'Transaction size calculation failed: ' + e)
}

// we tried to add the instruction to lastValidTransaction
Expand All @@ -714,7 +747,7 @@ export async function splitAndExecuteTx({
) {
// size was elapsed, need to split
// need to consider existence of nonPossibleToSplitMarker
const transactionAdd = await generateNewTransaction({
const transactionAdd = generateNewTransaction({
feePayer: feePayerDefined,
bh: blockhash,
computeUnitLimit,
Expand All @@ -726,11 +759,6 @@ export async function splitAndExecuteTx({
addIdx < i && addIdx <= splitMarkerStartIdx;
addIdx++
) {
// TODO: delete me!
logInfo(
logger,
`Adding tx of index: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}, marker: ${splitMarkerStartIdx}`
)
if (isSplitMarkerInstruction(transaction.instructions[addIdx])) {
continue
}
Expand All @@ -753,23 +781,13 @@ export async function splitAndExecuteTx({
)
}
transactions.push(transactionAdd)
// TODO: delete me!
logInfo(
logger,
`transactions size: ${transactions.length}, additional tx ixes: ${transactionAdd.instructions.length}`
)
// we processed until i minus one;
// next outer loop increases i and we need to start from the same instruction
// as the current position is
i = addIdx - 1
transactionStartIndex = addIdx
// TODO: delete me!
logInfo(
logger,
`after: addIdx: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}`
)
// nulling data of the next transaction to check
lastValidTransaction = await generateNewTransaction({
lastValidTransaction = generateNewTransaction({
feePayer: feePayerDefined,
bh: blockhash,
computeUnitLimit,
Expand Down Expand Up @@ -810,7 +828,8 @@ export async function splitAndExecuteTx({
? executeResult?.signature
: undefined
}] ` +
`${executionCounter}/${transactions.length} (${transaction.instructions.length} instructions) executed`
`${executionCounter}(${transaction.instructions.length} ixes)/${transactions.length} ` +
(simulate ? 'simulated' : 'executed')
)

if (executeResult !== undefined) {
Expand All @@ -827,7 +846,7 @@ export async function splitAndExecuteTx({
return result
}

async function generateNewTransaction({
function generateNewTransaction({
feePayer,
bh,
computeUnitLimit,
Expand All @@ -840,8 +859,8 @@ async function generateNewTransaction({
}>
computeUnitLimit?: number
computeUnitPrice?: number
}): Promise<Transaction> {
const transaction = await getTransaction(feePayer, bh)
}): Transaction {
const transaction = getNewTransaction(feePayer, bh)
addComputeBudgetIxes({
transaction,
computeUnitLimit,
Expand Down
Loading

0 comments on commit adb02d3

Please sign in to comment.