Skip to content

Commit

Permalink
Create a shim for get_filtered_indexed_accounts that sends the work…
Browse files Browse the repository at this point in the history
… to the background thread internally
  • Loading branch information
steveluscher committed Dec 18, 2024
1 parent 14440fa commit aa83125
Showing 1 changed file with 73 additions and 65 deletions.
138 changes: 73 additions & 65 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use {
self, AddressLoader, MessageHash, SanitizedTransaction, TransactionError,
VersionedTransaction, MAX_TX_ACCOUNT_LOCKS,
},
transaction_context::TransactionAccount,
},
solana_send_transaction_service::send_transaction_service::TransactionInfo,
solana_stake_program,
Expand Down Expand Up @@ -293,6 +294,40 @@ impl JsonRpcRequestProcessor {
.expect("Failed to spawn blocking task")
}

pub async fn get_filtered_indexed_accounts(
&self,
bank: &Arc<Bank>,
index_key: &IndexKey,
program_id: &Pubkey,
filters: Vec<RpcFilterType>,
sort_results: bool,
) -> ScanResult<Vec<TransactionAccount>> {
let bank = Arc::clone(bank);
let index_key = index_key.to_owned();
let program_id = program_id.to_owned();
self.runtime
.spawn_blocking(move || {
bank.get_filtered_indexed_accounts(
&index_key,
|account| {
// The program-id account index checks for Account owner on inclusion.
// However, due to the current AccountsDb implementation, an account may
// remain in storage as a zero-lamport AccountSharedData::Default() after
// being wiped and reinitialized in later updates. We include the redundant
// filters here to avoid returning these accounts.
account.owner().eq(&program_id)
&& filters
.iter()
.all(|filter_type| filter_allows(filter_type, account))
},
&ScanConfig::new(!sort_results),
bank.byte_limit_for_scans(),
)
})
.await
.expect("Failed to spawn blocking task")
}

#[allow(deprecated)]
fn bank(&self, commitment: Option<CommitmentConfig>) -> Arc<Bank> {
debug!("RPC commitment_config: {:?}", commitment);
Expand Down Expand Up @@ -2188,11 +2223,6 @@ impl JsonRpcRequestProcessor {
sort_results: bool,
) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
optimize_filters(&mut filters);
let filter_closure = move |account: &AccountSharedData| {
filters
.iter()
.all(|filter_type| filter_allows(filter_type, account))
};
if self
.config
.account_indexes
Expand All @@ -2203,34 +2233,28 @@ impl JsonRpcRequestProcessor {
index_key: program_id.to_string(),
});
}
self.runtime
.spawn_blocking(move || {
bank.get_filtered_indexed_accounts(
&IndexKey::ProgramId(program_id),
|account| {
// The program-id account index checks for Account owner on inclusion. However, due
// to the current AccountsDb implementation, an account may remain in storage as a
// zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
// updates. We include the redundant filters here to avoid returning these
// accounts.
account.owner() == &program_id && filter_closure(account)
},
&ScanConfig::new(!sort_results),
bank.byte_limit_for_scans(),
)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})
})
.await
.expect("Failed to spawn blocking task")
self.get_filtered_indexed_accounts(
&bank,
&IndexKey::ProgramId(program_id),
&program_id,
filters,
sort_results,
)
.await
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})
} else {
// this path does not need to provide a mb limit because we only want to support secondary indexes
self.runtime
.spawn_blocking(move || {
bank.get_filtered_program_accounts(
&program_id,
filter_closure,
|account: &AccountSharedData| {
filters
.iter()
.all(|filter_type| filter_allows(filter_type, account))
},
&ScanConfig::new(!sort_results),
)
.map_err(|e| RpcCustomError::ScanError {
Expand Down Expand Up @@ -2274,25 +2298,17 @@ impl JsonRpcRequestProcessor {
index_key: owner_key.to_string(),
});
}
self.runtime
.spawn_blocking(move || {
bank.get_filtered_indexed_accounts(
&IndexKey::SplTokenOwner(owner_key),
|account| {
account.owner() == &program_id
&& filters
.iter()
.all(|filter_type| filter_allows(filter_type, account))
},
&ScanConfig::new(!sort_results),
bank.byte_limit_for_scans(),
)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})
})
.await
.expect("rpc: get_filtered_indexed_account panicked")
self.get_filtered_indexed_accounts(
&bank,
&IndexKey::SplTokenOwner(owner_key),
&program_id,
filters,
sort_results,
)
.await
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})
} else {
self.get_filtered_program_accounts(bank, program_id, filters, sort_results)
.await
Expand Down Expand Up @@ -2330,25 +2346,17 @@ impl JsonRpcRequestProcessor {
index_key: mint_key.to_string(),
});
}
self.runtime
.spawn_blocking(move || {
bank.get_filtered_indexed_accounts(
&IndexKey::SplTokenMint(mint_key),
|account| {
account.owner() == &program_id
&& filters
.iter()
.all(|filter_type| filter_allows(filter_type, account))
},
&ScanConfig::new(!sort_results),
bank.byte_limit_for_scans(),
)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})
})
.await
.expect("rpc: get_filtered_indexed_account panicked")
self.get_filtered_indexed_accounts(
&bank,
&IndexKey::SplTokenMint(mint_key),
&program_id,
filters,
sort_results,
)
.await
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})
} else {
self.get_filtered_program_accounts(bank, program_id, filters, sort_results)
.await
Expand Down

0 comments on commit aa83125

Please sign in to comment.